agenda: Fixing exports.
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:use-module (ice-9 suspendable-ports)
27   #:export (<agenda>
28             make-agenda agenda?
29             agenda-queue agenda-prompt-tag
30             agenda-read-port-map agenda-write-port-map
31             agenda-schedule
32             
33             make-async-prompt-tag
34
35             list->q make-q*
36
37             <time-segment>
38             make-time-segment time-segment?
39             time-segment-time time-segment-queue
40
41             time< time= time<= time-delta+
42             time-minus time-plus
43
44             <time-delta>
45             make-time-delta tdelta time-delta?
46             time-delta-sec time-delta-usec
47
48             <schedule>
49             make-schedule schedule?
50             schedule-add! schedule-empty?
51             schedule-segments
52             schedule-soonest-time
53
54             schedule-segments-split schedule-extract-until!
55             add-segments-contents-to-queue!
56
57             <run-request>
58             make-run-request run-request?
59             run-request-proc run-request-when
60
61             run-it wrap wrap-apply run run-at run-delay
62
63             8sync
64             8sleep 8usleep
65             
66             ;; used for introspecting the error, but a method for making
67             ;; is not exposed
68             wrapped-exception?
69             wrapped-exception-key wrapped-exception-args
70             wrapped-exception-stacks
71
72             print-error-and-continue
73
74             stop-on-nothing-to-do
75
76             %current-agenda
77             start-agenda agenda-run-once))
78
79 ;; @@: Using immutable agendas here, so wouldn't it make sense to
80 ;;   replace this queue stuff with using pfds based immutable queues?
81
82 \f
83 ;;; Agenda definition
84 ;;; =================
85
86 ;;; The agenda consists of:
87 ;;;  - a queue of immediate items to handle
88 ;;;  - sheduled future events to be added to a future queue
89 ;;;  - a tag by which running processes can escape for some asynchronous
90 ;;;    operation (from which they can be returned later)
91 ;;;  - a mapping of ports to various handler procedures
92 ;;;
93 ;;; The goal, eventually, is for this all to be immutable and functional.
94 ;;; However, we aren't there yet.  Some tricky things:
95 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
96 ;;;  - Need to use immutable queues (ijp's pfds library?)
97 ;;;  - Modeling reading from ports as something repeatable,
98 ;;;    and with reasonable separation from functional components?
99
100 (define-immutable-record-type <agenda>
101   (make-agenda-intern queue prompt-tag
102                       read-port-map write-port-map
103                       schedule time catch-handler pre-unwind-handler)
104   agenda?
105   (queue agenda-queue)
106   (prompt-tag agenda-prompt-tag)
107   (read-port-map agenda-read-port-map)
108   (write-port-map agenda-write-port-map)
109   (schedule agenda-schedule)
110   (time agenda-time)
111   (catch-handler agenda-catch-handler)
112   (pre-unwind-handler agenda-pre-unwind-handler))
113
114 (define (make-async-prompt-tag)
115   "Make an async prompt tag for an agenda.
116
117 Generally done automatically for the user through (make-agenda)."
118   (make-prompt-tag "prompt"))
119
120 (define* (make-agenda #:key
121                       (queue (make-q))
122                       (prompt (make-prompt-tag))
123                       (read-port-map (make-hash-table))
124                       (write-port-map (make-hash-table))
125                       (schedule (make-schedule))
126                       (time (gettimeofday))
127                       (catch-handler #f)
128                       (pre-unwind-handler print-error-and-continue))
129   ;; TODO: document arguments
130   "Make a fresh agenda."
131   (make-agenda-intern queue prompt
132                       read-port-map write-port-map
133                       schedule time
134                       catch-handler pre-unwind-handler))
135
136 (define (current-agenda-prompt)
137   "Get the prompt for the current agenda; signal an error if there isn't one."
138   (let ((current-agenda (%current-agenda)))
139     (if (not current-agenda)
140         (throw
141          'no-current-agenda
142          "Can't get current agenda prompt if there's no current agenda!")
143         (agenda-prompt-tag current-agenda))))
144
145 ;; helper for making queues for an agenda
146 (define (list->q lst)
147   "Makes a queue composed of LST items"
148   (let ((q (make-q)))
149     (for-each
150      (lambda (x)
151        (enq! q x))
152      lst)
153     q))
154
155 (define (make-q* . args)
156   "Makes a queue and populates it with this invocation's ARGS"
157   (list->q args))
158
159 \f
160 ;;; Schedule
161 ;;; ========
162
163 ;;; This is where we handle timed events for the future
164
165 ;; This section totally borrows from the ideas in SICP
166 ;; <3 <3 <3
167
168 ;; NOTE: time is a cons of (seconds . microseconds)
169
170 (define-record-type <time-segment>
171   (make-time-segment-intern time queue)
172   time-segment?
173   (time time-segment-time)
174   (queue time-segment-queue))
175
176 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
177 ;;   Maybe double check and switch to that?  (Thanks amz3!)
178
179 (define (time-from-float-or-fraction time)
180   "Produce a (sec . usec) pair from TIME, a float or fraction"
181   (let* ((mixed-whole (floor time))
182          (mixed-rest (- time mixed-whole))  ; float or fraction component
183          (sec mixed-whole)
184          (usec (floor (* 1000000 mixed-rest))))
185     (cons (inexact->exact sec) (inexact->exact usec))))
186
187 (define (time-segment-right-format time)
188   "Ensure TIME is in the right format.
189
190 The right format means (second . microsecond).
191 If an integer, will convert appropriately."
192   ;; TODO: add floating point / rational number support.
193   (match time
194     ;; time is already a cons of second and microsecnd
195     (((? integer? s) . (? integer? u)) time)
196     ;; time was just an integer (just the second)
197     ((? integer? _) (cons time 0))
198     ((or (? rational? _) (? inexact? _))
199      (time-from-float-or-fraction time))
200     (_ (throw 'invalid-time "Invalid time" time))))
201
202 (define* (make-time-segment time #:optional (queue (make-q)))
203   "Make a time segment of TIME and QUEUE
204
205 No automatic conversion is done, so you might have to
206 run (time-segment-right-format) first."
207   (make-time-segment-intern time queue))
208
209 (define (time< time1 time2)
210   "Check if TIME1 is less than TIME2"
211   (cond ((< (car time1)
212             (car time2))
213          #t)
214         ((and (= (car time1)
215                  (car time2))
216               (< (cdr time1)
217                  (cdr time2)))
218          #t)
219         (else #f)))
220
221 (define (time= time1 time2)
222   "Check whether TIME1 and TIME2 are equivalent"
223   (and (= (car time1) (car time2))
224        (= (cdr time1) (cdr time2))))
225
226 (define (time<= time1 time2)
227   "Check if TIME1 is less than or equal to TIME2"
228   (or (time< time1 time2)
229       (time= time1 time2)))
230
231
232 (define-record-type <time-delta>
233   (make-time-delta-intern sec usec)
234   time-delta?
235   (sec time-delta-sec)
236   (usec time-delta-usec))
237
238 (define* (make-time-delta time)
239   "Make a <time-delta> of SEC seconds and USEC microseconds.
240
241 This is used primarily so the agenda can recognize RUN-REQUEST objects
242 which are meant to delay computation"
243   (match (time-segment-right-format time)
244     ((sec . usec)
245      (make-time-delta-intern sec usec))))
246
247 (define tdelta make-time-delta)
248
249 (define (time-carry-correct time)
250   "Corrects/handles time microsecond carry.
251 Will produce (0 . 0) instead of a negative number, if needed."
252   (cond ((>= (cdr time) 1000000)
253          (cons
254           (+ (car time) 1)
255           (- (cdr time) 1000000)))
256         ((< (cdr time) 0)
257          (if (= (car time) 0)
258              '(0 0)
259              (cons
260               (- (car time) 1)
261               (+ (cdr time) 1000000))))
262         (else time)))
263
264 (define (time-delta+ time time-delta)
265   "Increment a TIME by the value of TIME-DELTA"
266   (time-carry-correct
267    (cons (+ (car time) (time-delta-sec time-delta))
268          (+ (cdr time) (time-delta-usec time-delta)))))
269
270 (define (time-minus time1 time2)
271   "Subtract TIME2 from TIME1"
272   (time-carry-correct
273    (cons (- (car time1) (car time2))
274          (- (cdr time1) (cdr time2)))))
275
276 (define (time-plus time1 time2)
277   "Add TIME1 and TIME2"
278   (time-carry-correct
279    (cons (+ (car time1) (car time2))
280          (+ (cdr time1) (cdr time2)))))
281
282
283 (define-record-type <schedule>
284   (make-schedule-intern segments)
285   schedule?
286   (segments schedule-segments set-schedule-segments!))
287
288 (define* (make-schedule #:optional segments)
289   "Make a schedule, optionally pre-composed of SEGMENTS"
290   (make-schedule-intern (or segments '())))
291
292 (define (schedule-soonest-time schedule)
293   "Return a cons of (sec . usec) for next time segement, or #f if none"
294   (let ((segments (schedule-segments schedule)))
295     (if (eq? segments '())
296         #f
297         (time-segment-time (car segments)))))
298
299 ;; TODO: This code is reasonably easy to read but it
300 ;;   mutates AND is worst case of O(n) in both space and time :(
301 ;;   but at least it'll be reasonably easy to refactor to
302 ;;   a more functional setup?
303 (define (schedule-add! schedule time proc)
304   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
305   (let ((time (time-segment-right-format time)))
306     (define (new-time-segment)
307       (let ((new-segment
308              (make-time-segment time)))
309         (enq! (time-segment-queue new-segment) proc)
310         new-segment))
311     (define (loop segments)
312       (define (segment-equals-time? segment)
313         (time= time (time-segment-time segment)))
314
315       (define (segment-more-than-time? segment)
316         (time< time (time-segment-time segment)))
317
318       ;; We could switch this out to be more mutate'y
319       ;; and avoid the O(n) of space... is that over-optimizing?
320       (match segments
321         ;; If we're at the end of the list, time to make a new
322         ;; segment...
323         ('() (cons (new-time-segment) '()))
324         ;; If the segment's time is exactly our time, good news
325         ;; everyone!  Let's append our stuff to its queue
326         (((? segment-equals-time? first) rest ...)
327          (enq! (time-segment-queue first) proc)
328          segments)
329         ;; If the first segment is more than our time,
330         ;; ours belongs before this one, so add it and
331         ;; start consing our way back
332         (((? segment-more-than-time? first) rest ...)
333          (cons (new-time-segment) segments))
334         ;; Otherwise, build up recursive result
335         ((first rest ... )
336          (cons first (loop rest)))))
337     (set-schedule-segments!
338      schedule
339      (loop (schedule-segments schedule)))))
340
341 (define (schedule-empty? schedule)
342   "Check if the SCHEDULE is currently empty"
343   (eq? (schedule-segments schedule) '()))
344
345 (define (schedule-segments-split schedule time)
346   "Does a multiple value return of time segments before/at and after TIME"
347   (let ((time (time-segment-right-format time)))
348     (define (segment-is-now? segment)
349       (time= (time-segment-time segment) time))
350     (define (segment-is-before-now? segment)
351       (time< (time-segment-time segment) time))
352
353     (let loop ((segments-before '())
354                (segments-left (schedule-segments schedule)))
355       (match segments-left
356         ;; end of the line, return
357         ('()
358          (values (reverse segments-before) '()))
359
360         ;; It's right now, so time to stop, but include this one in before
361         ;; but otherwise return
362         (((? segment-is-now? first) rest ...)
363          (values (reverse (cons first segments-before)) rest))
364
365         ;; This is prior or at now, so add it and keep going
366         (((? segment-is-before-now? first) rest ...)
367          (loop (cons first segments-before) rest))
368
369         ;; Otherwise it's past now, just return what we have
370         (segments-after
371          (values segments-before segments-after))))))
372
373 (define (schedule-extract-until! schedule time)
374   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
375   (receive (segments-before segments-after)
376       (schedule-segments-split schedule time)
377     (set-schedule-segments! schedule segments-after)
378     segments-before))
379
380 (define (add-segments-contents-to-queue! segments queue)
381   (for-each
382    (lambda (segment)
383      (let ((seg-queue (time-segment-queue segment)))
384        (while (not (q-empty? seg-queue))
385          (enq! queue (deq! seg-queue)))))
386    segments))
387
388
389 \f
390 ;;; Request to run stuff
391 ;;; ====================
392
393 (define-record-type <run-request>
394   (make-run-request proc when)
395   run-request?
396   (proc run-request-proc)
397   (when run-request-when))
398
399 (define* (run-it proc #:optional when)
400   "Make a request to run PROC (possibly at WHEN)"
401   (make-run-request proc when))
402
403 (define-syntax-rule (wrap body ...)
404   "Wrap contents in a procedure"
405   (lambda ()
406     body ...))
407
408 (define-syntax-rule (wrap-apply body)
409   "Wrap possibly multi-value function in a procedure, applies all arguments"
410   (lambda args
411     (apply body args)))
412
413
414 ;; @@: Do we really want `body ...' here?
415 ;;   what about just `body'?
416 (define-syntax-rule (run body ...)
417   "Run everything in BODY but wrap in a convenient procedure"
418   (make-run-request (wrap body ...) #f))
419
420 (define-syntax-rule (run-at body ... when)
421   "Run BODY at WHEN"
422   (make-run-request (wrap body ...) when))
423
424 ;; @@: Is it okay to overload the term "delay" like this?
425 ;;   Would `run-in' be better?
426 (define-syntax-rule (run-delay body ... delay-time)
427   "Run BODY at DELAY-TIME time from now"
428   (make-run-request (wrap body ...) (tdelta delay-time)))
429
430
431 \f
432 ;;; Asynchronous escape to run things
433 ;;; =================================
434
435 (define-syntax-rule (8sync-abort-to-prompt async-request)
436   (abort-to-prompt (current-agenda-prompt)
437                    async-request))
438
439 ;; Async port request and run-request meta-requests
440 (define (make-async-request proc)
441   "Wrap PROC in an async-request
442
443 The purpose of this is to make sure that users don't accidentally
444 return the wrong thing via (8sync) and trip themselves up."
445   (cons '*async-request* proc))
446
447 (define (setup-async-request resume-kont async-request)
448   "Complete an async request for agenda-run-once's continuation handling"
449   (match async-request
450     (('*async-request* . async-setup-proc)
451      (async-setup-proc resume-kont))
452     ;; TODO: deliver more helpful errors depending on what the user
453     ;;   returned
454     (_ (throw 'invalid-async-request
455               "Invalid request passed back via an (8sync) procedure."
456               async-request))))
457
458 (define-syntax-rule (8sync body ...)
459   "Run body asynchronously but ignore its result...
460 forge ahead in our current function!"
461   (8sync-abort-to-prompt
462    (make-async-request
463     (lambda (kont)
464       (list (make-run-request
465              ;; What's with returning #f to kont?
466              ;; Otherwise we sometimes get errors like
467              ;; "Zero values returned to single-valued continuation""
468              (wrap (kont #f)) #f)
469             (make-run-request (lambda () body ...) #f))))))
470
471 ;; TODO: Rewrite when we move to this being just `sleep'.
472 (define (8sleep secs)
473   "Like sleep, but asynchronous."
474   (8sync-abort-to-prompt
475    (make-async-request
476     (lambda (kont)
477       (make-run-request (lambda () (kont #f)) (tdelta secs))))))
478
479 (define (8usleep usecs)
480   "Like usleep, but asynchronous."
481   (define (usecs->time-pair)
482     (if (< 1000000)
483         (cons 0 usecs)
484         (let* ((sec (floor (/ usecs 1000000)))
485                (msec (- usecs (* sec 1000000))))
486           (cons sec msec))))
487   (8sync-abort-to-prompt
488    (make-async-request
489     (lambda (kont)
490       (make-run-request (lambda () (kont #f)) (tdelta usecs->time-pair))))))
491
492 ;; Voluntarily yield execution
493 (define (yield)  ; @@: should this be define-inlinable?
494   "Voluntarily yield execution to the scheduler."
495   (8sync-abort-to-prompt
496    (make-async-request
497     (lambda (kont)
498       (make-run-request (lambda () (kont #f)) #f)))))
499
500 \f
501 ;;; Execution of agenda, and current agenda
502 ;;; =======================================
503
504 (define %current-agenda (make-parameter #f))
505
506 (define (update-agenda-from-select! agenda)
507   "Potentially (select) on ports specified in agenda, adding items to queue.
508
509 Also handles sleeping when all we have to do is wait on the schedule."
510   (define (hash-keys hash)
511     (hash-map->list (lambda (k v) k) hash))
512   (define (get-wait-time)
513     ;; TODO: we need to figure this out based on whether there's anything
514     ;;   in the queue, and if not, how long till the next scheduled item
515     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
516       (cond 
517        ((not (q-empty? (agenda-queue agenda)))
518         (cons 0 0))
519        (soonest-time    ; ie, the agenda is non-empty
520         (let* ((current-time (agenda-time agenda)))
521           (if (time<= soonest-time current-time)
522               ;; Well there's something due so let's select
523               ;; (this avoids a (possible?) race condition chance)
524               (cons 0 0)
525               (time-minus soonest-time current-time))))
526        (else
527         (cons #f #f)))))
528   (define (do-select)
529     ;; TODO: support usecond wait time too
530     (match (get-wait-time)
531       ((sec . usec)
532        (catch 'system-error
533          (lambda ()
534            (select (hash-keys (agenda-read-port-map agenda))
535                    (hash-keys (agenda-write-port-map agenda))
536                    '()
537                    sec usec))
538          (lambda (key . rest-args)
539            (match rest-args
540              ((_ _ _ (EINTR))
541               '(() () ()))
542              (_ (error "Unhandled error in select!" key rest-args))))))))
543   (define (get-procs-to-run)
544     (define (ports->procs ports port-map)
545       (lambda (initial-procs)
546         (fold
547          (lambda (port prev)
548            (define proc (hashq-ref port-map port))
549            ;; Now that we've selected on this port, it can be removed
550            (hashq-remove! port-map port)
551            (cons proc prev))
552          initial-procs
553          ports)))
554     (match (do-select)
555       ((read-ports write-ports except-ports) ; except-ports not used
556        ((compose (ports->procs
557                   read-ports
558                   (agenda-read-port-map agenda))
559                  (ports->procs
560                   write-ports
561                   (agenda-write-port-map agenda)))
562         '()))))
563   (define (update-agenda)
564     (let ((procs-to-run (get-procs-to-run))
565           (q (agenda-queue agenda)))
566       (for-each
567        (lambda (proc)
568          (enq! q proc))
569        procs-to-run))
570     agenda)
571   (define (ports-to-select?)
572     (define (has-items? selector)
573       ;; @@: O(n)
574       ;;    ... we could use hash-for-each and a continuation to jump
575       ;;    out with a #t at first indication of an item
576       (not (= (hash-count (const #t)
577                           (selector agenda))
578               0)))
579     (or (has-items? agenda-read-port-map)
580         (has-items? agenda-write-port-map)))
581
582   (if (or (ports-to-select?)
583           ;; select doubles as sleep...
584           (not (schedule-empty? (agenda-schedule agenda)))) 
585       (update-agenda)
586       agenda))
587
588 (define-record-type <read-request>
589   (make-read-request port proc)
590   read-request?
591   (port read-request-port)
592   (proc read-request-proc))
593
594 (define-record-type <write-request>
595   (make-write-request port proc)
596   write-request?
597   (port write-request-port)
598   (proc write-request-proc))
599
600 (define (agenda-handle-read-request! agenda read-request)
601   "Handle <read-request>, which is a request to add this port to the poll/select
602 on suspendable ports."
603   (hashq-set! (agenda-read-port-map agenda)
604               (read-request-port read-request)
605               (read-request-proc read-request)))
606
607 (define (agenda-handle-write-request! agenda write-request)
608   (hashq-set! (agenda-write-port-map agenda)
609               (write-request-port write-request)
610               (write-request-proc write-request)))
611
612 (define (stop-on-nothing-to-do agenda)
613   (and (q-empty? (agenda-queue agenda))
614        (schedule-empty? (agenda-schedule agenda))
615        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
616        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
617
618
619 (define* (start-agenda agenda
620                        #:key
621                        ;; @@: Should we make stop-on-nothing-to-do
622                        ;;   the default stop-condition?
623                        (stop-condition stop-on-nothing-to-do)
624                        (get-time gettimeofday)
625                        (handle-ports update-agenda-from-select!)
626                        ;; For live hacking madness, etc
627                        (post-run-hook #f))
628   ;; TODO: Document fields
629   "Start up the AGENDA"
630   (install-suspendable-ports!)
631   (let loop ((agenda agenda))
632     (let ((agenda   
633            ;; @@: Hm, maybe here would be a great place to handle
634            ;;   select'ing on ports.
635            ;;   We could compose over agenda-run-once and agenda-read-ports
636            (agenda-run-once agenda)))
637       ;; @@: This relies on mutation at present on the queue, in the rare
638       ;;   event it's used.  If we ever switch to something more immutable,
639       ;;   it should return a new modified agenda instead.
640       (if post-run-hook
641           (post-run-hook agenda))
642       (if (and stop-condition (stop-condition agenda))
643           'done
644           (let* ((agenda
645                   ;; We have to update the time after ports handled, too
646                   ;; because it may have changed after a select
647                   (set-field
648                    (handle-ports
649                     ;; Adjust the agenda's time just in time
650                     ;; We do this here rather than in agenda-run-once to make
651                     ;; agenda-run-once's behavior fairly predictable
652                     (set-field agenda (agenda-time) (get-time)))
653                    (agenda-time) (get-time))))
654             ;; Update the agenda's current queue based on
655             ;; currently applicable time segments
656             (add-segments-contents-to-queue!
657              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
658              (agenda-queue agenda))
659             (loop agenda))))))
660
661
662 (define (print-error-and-continue key . args)
663   "Frequently used as pre-unwind-handler for agenda"
664   (cond
665    ((eq? key '8sync-caught-error)
666     (match args
667       ((orig-key orig-args stacks)
668        (display "\n*** Caught async exception. ***\n")
669        (format (current-error-port)
670                "* Original key '~s and arguments: ~s *\n"
671                orig-key orig-args)
672        (display "* Caught stacks below (ending with original) *\n\n")
673        (for-each
674         (lambda (s)
675           (display-backtrace s (current-error-port))
676           (newline (current-error-port)))
677         stacks))))
678    (else
679     (format (current-error-port)
680             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
681             key args)
682     (display-backtrace (make-stack #t 1 0)
683                        (current-error-port))
684     (newline (current-error-port)))))
685
686 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
687                                      body ...)
688   (if (or catch-handler pre-unwind-handler)
689       (catch
690         #t
691         (lambda ()
692           body ...)
693         (or catch-handler (lambda _ #f))
694         (or pre-unwind-handler (lambda _ #f)))
695       (begin body ...)))
696
697 (define (wait-for-readable port)
698   (8sync-abort-to-prompt
699    (make-async-request
700     (lambda (kont)
701       (make-read-request port (wrap (kont #f)))))))
702
703 (define (wait-for-writable port)
704   (8sync-abort-to-prompt
705    (make-async-request
706     (lambda (kont)
707       (make-write-request port (wrap (kont #f)))))))
708
709 (define (agenda-run-once agenda)
710   "Run once through the agenda, and produce a new agenda
711 based on the results"
712   (define (call-proc proc)
713     (call-with-prompt
714      (agenda-prompt-tag agenda)
715      (lambda ()
716        (parameterize ((%current-agenda agenda)
717                       ;; @@: Couldn't we just parameterize this at the start of
718                       ;;   the agenda...?
719                       (current-read-waiter wait-for-readable)
720                       (current-write-waiter wait-for-writable))
721          (maybe-catch-all
722           ((agenda-catch-handler agenda)
723            (agenda-pre-unwind-handler agenda))
724           (proc))))
725      (lambda (kont async-request)
726        (setup-async-request kont async-request))))
727
728   (let ((queue (agenda-queue agenda))
729         (next-queue (make-q)))
730     (while (not (q-empty? queue))
731       (let* ((proc (q-pop! queue))
732              (proc-result (call-proc proc))
733              (enqueue
734               (lambda (run-request)
735                 (define (schedule-at! time proc)
736                   (schedule-add! (agenda-schedule agenda) time proc))
737                 (let ((request-time (run-request-when run-request)))
738                   (match request-time
739                     ((? time-delta? time-delta)
740                      (let ((time (time-delta+ (agenda-time agenda)
741                                               time-delta)))
742                        (schedule-at! time (run-request-proc run-request))))
743                     ((? integer? sec)
744                      (let ((time (cons sec 0)))
745                        (schedule-at! time (run-request-proc run-request))))
746                     (((? integer? sec) . (? integer? usec))
747                      (schedule-at! request-time (run-request-proc run-request)))
748                     (#f
749                      (enq! next-queue (run-request-proc run-request))))))))
750         (define (handle-individual result)
751           ;; @@: Could maybe optimize by switching to an explicit cond...
752           (match result
753             ((? run-request? new-proc)
754              (enqueue new-proc))
755             ((? read-request? read-request)
756              (agenda-handle-read-request! agenda read-request))
757             ((? write-request? write-request)
758              (agenda-handle-write-request! agenda write-request))
759             ;; do nothing
760             ;; Remember, we don't throw an error here because procedures can
761             ;; return a run request, eg with run-it, at the end of their
762             ;; evaluation to keep looping.
763             ;; @@: Though is this really a useful feature?
764             (_ #f)))
765         ;; @@: We might support delay-wrapped procedures here
766         (match proc-result
767           ((results ...)
768            (for-each handle-individual results))
769           (one-result (handle-individual one-result)))))
770     ;; TODO: Alternately, we could return the next-queue
771     ;;   along with changes to be added to the schedule here?
772     ;; Return new agenda, with next queue set
773     (set-field agenda (agenda-queue) next-queue)))