agenda: Add 8usleep.
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:use-module (ice-9 suspendable-ports)
27   #:export (<agenda>
28             make-agenda agenda?
29             agenda-queue agenda-prompt-tag
30             agenda-read-port-map agenda-write-port-map
31             agenda-schedule
32             
33             make-async-prompt-tag
34
35             list->q make-q*
36
37             <time-segment>
38             make-time-segment time-segment?
39             time-segment-time time-segment-queue
40
41             time< time= time<= time-delta+
42             time-minus time-plus
43
44             <time-delta>
45             make-time-delta tdelta time-delta?
46             time-delta-sec time-delta-usec
47
48             <schedule>
49             make-schedule schedule?
50             schedule-add! schedule-empty?
51             schedule-segments
52             schedule-soonest-time
53
54             schedule-segments-split schedule-extract-until!
55             add-segments-contents-to-queue!
56
57             <run-request>
58             make-run-request run-request?
59             run-request-proc run-request-when
60
61             run-it wrap wrap-apply run run-at run-delay
62
63             8sync-delay
64             8sync-run 8sync-run-at 8sync-run-delay
65             8sync
66             8sleep
67             
68             ;; used for introspecting the error, but a method for making
69             ;; is not exposed
70             wrapped-exception?
71             wrapped-exception-key wrapped-exception-args
72             wrapped-exception-stacks
73
74             print-error-and-continue
75
76             stop-on-nothing-to-do
77
78             %current-agenda
79             start-agenda agenda-run-once))
80
81 ;; @@: Using immutable agendas here, so wouldn't it make sense to
82 ;;   replace this queue stuff with using pfds based immutable queues?
83
84 \f
85 ;;; Agenda definition
86 ;;; =================
87
88 ;;; The agenda consists of:
89 ;;;  - a queue of immediate items to handle
90 ;;;  - sheduled future events to be added to a future queue
91 ;;;  - a tag by which running processes can escape for some asynchronous
92 ;;;    operation (from which they can be returned later)
93 ;;;  - a mapping of ports to various handler procedures
94 ;;;
95 ;;; The goal, eventually, is for this all to be immutable and functional.
96 ;;; However, we aren't there yet.  Some tricky things:
97 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
98 ;;;  - Need to use immutable queues (ijp's pfds library?)
99 ;;;  - Modeling reading from ports as something repeatable,
100 ;;;    and with reasonable separation from functional components?
101
102 (define-immutable-record-type <agenda>
103   (make-agenda-intern queue prompt-tag
104                       read-port-map write-port-map
105                       schedule time catch-handler pre-unwind-handler)
106   agenda?
107   (queue agenda-queue)
108   (prompt-tag agenda-prompt-tag)
109   (read-port-map agenda-read-port-map)
110   (write-port-map agenda-write-port-map)
111   (schedule agenda-schedule)
112   (time agenda-time)
113   (catch-handler agenda-catch-handler)
114   (pre-unwind-handler agenda-pre-unwind-handler))
115
116 (define (make-async-prompt-tag)
117   "Make an async prompt tag for an agenda.
118
119 Generally done automatically for the user through (make-agenda)."
120   (make-prompt-tag "prompt"))
121
122 (define* (make-agenda #:key
123                       (queue (make-q))
124                       (prompt (make-prompt-tag))
125                       (read-port-map (make-hash-table))
126                       (write-port-map (make-hash-table))
127                       (schedule (make-schedule))
128                       (time (gettimeofday))
129                       (catch-handler #f)
130                       (pre-unwind-handler print-error-and-continue))
131   ;; TODO: document arguments
132   "Make a fresh agenda."
133   (make-agenda-intern queue prompt
134                       read-port-map write-port-map
135                       schedule time
136                       catch-handler pre-unwind-handler))
137
138 (define (current-agenda-prompt)
139   "Get the prompt for the current agenda; signal an error if there isn't one."
140   (let ((current-agenda (%current-agenda)))
141     (if (not current-agenda)
142         (throw
143          'no-current-agenda
144          "Can't get current agenda prompt if there's no current agenda!")
145         (agenda-prompt-tag current-agenda))))
146
147 ;; helper for making queues for an agenda
148 (define (list->q lst)
149   "Makes a queue composed of LST items"
150   (let ((q (make-q)))
151     (for-each
152      (lambda (x)
153        (enq! q x))
154      lst)
155     q))
156
157 (define (make-q* . args)
158   "Makes a queue and populates it with this invocation's ARGS"
159   (list->q args))
160
161 \f
162 ;;; Schedule
163 ;;; ========
164
165 ;;; This is where we handle timed events for the future
166
167 ;; This section totally borrows from the ideas in SICP
168 ;; <3 <3 <3
169
170 ;; NOTE: time is a cons of (seconds . microseconds)
171
172 (define-record-type <time-segment>
173   (make-time-segment-intern time queue)
174   time-segment?
175   (time time-segment-time)
176   (queue time-segment-queue))
177
178 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
179 ;;   Maybe double check and switch to that?  (Thanks amz3!)
180
181 (define (time-from-float-or-fraction time)
182   "Produce a (sec . usec) pair from TIME, a float or fraction"
183   (let* ((mixed-whole (floor time))
184          (mixed-rest (- time mixed-whole))  ; float or fraction component
185          (sec mixed-whole)
186          (usec (floor (* 1000000 mixed-rest))))
187     (cons (inexact->exact sec) (inexact->exact usec))))
188
189 (define (time-segment-right-format time)
190   "Ensure TIME is in the right format.
191
192 The right format means (second . microsecond).
193 If an integer, will convert appropriately."
194   ;; TODO: add floating point / rational number support.
195   (match time
196     ;; time is already a cons of second and microsecnd
197     (((? integer? s) . (? integer? u)) time)
198     ;; time was just an integer (just the second)
199     ((? integer? _) (cons time 0))
200     ((or (? rational? _) (? inexact? _))
201      (time-from-float-or-fraction time))
202     (_ (throw 'invalid-time "Invalid time" time))))
203
204 (define* (make-time-segment time #:optional (queue (make-q)))
205   "Make a time segment of TIME and QUEUE
206
207 No automatic conversion is done, so you might have to
208 run (time-segment-right-format) first."
209   (make-time-segment-intern time queue))
210
211 (define (time< time1 time2)
212   "Check if TIME1 is less than TIME2"
213   (cond ((< (car time1)
214             (car time2))
215          #t)
216         ((and (= (car time1)
217                  (car time2))
218               (< (cdr time1)
219                  (cdr time2)))
220          #t)
221         (else #f)))
222
223 (define (time= time1 time2)
224   "Check whether TIME1 and TIME2 are equivalent"
225   (and (= (car time1) (car time2))
226        (= (cdr time1) (cdr time2))))
227
228 (define (time<= time1 time2)
229   "Check if TIME1 is less than or equal to TIME2"
230   (or (time< time1 time2)
231       (time= time1 time2)))
232
233
234 (define-record-type <time-delta>
235   (make-time-delta-intern sec usec)
236   time-delta?
237   (sec time-delta-sec)
238   (usec time-delta-usec))
239
240 (define* (make-time-delta time)
241   "Make a <time-delta> of SEC seconds and USEC microseconds.
242
243 This is used primarily so the agenda can recognize RUN-REQUEST objects
244 which are meant to delay computation"
245   (match (time-segment-right-format time)
246     ((sec . usec)
247      (make-time-delta-intern sec usec))))
248
249 (define tdelta make-time-delta)
250
251 (define (time-carry-correct time)
252   "Corrects/handles time microsecond carry.
253 Will produce (0 . 0) instead of a negative number, if needed."
254   (cond ((>= (cdr time) 1000000)
255          (cons
256           (+ (car time) 1)
257           (- (cdr time) 1000000)))
258         ((< (cdr time) 0)
259          (if (= (car time) 0)
260              '(0 0)
261              (cons
262               (- (car time) 1)
263               (+ (cdr time) 1000000))))
264         (else time)))
265
266 (define (time-delta+ time time-delta)
267   "Increment a TIME by the value of TIME-DELTA"
268   (time-carry-correct
269    (cons (+ (car time) (time-delta-sec time-delta))
270          (+ (cdr time) (time-delta-usec time-delta)))))
271
272 (define (time-minus time1 time2)
273   "Subtract TIME2 from TIME1"
274   (time-carry-correct
275    (cons (- (car time1) (car time2))
276          (- (cdr time1) (cdr time2)))))
277
278 (define (time-plus time1 time2)
279   "Add TIME1 and TIME2"
280   (time-carry-correct
281    (cons (+ (car time1) (car time2))
282          (+ (cdr time1) (cdr time2)))))
283
284
285 (define-record-type <schedule>
286   (make-schedule-intern segments)
287   schedule?
288   (segments schedule-segments set-schedule-segments!))
289
290 (define* (make-schedule #:optional segments)
291   "Make a schedule, optionally pre-composed of SEGMENTS"
292   (make-schedule-intern (or segments '())))
293
294 (define (schedule-soonest-time schedule)
295   "Return a cons of (sec . usec) for next time segement, or #f if none"
296   (let ((segments (schedule-segments schedule)))
297     (if (eq? segments '())
298         #f
299         (time-segment-time (car segments)))))
300
301 ;; TODO: This code is reasonably easy to read but it
302 ;;   mutates AND is worst case of O(n) in both space and time :(
303 ;;   but at least it'll be reasonably easy to refactor to
304 ;;   a more functional setup?
305 (define (schedule-add! schedule time proc)
306   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
307   (let ((time (time-segment-right-format time)))
308     (define (new-time-segment)
309       (let ((new-segment
310              (make-time-segment time)))
311         (enq! (time-segment-queue new-segment) proc)
312         new-segment))
313     (define (loop segments)
314       (define (segment-equals-time? segment)
315         (time= time (time-segment-time segment)))
316
317       (define (segment-more-than-time? segment)
318         (time< time (time-segment-time segment)))
319
320       ;; We could switch this out to be more mutate'y
321       ;; and avoid the O(n) of space... is that over-optimizing?
322       (match segments
323         ;; If we're at the end of the list, time to make a new
324         ;; segment...
325         ('() (cons (new-time-segment) '()))
326         ;; If the segment's time is exactly our time, good news
327         ;; everyone!  Let's append our stuff to its queue
328         (((? segment-equals-time? first) rest ...)
329          (enq! (time-segment-queue first) proc)
330          segments)
331         ;; If the first segment is more than our time,
332         ;; ours belongs before this one, so add it and
333         ;; start consing our way back
334         (((? segment-more-than-time? first) rest ...)
335          (cons (new-time-segment) segments))
336         ;; Otherwise, build up recursive result
337         ((first rest ... )
338          (cons first (loop rest)))))
339     (set-schedule-segments!
340      schedule
341      (loop (schedule-segments schedule)))))
342
343 (define (schedule-empty? schedule)
344   "Check if the SCHEDULE is currently empty"
345   (eq? (schedule-segments schedule) '()))
346
347 (define (schedule-segments-split schedule time)
348   "Does a multiple value return of time segments before/at and after TIME"
349   (let ((time (time-segment-right-format time)))
350     (define (segment-is-now? segment)
351       (time= (time-segment-time segment) time))
352     (define (segment-is-before-now? segment)
353       (time< (time-segment-time segment) time))
354
355     (let loop ((segments-before '())
356                (segments-left (schedule-segments schedule)))
357       (match segments-left
358         ;; end of the line, return
359         ('()
360          (values (reverse segments-before) '()))
361
362         ;; It's right now, so time to stop, but include this one in before
363         ;; but otherwise return
364         (((? segment-is-now? first) rest ...)
365          (values (reverse (cons first segments-before)) rest))
366
367         ;; This is prior or at now, so add it and keep going
368         (((? segment-is-before-now? first) rest ...)
369          (loop (cons first segments-before) rest))
370
371         ;; Otherwise it's past now, just return what we have
372         (segments-after
373          (values segments-before segments-after))))))
374
375 (define (schedule-extract-until! schedule time)
376   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
377   (receive (segments-before segments-after)
378       (schedule-segments-split schedule time)
379     (set-schedule-segments! schedule segments-after)
380     segments-before))
381
382 (define (add-segments-contents-to-queue! segments queue)
383   (for-each
384    (lambda (segment)
385      (let ((seg-queue (time-segment-queue segment)))
386        (while (not (q-empty? seg-queue))
387          (enq! queue (deq! seg-queue)))))
388    segments))
389
390
391 \f
392 ;;; Request to run stuff
393 ;;; ====================
394
395 (define-record-type <run-request>
396   (make-run-request proc when)
397   run-request?
398   (proc run-request-proc)
399   (when run-request-when))
400
401 (define* (run-it proc #:optional when)
402   "Make a request to run PROC (possibly at WHEN)"
403   (make-run-request proc when))
404
405 (define-syntax-rule (wrap body ...)
406   "Wrap contents in a procedure"
407   (lambda ()
408     body ...))
409
410 (define-syntax-rule (wrap-apply body)
411   "Wrap possibly multi-value function in a procedure, applies all arguments"
412   (lambda args
413     (apply body args)))
414
415
416 ;; @@: Do we really want `body ...' here?
417 ;;   what about just `body'?
418 (define-syntax-rule (run body ...)
419   "Run everything in BODY but wrap in a convenient procedure"
420   (make-run-request (wrap body ...) #f))
421
422 (define-syntax-rule (run-at body ... when)
423   "Run BODY at WHEN"
424   (make-run-request (wrap body ...) when))
425
426 ;; @@: Is it okay to overload the term "delay" like this?
427 ;;   Would `run-in' be better?
428 (define-syntax-rule (run-delay body ... delay-time)
429   "Run BODY at DELAY-TIME time from now"
430   (make-run-request (wrap body ...) (tdelta delay-time)))
431
432
433 \f
434 ;;; Asynchronous escape to run things
435 ;;; =================================
436
437 (define-syntax-rule (8sync-abort-to-prompt async-request)
438   (abort-to-prompt (current-agenda-prompt)
439                    async-request))
440
441 ;; Async port request and run-request meta-requests
442 (define (make-async-request proc)
443   "Wrap PROC in an async-request
444
445 The purpose of this is to make sure that users don't accidentally
446 return the wrong thing via (8sync) and trip themselves up."
447   (cons '*async-request* proc))
448
449 (define (setup-async-request resume-kont async-request)
450   "Complete an async request for agenda-run-once's continuation handling"
451   (match async-request
452     (('*async-request* . async-setup-proc)
453      (async-setup-proc resume-kont))
454     ;; TODO: deliver more helpful errors depending on what the user
455     ;;   returned
456     (_ (throw 'invalid-async-request
457               "Invalid request passed back via an (8sync) procedure."
458               async-request))))
459
460 (define-syntax-rule (8sync body ...)
461   "Run body asynchronously but ignore its result...
462 forge ahead in our current function!"
463   (8sync-abort-to-prompt
464    (make-async-request
465     (lambda (kont)
466       (list (make-run-request
467              ;; What's with returning #f to kont?
468              ;; Otherwise we sometimes get errors like
469              ;; "Zero values returned to single-valued continuation""
470              (wrap (kont #f)) #f)
471             (make-run-request (lambda () body ...) #f))))))
472
473 ;; TODO: Rewrite when we move to this being just `sleep'.
474 (define (8sleep secs)
475   "Like sleep, but asynchronous."
476   (8sync-abort-to-prompt
477    (make-async-request
478     (lambda (kont)
479       (make-run-request (lambda () (kont #f)) (tdelta secs))))))
480
481 (define (8usleep usecs)
482   "Like usleep, but asynchronous."
483   (define (usecs->time-pair)
484     (if (< 1000000)
485         (cons 0 usecs)
486         (let* ((sec (floor (/ usecs 1000000)))
487                (msec (- usecs (* sec 1000000))))
488           (cons sec msec))))
489   (8sync-abort-to-prompt
490    (make-async-request
491     (lambda (kont)
492       (make-run-request (lambda () (kont #f)) (tdelta usecs->time-pair))))))
493
494 ;; Voluntarily yield execution
495 (define (yield)  ; @@: should this be define-inlinable?
496   "Voluntarily yield execution to the scheduler."
497   (8sync-abort-to-prompt
498    (make-async-request
499     (lambda (kont)
500       (make-run-request (lambda () (kont #f)) #f)))))
501
502 \f
503 ;;; Execution of agenda, and current agenda
504 ;;; =======================================
505
506 (define %current-agenda (make-parameter #f))
507
508 (define (update-agenda-from-select! agenda)
509   "Potentially (select) on ports specified in agenda, adding items to queue.
510
511 Also handles sleeping when all we have to do is wait on the schedule."
512   (define (hash-keys hash)
513     (hash-map->list (lambda (k v) k) hash))
514   (define (get-wait-time)
515     ;; TODO: we need to figure this out based on whether there's anything
516     ;;   in the queue, and if not, how long till the next scheduled item
517     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
518       (cond 
519        ((not (q-empty? (agenda-queue agenda)))
520         (cons 0 0))
521        (soonest-time    ; ie, the agenda is non-empty
522         (let* ((current-time (agenda-time agenda)))
523           (if (time<= soonest-time current-time)
524               ;; Well there's something due so let's select
525               ;; (this avoids a (possible?) race condition chance)
526               (cons 0 0)
527               (time-minus soonest-time current-time))))
528        (else
529         (cons #f #f)))))
530   (define (do-select)
531     ;; TODO: support usecond wait time too
532     (match (get-wait-time)
533       ((sec . usec)
534        (catch 'system-error
535          (lambda ()
536            (select (hash-keys (agenda-read-port-map agenda))
537                    (hash-keys (agenda-write-port-map agenda))
538                    '()
539                    sec usec))
540          (lambda (key . rest-args)
541            (match rest-args
542              ((_ _ _ (EINTR))
543               '(() () ()))
544              (_ (error "Unhandled error in select!" key rest-args))))))))
545   (define (get-procs-to-run)
546     (define (ports->procs ports port-map)
547       (lambda (initial-procs)
548         (fold
549          (lambda (port prev)
550            (define proc (hashq-ref port-map port))
551            ;; Now that we've selected on this port, it can be removed
552            (hashq-remove! port-map port)
553            (cons proc prev))
554          initial-procs
555          ports)))
556     (match (do-select)
557       ((read-ports write-ports except-ports) ; except-ports not used
558        ((compose (ports->procs
559                   read-ports
560                   (agenda-read-port-map agenda))
561                  (ports->procs
562                   write-ports
563                   (agenda-write-port-map agenda)))
564         '()))))
565   (define (update-agenda)
566     (let ((procs-to-run (get-procs-to-run))
567           (q (agenda-queue agenda)))
568       (for-each
569        (lambda (proc)
570          (enq! q proc))
571        procs-to-run))
572     agenda)
573   (define (ports-to-select?)
574     (define (has-items? selector)
575       ;; @@: O(n)
576       ;;    ... we could use hash-for-each and a continuation to jump
577       ;;    out with a #t at first indication of an item
578       (not (= (hash-count (const #t)
579                           (selector agenda))
580               0)))
581     (or (has-items? agenda-read-port-map)
582         (has-items? agenda-write-port-map)))
583
584   (if (or (ports-to-select?)
585           ;; select doubles as sleep...
586           (not (schedule-empty? (agenda-schedule agenda)))) 
587       (update-agenda)
588       agenda))
589
590 (define-record-type <read-request>
591   (make-read-request port proc)
592   read-request?
593   (port read-request-port)
594   (proc read-request-proc))
595
596 (define-record-type <write-request>
597   (make-write-request port proc)
598   write-request?
599   (port write-request-port)
600   (proc write-request-proc))
601
602 (define (agenda-handle-read-request! agenda read-request)
603   "Handle <read-request>, which is a request to add this port to the poll/select
604 on suspendable ports."
605   (hashq-set! (agenda-read-port-map agenda)
606               (read-request-port read-request)
607               (read-request-proc read-request)))
608
609 (define (agenda-handle-write-request! agenda write-request)
610   (hashq-set! (agenda-write-port-map agenda)
611               (write-request-port write-request)
612               (write-request-proc write-request)))
613
614 (define (stop-on-nothing-to-do agenda)
615   (and (q-empty? (agenda-queue agenda))
616        (schedule-empty? (agenda-schedule agenda))
617        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
618        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
619
620
621 (define* (start-agenda agenda
622                        #:key
623                        ;; @@: Should we make stop-on-nothing-to-do
624                        ;;   the default stop-condition?
625                        (stop-condition stop-on-nothing-to-do)
626                        (get-time gettimeofday)
627                        (handle-ports update-agenda-from-select!)
628                        ;; For live hacking madness, etc
629                        (post-run-hook #f))
630   ;; TODO: Document fields
631   "Start up the AGENDA"
632   (install-suspendable-ports!)
633   (let loop ((agenda agenda))
634     (let ((agenda   
635            ;; @@: Hm, maybe here would be a great place to handle
636            ;;   select'ing on ports.
637            ;;   We could compose over agenda-run-once and agenda-read-ports
638            (agenda-run-once agenda)))
639       ;; @@: This relies on mutation at present on the queue, in the rare
640       ;;   event it's used.  If we ever switch to something more immutable,
641       ;;   it should return a new modified agenda instead.
642       (if post-run-hook
643           (post-run-hook agenda))
644       (if (and stop-condition (stop-condition agenda))
645           'done
646           (let* ((agenda
647                   ;; We have to update the time after ports handled, too
648                   ;; because it may have changed after a select
649                   (set-field
650                    (handle-ports
651                     ;; Adjust the agenda's time just in time
652                     ;; We do this here rather than in agenda-run-once to make
653                     ;; agenda-run-once's behavior fairly predictable
654                     (set-field agenda (agenda-time) (get-time)))
655                    (agenda-time) (get-time))))
656             ;; Update the agenda's current queue based on
657             ;; currently applicable time segments
658             (add-segments-contents-to-queue!
659              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
660              (agenda-queue agenda))
661             (loop agenda))))))
662
663
664 (define (print-error-and-continue key . args)
665   "Frequently used as pre-unwind-handler for agenda"
666   (cond
667    ((eq? key '8sync-caught-error)
668     (match args
669       ((orig-key orig-args stacks)
670        (display "\n*** Caught async exception. ***\n")
671        (format (current-error-port)
672                "* Original key '~s and arguments: ~s *\n"
673                orig-key orig-args)
674        (display "* Caught stacks below (ending with original) *\n\n")
675        (for-each
676         (lambda (s)
677           (display-backtrace s (current-error-port))
678           (newline (current-error-port)))
679         stacks))))
680    (else
681     (format (current-error-port)
682             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
683             key args)
684     (display-backtrace (make-stack #t 1 0)
685                        (current-error-port))
686     (newline (current-error-port)))))
687
688 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
689                                      body ...)
690   (if (or catch-handler pre-unwind-handler)
691       (catch
692         #t
693         (lambda ()
694           body ...)
695         (or catch-handler (lambda _ #f))
696         (or pre-unwind-handler (lambda _ #f)))
697       (begin body ...)))
698
699 (define (wait-for-readable port)
700   (8sync-abort-to-prompt
701    (make-async-request
702     (lambda (kont)
703       (make-read-request port (wrap (kont #f)))))))
704
705 (define (wait-for-writable port)
706   (8sync-abort-to-prompt
707    (make-async-request
708     (lambda (kont)
709       (make-write-request port (wrap (kont #f)))))))
710
711 (define (agenda-run-once agenda)
712   "Run once through the agenda, and produce a new agenda
713 based on the results"
714   (define (call-proc proc)
715     (call-with-prompt
716      (agenda-prompt-tag agenda)
717      (lambda ()
718        (parameterize ((%current-agenda agenda)
719                       ;; @@: Couldn't we just parameterize this at the start of
720                       ;;   the agenda...?
721                       (current-read-waiter wait-for-readable)
722                       (current-write-waiter wait-for-writable))
723          (maybe-catch-all
724           ((agenda-catch-handler agenda)
725            (agenda-pre-unwind-handler agenda))
726           (proc))))
727      (lambda (kont async-request)
728        (setup-async-request kont async-request))))
729
730   (let ((queue (agenda-queue agenda))
731         (next-queue (make-q)))
732     (while (not (q-empty? queue))
733       (let* ((proc (q-pop! queue))
734              (proc-result (call-proc proc))
735              (enqueue
736               (lambda (run-request)
737                 (define (schedule-at! time proc)
738                   (schedule-add! (agenda-schedule agenda) time proc))
739                 (let ((request-time (run-request-when run-request)))
740                   (match request-time
741                     ((? time-delta? time-delta)
742                      (let ((time (time-delta+ (agenda-time agenda)
743                                               time-delta)))
744                        (schedule-at! time (run-request-proc run-request))))
745                     ((? integer? sec)
746                      (let ((time (cons sec 0)))
747                        (schedule-at! time (run-request-proc run-request))))
748                     (((? integer? sec) . (? integer? usec))
749                      (schedule-at! request-time (run-request-proc run-request)))
750                     (#f
751                      (enq! next-queue (run-request-proc run-request))))))))
752         (define (handle-individual result)
753           ;; @@: Could maybe optimize by switching to an explicit cond...
754           (match result
755             ((? run-request? new-proc)
756              (enqueue new-proc))
757             ((? read-request? read-request)
758              (agenda-handle-read-request! agenda read-request))
759             ((? write-request? write-request)
760              (agenda-handle-write-request! agenda write-request))
761             ;; do nothing
762             ;; Remember, we don't throw an error here because procedures can
763             ;; return a run request, eg with run-it, at the end of their
764             ;; evaluation to keep looping.
765             ;; @@: Though is this really a useful feature?
766             (_ #f)))
767         ;; @@: We might support delay-wrapped procedures here
768         (match proc-result
769           ((results ...)
770            (for-each handle-individual results))
771           (one-result (handle-individual one-result)))))
772     ;; TODO: Alternately, we could return the next-queue
773     ;;   along with changes to be added to the schedule here?
774     ;; Return new agenda, with next queue set
775     (set-field agenda (agenda-queue) next-queue)))