agenda: Remove call to deprecated agenda-except-port-map procedure.
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:use-module (ice-9 suspendable-ports)
27   #:export (<agenda>
28             make-agenda agenda?
29             agenda-queue agenda-prompt-tag
30             agenda-read-port-map agenda-write-port-map
31             agenda-schedule
32             
33             make-async-prompt-tag
34
35             list->q make-q*
36
37             <time-segment>
38             make-time-segment time-segment?
39             time-segment-time time-segment-queue
40
41             time< time= time<= time-delta+
42             time-minus time-plus
43
44             <time-delta>
45             make-time-delta tdelta time-delta?
46             time-delta-sec time-delta-usec
47
48             <schedule>
49             make-schedule schedule?
50             schedule-add! schedule-empty?
51             schedule-segments
52             schedule-soonest-time
53
54             schedule-segments-split schedule-extract-until!
55             add-segments-contents-to-queue!
56
57             <run-request>
58             make-run-request run-request?
59             run-request-proc run-request-when
60
61             run-it wrap wrap-apply run run-at run-delay
62
63             8sync 8sync-delay
64             8sync-run 8sync-run-at 8sync-run-delay
65             8sync-nowait
66             8sleep
67             
68             catch-8sync
69
70             ;; used for introspecting the error, but a method for making
71             ;; is not exposed
72             wrapped-exception?
73             wrapped-exception-key wrapped-exception-args
74             wrapped-exception-stacks
75
76             print-error-and-continue
77
78             stop-on-nothing-to-do
79
80             %current-agenda
81             start-agenda agenda-run-once))
82
83 (install-suspendable-ports!)
84
85 ;; @@: Using immutable agendas here, so wouldn't it make sense to
86 ;;   replace this queue stuff with using pfds based immutable queues?
87
88 \f
89 ;;; Agenda definition
90 ;;; =================
91
92 ;;; The agenda consists of:
93 ;;;  - a queue of immediate items to handle
94 ;;;  - sheduled future events to be added to a future queue
95 ;;;  - a tag by which running processes can escape for some asynchronous
96 ;;;    operation (from which they can be returned later)
97 ;;;  - a mapping of ports to various handler procedures
98 ;;;
99 ;;; The goal, eventually, is for this all to be immutable and functional.
100 ;;; However, we aren't there yet.  Some tricky things:
101 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
102 ;;;  - Need to use immutable queues (ijp's pfds library?)
103 ;;;  - Modeling reading from ports as something repeatable,
104 ;;;    and with reasonable separation from functional components?
105
106 (define-immutable-record-type <agenda>
107   (make-agenda-intern queue prompt-tag
108                       read-port-map write-port-map
109                       schedule time catch-handler pre-unwind-handler)
110   agenda?
111   (queue agenda-queue)
112   (prompt-tag agenda-prompt-tag)
113   (read-port-map agenda-read-port-map)
114   (write-port-map agenda-write-port-map)
115   (schedule agenda-schedule)
116   (time agenda-time)
117   (catch-handler agenda-catch-handler)
118   (pre-unwind-handler agenda-pre-unwind-handler))
119
120 (define (make-async-prompt-tag)
121   "Make an async prompt tag for an agenda.
122
123 Generally done automatically for the user through (make-agenda)."
124   (make-prompt-tag "prompt"))
125
126 (define* (make-agenda #:key
127                       (queue (make-q))
128                       (prompt (make-prompt-tag))
129                       (read-port-map (make-hash-table))
130                       (write-port-map (make-hash-table))
131                       (schedule (make-schedule))
132                       (time (gettimeofday))
133                       (catch-handler #f)
134                       (pre-unwind-handler print-error-and-continue))
135   ;; TODO: document arguments
136   "Make a fresh agenda."
137   (make-agenda-intern queue prompt
138                       read-port-map write-port-map
139                       schedule time
140                       catch-handler pre-unwind-handler))
141
142 (define (current-agenda-prompt)
143   "Get the prompt for the current agenda; signal an error if there isn't one."
144   (let ((current-agenda (%current-agenda)))
145     (if (not current-agenda)
146         (throw
147          'no-current-agenda
148          "Can't get current agenda prompt if there's no current agenda!")
149         (agenda-prompt-tag current-agenda))))
150
151 ;; helper for making queues for an agenda
152 (define (list->q lst)
153   "Makes a queue composed of LST items"
154   (let ((q (make-q)))
155     (for-each
156      (lambda (x)
157        (enq! q x))
158      lst)
159     q))
160
161 (define (make-q* . args)
162   "Makes a queue and populates it with this invocation's ARGS"
163   (list->q args))
164
165 \f
166 ;;; Schedule
167 ;;; ========
168
169 ;;; This is where we handle timed events for the future
170
171 ;; This section totally borrows from the ideas in SICP
172 ;; <3 <3 <3
173
174 ;; NOTE: time is a cons of (seconds . microseconds)
175
176 (define-record-type <time-segment>
177   (make-time-segment-intern time queue)
178   time-segment?
179   (time time-segment-time)
180   (queue time-segment-queue))
181
182 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
183 ;;   Maybe double check and switch to that?  (Thanks amz3!)
184
185 (define (time-from-float-or-fraction time)
186   "Produce a (sec . usec) pair from TIME, a float or fraction"
187   (let* ((mixed-whole (floor time))
188          (mixed-rest (- time mixed-whole))  ; float or fraction component
189          (sec mixed-whole)
190          (usec (floor (* 1000000 mixed-rest))))
191     (cons (inexact->exact sec) (inexact->exact usec))))
192
193 (define (time-segment-right-format time)
194   "Ensure TIME is in the right format.
195
196 The right format means (second . microsecond).
197 If an integer, will convert appropriately."
198   ;; TODO: add floating point / rational number support.
199   (match time
200     ;; time is already a cons of second and microsecnd
201     (((? integer? s) . (? integer? u)) time)
202     ;; time was just an integer (just the second)
203     ((? integer? _) (cons time 0))
204     ((or (? rational? _) (? inexact? _))
205      (time-from-float-or-fraction time))
206     (_ (throw 'invalid-time "Invalid time" time))))
207
208 (define* (make-time-segment time #:optional (queue (make-q)))
209   "Make a time segment of TIME and QUEUE
210
211 No automatic conversion is done, so you might have to
212 run (time-segment-right-format) first."
213   (make-time-segment-intern time queue))
214
215 (define (time< time1 time2)
216   "Check if TIME1 is less than TIME2"
217   (cond ((< (car time1)
218             (car time2))
219          #t)
220         ((and (= (car time1)
221                  (car time2))
222               (< (cdr time1)
223                  (cdr time2)))
224          #t)
225         (else #f)))
226
227 (define (time= time1 time2)
228   "Check whether TIME1 and TIME2 are equivalent"
229   (and (= (car time1) (car time2))
230        (= (cdr time1) (cdr time2))))
231
232 (define (time<= time1 time2)
233   "Check if TIME1 is less than or equal to TIME2"
234   (or (time< time1 time2)
235       (time= time1 time2)))
236
237
238 (define-record-type <time-delta>
239   (make-time-delta-intern sec usec)
240   time-delta?
241   (sec time-delta-sec)
242   (usec time-delta-usec))
243
244 (define* (make-time-delta time)
245   "Make a <time-delta> of SEC seconds and USEC microseconds.
246
247 This is used primarily so the agenda can recognize RUN-REQUEST objects
248 which are meant to delay computation"
249   (match (time-segment-right-format time)
250     ((sec . usec)
251      (make-time-delta-intern sec usec))))
252
253 (define tdelta make-time-delta)
254
255 (define (time-carry-correct time)
256   "Corrects/handles time microsecond carry.
257 Will produce (0 . 0) instead of a negative number, if needed."
258   (cond ((>= (cdr time) 1000000)
259          (cons
260           (+ (car time) 1)
261           (- (cdr time) 1000000)))
262         ((< (cdr time) 0)
263          (if (= (car time) 0)
264              '(0 0)
265              (cons
266               (- (car time) 1)
267               (+ (cdr time) 1000000))))
268         (else time)))
269
270 (define (time-delta+ time time-delta)
271   "Increment a TIME by the value of TIME-DELTA"
272   (time-carry-correct
273    (cons (+ (car time) (time-delta-sec time-delta))
274          (+ (cdr time) (time-delta-usec time-delta)))))
275
276 (define (time-minus time1 time2)
277   "Subtract TIME2 from TIME1"
278   (time-carry-correct
279    (cons (- (car time1) (car time2))
280          (- (cdr time1) (cdr time2)))))
281
282 (define (time-plus time1 time2)
283   "Add TIME1 and TIME2"
284   (time-carry-correct
285    (cons (+ (car time1) (car time2))
286          (+ (cdr time1) (cdr time2)))))
287
288
289 (define-record-type <schedule>
290   (make-schedule-intern segments)
291   schedule?
292   (segments schedule-segments set-schedule-segments!))
293
294 (define* (make-schedule #:optional segments)
295   "Make a schedule, optionally pre-composed of SEGMENTS"
296   (make-schedule-intern (or segments '())))
297
298 (define (schedule-soonest-time schedule)
299   "Return a cons of (sec . usec) for next time segement, or #f if none"
300   (let ((segments (schedule-segments schedule)))
301     (if (eq? segments '())
302         #f
303         (time-segment-time (car segments)))))
304
305 ;; TODO: This code is reasonably easy to read but it
306 ;;   mutates AND is worst case of O(n) in both space and time :(
307 ;;   but at least it'll be reasonably easy to refactor to
308 ;;   a more functional setup?
309 (define (schedule-add! schedule time proc)
310   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
311   (let ((time (time-segment-right-format time)))
312     (define (new-time-segment)
313       (let ((new-segment
314              (make-time-segment time)))
315         (enq! (time-segment-queue new-segment) proc)
316         new-segment))
317     (define (loop segments)
318       (define (segment-equals-time? segment)
319         (time= time (time-segment-time segment)))
320
321       (define (segment-more-than-time? segment)
322         (time< time (time-segment-time segment)))
323
324       ;; We could switch this out to be more mutate'y
325       ;; and avoid the O(n) of space... is that over-optimizing?
326       (match segments
327         ;; If we're at the end of the list, time to make a new
328         ;; segment...
329         ('() (cons (new-time-segment) '()))
330         ;; If the segment's time is exactly our time, good news
331         ;; everyone!  Let's append our stuff to its queue
332         (((? segment-equals-time? first) rest ...)
333          (enq! (time-segment-queue first) proc)
334          segments)
335         ;; If the first segment is more than our time,
336         ;; ours belongs before this one, so add it and
337         ;; start consing our way back
338         (((? segment-more-than-time? first) rest ...)
339          (cons (new-time-segment) segments))
340         ;; Otherwise, build up recursive result
341         ((first rest ... )
342          (cons first (loop rest)))))
343     (set-schedule-segments!
344      schedule
345      (loop (schedule-segments schedule)))))
346
347 (define (schedule-empty? schedule)
348   "Check if the SCHEDULE is currently empty"
349   (eq? (schedule-segments schedule) '()))
350
351 (define (schedule-segments-split schedule time)
352   "Does a multiple value return of time segments before/at and after TIME"
353   (let ((time (time-segment-right-format time)))
354     (define (segment-is-now? segment)
355       (time= (time-segment-time segment) time))
356     (define (segment-is-before-now? segment)
357       (time< (time-segment-time segment) time))
358
359     (let loop ((segments-before '())
360                (segments-left (schedule-segments schedule)))
361       (match segments-left
362         ;; end of the line, return
363         ('()
364          (values (reverse segments-before) '()))
365
366         ;; It's right now, so time to stop, but include this one in before
367         ;; but otherwise return
368         (((? segment-is-now? first) rest ...)
369          (values (reverse (cons first segments-before)) rest))
370
371         ;; This is prior or at now, so add it and keep going
372         (((? segment-is-before-now? first) rest ...)
373          (loop (cons first segments-before) rest))
374
375         ;; Otherwise it's past now, just return what we have
376         (segments-after
377          (values segments-before segments-after))))))
378
379 (define (schedule-extract-until! schedule time)
380   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
381   (receive (segments-before segments-after)
382       (schedule-segments-split schedule time)
383     (set-schedule-segments! schedule segments-after)
384     segments-before))
385
386 (define (add-segments-contents-to-queue! segments queue)
387   (for-each
388    (lambda (segment)
389      (let ((seg-queue (time-segment-queue segment)))
390        (while (not (q-empty? seg-queue))
391          (enq! queue (deq! seg-queue)))))
392    segments))
393
394
395 \f
396 ;;; Request to run stuff
397 ;;; ====================
398
399 (define-record-type <run-request>
400   (make-run-request proc when)
401   run-request?
402   (proc run-request-proc)
403   (when run-request-when))
404
405 (define* (run-it proc #:optional when)
406   "Make a request to run PROC (possibly at WHEN)"
407   (make-run-request proc when))
408
409 (define-syntax-rule (wrap body ...)
410   "Wrap contents in a procedure"
411   (lambda ()
412     body ...))
413
414 (define-syntax-rule (wrap-apply body)
415   "Wrap possibly multi-value function in a procedure, applies all arguments"
416   (lambda args
417     (apply body args)))
418
419
420 ;; @@: Do we really want `body ...' here?
421 ;;   what about just `body'?
422 (define-syntax-rule (run body ...)
423   "Run everything in BODY but wrap in a convenient procedure"
424   (make-run-request (wrap body ...) #f))
425
426 (define-syntax-rule (run-at body ... when)
427   "Run BODY at WHEN"
428   (make-run-request (wrap body ...) when))
429
430 ;; @@: Is it okay to overload the term "delay" like this?
431 ;;   Would `run-in' be better?
432 (define-syntax-rule (run-delay body ... delay-time)
433   "Run BODY at DELAY-TIME time from now"
434   (make-run-request (wrap body ...) (tdelta delay-time)))
435
436
437 \f
438 ;;; Asynchronous escape to run things
439 ;;; =================================
440
441 (define-syntax-rule (8sync-abort-to-prompt async-request)
442   (abort-to-prompt (current-agenda-prompt)
443                    async-request))
444
445 ;; Async port request and run-request meta-requests
446 (define (make-async-request proc)
447   "Wrap PROC in an async-request
448
449 The purpose of this is to make sure that users don't accidentally
450 return the wrong thing via (8sync) and trip themselves up."
451   (cons '*async-request* proc))
452
453 (define (setup-async-request resume-kont async-request)
454   "Complete an async request for agenda-run-once's continuation handling"
455   (match async-request
456     (('*async-request* . async-setup-proc)
457      (async-setup-proc resume-kont))
458     ;; TODO: deliver more helpful errors depending on what the user
459     ;;   returned
460     (_ (throw 'invalid-async-request
461               "Invalid request passed back via an (8sync) procedure."
462               async-request))))
463
464 (define-record-type <wrapped-exception>
465   (make-wrapped-exception key args stacks)
466   wrapped-exception?
467   (key wrapped-exception-key)
468   (args wrapped-exception-args)
469   (stacks wrapped-exception-stacks))
470
471 (define-syntax-rule (propagate-%async-exceptions body)
472   (let ((body-result body))
473     (if (wrapped-exception? body-result)
474         (throw '8sync-caught-error
475                (wrapped-exception-key body-result)
476                (wrapped-exception-args body-result)
477                (wrapped-exception-stacks body-result))
478         body-result)))
479
480 (define-syntax 8sync
481   (syntax-rules ()
482     "Run BODY asynchronously (8synchronously?) at a prompt, then return.
483
484 Possibly specify WHEN as the second argument."
485     ((8sync body)
486      (8sync-run body))
487     ((8sync body when)
488      (8sync-run-at body when))))
489
490 (define-syntax-rule (8sync-run body ...)
491   (8sync-run-at body ... #f))
492
493 (define-syntax-rule (8sync-run-at body ... when)
494   (propagate-%async-exceptions
495    (8sync-abort-to-prompt
496     ;; Send an asynchronous request to apply a continuation to the
497     ;; following function, then handle that as a request to the agenda
498     (make-async-request
499      (lambda (kont)
500        ;; We're making a run request
501        (make-run-request
502         ;; Wrapping the following execution to run...
503         (wrap
504          ;; Once we get the result from the inner part, we'll resume
505          ;; this continuation, but first
506          ;; @@: Is this running immediately, or queueing the result
507          ;;   after evaluation for the next agenda tick?  It looks
508          ;;   like evaluating immediately.  Is that what we want?
509          (kont
510           ;; Any unhandled errors are caught
511           (let ((exception-stack #f))
512             (catch #t
513               ;; Run the actual code the user requested
514               (lambda ()
515                 body ...)
516               ;; If something bad happened and we didn't catch it,
517               ;; we'll wrap it up in such a way that the continuation
518               ;; can address it
519               (lambda (key . args)
520                 (cond
521                  ((eq? key '8sync-caught-error)
522                   (match args
523                     ((orig-key orig-args orig-stacks)
524                      (make-wrapped-exception
525                       orig-key orig-args
526                       (cons exception-stack orig-stacks)))))
527                  (else
528                   (make-wrapped-exception key args
529                                           (list exception-stack)))))
530               (lambda _
531                 (set! exception-stack (make-stack #t 1 0)))))))
532         when))))))
533
534 (define-syntax-rule (8sync-run-delay body ... delay-time)
535   (8sync-run-at body ... (tdelta delay-time)))
536
537 (define-syntax-rule (8sync-delay args ...)
538   (8sync-run-delay args ...))
539
540 ;; TODO: Write (%run-immediately)
541
542 (define-syntax-rule (8sync-nowait body)
543   "Run body asynchronously but ignore its result...
544 forge ahead in our current function!"
545   (8sync-abort-to-prompt
546    (make-async-request
547     (lambda (kont)
548       (list (make-run-request
549              ;; What's with returning #f to kont?
550              ;; Otherwise we sometimes get errors like
551              ;; "Zero values returned to single-valued continuation""
552              (wrap (kont #f)) #f)
553             (make-run-request (lambda () body) #f))))))
554
555 (define-syntax-rule (catch-8sync exp (handler-key handler) ...)
556   (catch '8sync-caught-error
557     (lambda ()
558       exp)
559     (lambda (_ orig-key orig-args orig-stacks)
560       (cond
561        ((or (eq? handler-key #t)
562             (eq? orig-key handler-key))
563         (apply handler orig-stacks orig-args)) ...
564        (else (raise '8sync-caught-error
565                     orig-key orig-args orig-stacks))))))
566
567 ;; This is sugar... and could probably be considerably
568 ;; simplified and optimized.  But whatever.
569 (define-syntax-rule (8sleep time)
570   (8sync-delay 'no-op time))
571
572
573 \f
574 ;;; Execution of agenda, and current agenda
575 ;;; =======================================
576
577 (define %current-agenda (make-parameter #f))
578
579 (define (update-agenda-from-select! agenda)
580   "Potentially (select) on ports specified in agenda, adding items to queue.
581
582 Also handles sleeping when all we have to do is wait on the schedule."
583   (define (hash-keys hash)
584     (hash-map->list (lambda (k v) k) hash))
585   (define (get-wait-time)
586     ;; TODO: we need to figure this out based on whether there's anything
587     ;;   in the queue, and if not, how long till the next scheduled item
588     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
589       (cond 
590        ((not (q-empty? (agenda-queue agenda)))
591         (cons 0 0))
592        (soonest-time    ; ie, the agenda is non-empty
593         (let* ((current-time (agenda-time agenda)))
594           (if (time<= soonest-time current-time)
595               ;; Well there's something due so let's select
596               ;; (this avoids a (possible?) race condition chance)
597               (cons 0 0)
598               (time-minus soonest-time current-time))))
599        (else
600         (cons #f #f)))))
601   (define (do-select)
602     ;; TODO: support usecond wait time too
603     (match (get-wait-time)
604       ((sec . usec)
605        (catch 'system-error
606          (lambda ()
607            (select (hash-keys (agenda-read-port-map agenda))
608                    (hash-keys (agenda-write-port-map agenda))
609                    '()
610                    sec usec))
611          (lambda (key . rest-args)
612            (match rest-args
613              ((_ _ _ (EINTR))
614               '(() () ()))
615              (_ (error "Unhandled error in select!" key rest-args))))))))
616   (define (get-procs-to-run)
617     (define (ports->procs ports port-map)
618       (lambda (initial-procs)
619         (fold
620          (lambda (port prev)
621            (define proc (hashq-ref port-map port))
622            ;; Now that we've selected on this port, it can be removed
623            (hashq-remove! port-map port)
624            (cons proc prev))
625          initial-procs
626          ports)))
627     (match (do-select)
628       ((read-ports write-ports except-ports) ; except-ports not used
629        ((compose (ports->procs
630                   read-ports
631                   (agenda-read-port-map agenda))
632                  (ports->procs
633                   write-ports
634                   (agenda-write-port-map agenda)))
635         '()))))
636   (define (update-agenda)
637     (let ((procs-to-run (get-procs-to-run))
638           (q (agenda-queue agenda)))
639       (for-each
640        (lambda (proc)
641          (enq! q proc))
642        procs-to-run))
643     agenda)
644   (define (ports-to-select?)
645     (define (has-items? selector)
646       ;; @@: O(n)
647       ;;    ... we could use hash-for-each and a continuation to jump
648       ;;    out with a #t at first indication of an item
649       (not (= (hash-count (const #t)
650                           (selector agenda))
651               0)))
652     (or (has-items? agenda-read-port-map)
653         (has-items? agenda-write-port-map)))
654
655   (if (or (ports-to-select?)
656           ;; select doubles as sleep...
657           (not (schedule-empty? (agenda-schedule agenda)))) 
658       (update-agenda)
659       agenda))
660
661 (define-record-type <read-request>
662   (make-read-request port proc)
663   read-request?
664   (port read-request-port)
665   (proc read-request-proc))
666
667 (define-record-type <write-request>
668   (make-write-request port proc)
669   write-request?
670   (port write-request-port)
671   (proc write-request-proc))
672
673 (define (agenda-handle-read-request! agenda read-request)
674   "Handle <read-request>, which is a request to add this port to the poll/select
675 on suspendable ports."
676   (hashq-set! (agenda-read-port-map agenda)
677               (read-request-port read-request)
678               (read-request-proc read-request)))
679
680 (define (agenda-handle-write-request! agenda write-request)
681   (hashq-set! (agenda-write-port-map agenda)
682               (write-request-port write-request)
683               (write-request-proc write-request)))
684
685 (define (stop-on-nothing-to-do agenda)
686   (and (q-empty? (agenda-queue agenda))
687        (schedule-empty? (agenda-schedule agenda))
688        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
689        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
690
691
692 (define* (start-agenda agenda
693                        #:key
694                        ;; @@: Should we make stop-on-nothing-to-do
695                        ;;   the default stop-condition?
696                        (stop-condition stop-on-nothing-to-do)
697                        (get-time gettimeofday)
698                        (handle-ports update-agenda-from-select!)
699                        ;; For live hacking madness, etc
700                        (post-run-hook #f))
701   ;; TODO: Document fields
702   "Start up the AGENDA"
703   (let loop ((agenda agenda))
704     (let ((agenda   
705            ;; @@: Hm, maybe here would be a great place to handle
706            ;;   select'ing on ports.
707            ;;   We could compose over agenda-run-once and agenda-read-ports
708            (agenda-run-once agenda)))
709       ;; @@: This relies on mutation at present on the queue, in the rare
710       ;;   event it's used.  If we ever switch to something more immutable,
711       ;;   it should return a new modified agenda instead.
712       (if post-run-hook
713           (post-run-hook agenda))
714       (if (and stop-condition (stop-condition agenda))
715           'done
716           (let* ((agenda
717                   ;; We have to update the time after ports handled, too
718                   ;; because it may have changed after a select
719                   (set-field
720                    (handle-ports
721                     ;; Adjust the agenda's time just in time
722                     ;; We do this here rather than in agenda-run-once to make
723                     ;; agenda-run-once's behavior fairly predictable
724                     (set-field agenda (agenda-time) (get-time)))
725                    (agenda-time) (get-time))))
726             ;; Update the agenda's current queue based on
727             ;; currently applicable time segments
728             (add-segments-contents-to-queue!
729              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
730              (agenda-queue agenda))
731             (loop agenda))))))
732
733
734 (define (print-error-and-continue key . args)
735   "Frequently used as pre-unwind-handler for agenda"
736   (cond
737    ((eq? key '8sync-caught-error)
738     (match args
739       ((orig-key orig-args stacks)
740        (display "\n*** Caught async exception. ***\n")
741        (format (current-error-port)
742                "* Original key '~s and arguments: ~s *\n"
743                orig-key orig-args)
744        (display "* Caught stacks below (ending with original) *\n\n")
745        (for-each
746         (lambda (s)
747           (display-backtrace s (current-error-port))
748           (newline (current-error-port)))
749         stacks))))
750    (else
751     (format (current-error-port)
752             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
753             key args)
754     (display-backtrace (make-stack #t 1 0)
755                        (current-error-port))
756     (newline (current-error-port)))))
757
758 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
759                                      body ...)
760   (if (or catch-handler pre-unwind-handler)
761       (catch
762         #t
763         (lambda ()
764           body ...)
765         (or catch-handler (lambda _ #f))
766         (or pre-unwind-handler (lambda _ #f)))
767       (begin body ...)))
768
769 (define (wait-for-readable port)
770   (8sync-abort-to-prompt
771    (make-async-request
772     (lambda (kont)
773       (make-read-request port (wrap (kont #f)))))))
774
775 (define (wait-for-writable port)
776   (8sync-abort-to-prompt
777    (make-async-request
778     (lambda (kont)
779       (make-write-request port (wrap (kont #f)))))))
780
781 (define (agenda-run-once agenda)
782   "Run once through the agenda, and produce a new agenda
783 based on the results"
784   (define (call-proc proc)
785     (call-with-prompt
786      (agenda-prompt-tag agenda)
787      (lambda ()
788        (parameterize ((%current-agenda agenda)
789                       ;; @@: Couldn't we just parameterize this at the start of
790                       ;;   the agenda...?
791                       (current-read-waiter wait-for-readable)
792                       (current-write-waiter wait-for-writable))
793          (maybe-catch-all
794           ((agenda-catch-handler agenda)
795            (agenda-pre-unwind-handler agenda))
796           (proc))))
797      (lambda (kont async-request)
798        (setup-async-request kont async-request))))
799
800   (let ((queue (agenda-queue agenda))
801         (next-queue (make-q)))
802     (while (not (q-empty? queue))
803       (let* ((proc (q-pop! queue))
804              (proc-result (call-proc proc))
805              (enqueue
806               (lambda (run-request)
807                 (define (schedule-at! time proc)
808                   (schedule-add! (agenda-schedule agenda) time proc))
809                 (let ((request-time (run-request-when run-request)))
810                   (match request-time
811                     ((? time-delta? time-delta)
812                      (let ((time (time-delta+ (agenda-time agenda)
813                                               time-delta)))
814                        (schedule-at! time (run-request-proc run-request))))
815                     ((? integer? sec)
816                      (let ((time (cons sec 0)))
817                        (schedule-at! time (run-request-proc run-request))))
818                     (((? integer? sec) . (? integer? usec))
819                      (schedule-at! request-time (run-request-proc run-request)))
820                     (#f
821                      (enq! next-queue (run-request-proc run-request))))))))
822         (define (handle-individual result)
823           ;; @@: Could maybe optimize by switching to an explicit cond...
824           (match result
825             ((? run-request? new-proc)
826              (enqueue new-proc))
827             ((? read-request? read-request)
828              (agenda-handle-read-request! agenda read-request))
829             ((? write-request? write-request)
830              (agenda-handle-write-request! agenda write-request))
831             ;; do nothing
832             ;; @@: Why not throw an error?
833             (_ #f)))
834         ;; @@: We might support delay-wrapped procedures here
835         (match proc-result
836           ((results ...)
837            (for-each handle-individual results))
838           (one-result (handle-individual one-result)))))
839     ;; TODO: Alternately, we could return the next-queue
840     ;;   along with changes to be added to the schedule here?
841     ;; Return new agenda, with next queue set
842     (set-field agenda (agenda-queue) next-queue)))