agenda: Add yield procedure.
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:use-module (ice-9 suspendable-ports)
27   #:export (<agenda>
28             make-agenda agenda?
29             agenda-queue agenda-prompt-tag
30             agenda-read-port-map agenda-write-port-map
31             agenda-schedule
32             
33             make-async-prompt-tag
34
35             list->q make-q*
36
37             <time-segment>
38             make-time-segment time-segment?
39             time-segment-time time-segment-queue
40
41             time< time= time<= time-delta+
42             time-minus time-plus
43
44             <time-delta>
45             make-time-delta tdelta time-delta?
46             time-delta-sec time-delta-usec
47
48             <schedule>
49             make-schedule schedule?
50             schedule-add! schedule-empty?
51             schedule-segments
52             schedule-soonest-time
53
54             schedule-segments-split schedule-extract-until!
55             add-segments-contents-to-queue!
56
57             <run-request>
58             make-run-request run-request?
59             run-request-proc run-request-when
60
61             run-it wrap wrap-apply run run-at run-delay
62
63             8sync-delay
64             8sync-run 8sync-run-at 8sync-run-delay
65             8sync
66             8sleep
67             
68             ;; used for introspecting the error, but a method for making
69             ;; is not exposed
70             wrapped-exception?
71             wrapped-exception-key wrapped-exception-args
72             wrapped-exception-stacks
73
74             print-error-and-continue
75
76             stop-on-nothing-to-do
77
78             %current-agenda
79             start-agenda agenda-run-once))
80
81 (install-suspendable-ports!)
82
83 ;; @@: Using immutable agendas here, so wouldn't it make sense to
84 ;;   replace this queue stuff with using pfds based immutable queues?
85
86 \f
87 ;;; Agenda definition
88 ;;; =================
89
90 ;;; The agenda consists of:
91 ;;;  - a queue of immediate items to handle
92 ;;;  - sheduled future events to be added to a future queue
93 ;;;  - a tag by which running processes can escape for some asynchronous
94 ;;;    operation (from which they can be returned later)
95 ;;;  - a mapping of ports to various handler procedures
96 ;;;
97 ;;; The goal, eventually, is for this all to be immutable and functional.
98 ;;; However, we aren't there yet.  Some tricky things:
99 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
100 ;;;  - Need to use immutable queues (ijp's pfds library?)
101 ;;;  - Modeling reading from ports as something repeatable,
102 ;;;    and with reasonable separation from functional components?
103
104 (define-immutable-record-type <agenda>
105   (make-agenda-intern queue prompt-tag
106                       read-port-map write-port-map
107                       schedule time catch-handler pre-unwind-handler)
108   agenda?
109   (queue agenda-queue)
110   (prompt-tag agenda-prompt-tag)
111   (read-port-map agenda-read-port-map)
112   (write-port-map agenda-write-port-map)
113   (schedule agenda-schedule)
114   (time agenda-time)
115   (catch-handler agenda-catch-handler)
116   (pre-unwind-handler agenda-pre-unwind-handler))
117
118 (define (make-async-prompt-tag)
119   "Make an async prompt tag for an agenda.
120
121 Generally done automatically for the user through (make-agenda)."
122   (make-prompt-tag "prompt"))
123
124 (define* (make-agenda #:key
125                       (queue (make-q))
126                       (prompt (make-prompt-tag))
127                       (read-port-map (make-hash-table))
128                       (write-port-map (make-hash-table))
129                       (schedule (make-schedule))
130                       (time (gettimeofday))
131                       (catch-handler #f)
132                       (pre-unwind-handler print-error-and-continue))
133   ;; TODO: document arguments
134   "Make a fresh agenda."
135   (make-agenda-intern queue prompt
136                       read-port-map write-port-map
137                       schedule time
138                       catch-handler pre-unwind-handler))
139
140 (define (current-agenda-prompt)
141   "Get the prompt for the current agenda; signal an error if there isn't one."
142   (let ((current-agenda (%current-agenda)))
143     (if (not current-agenda)
144         (throw
145          'no-current-agenda
146          "Can't get current agenda prompt if there's no current agenda!")
147         (agenda-prompt-tag current-agenda))))
148
149 ;; helper for making queues for an agenda
150 (define (list->q lst)
151   "Makes a queue composed of LST items"
152   (let ((q (make-q)))
153     (for-each
154      (lambda (x)
155        (enq! q x))
156      lst)
157     q))
158
159 (define (make-q* . args)
160   "Makes a queue and populates it with this invocation's ARGS"
161   (list->q args))
162
163 \f
164 ;;; Schedule
165 ;;; ========
166
167 ;;; This is where we handle timed events for the future
168
169 ;; This section totally borrows from the ideas in SICP
170 ;; <3 <3 <3
171
172 ;; NOTE: time is a cons of (seconds . microseconds)
173
174 (define-record-type <time-segment>
175   (make-time-segment-intern time queue)
176   time-segment?
177   (time time-segment-time)
178   (queue time-segment-queue))
179
180 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
181 ;;   Maybe double check and switch to that?  (Thanks amz3!)
182
183 (define (time-from-float-or-fraction time)
184   "Produce a (sec . usec) pair from TIME, a float or fraction"
185   (let* ((mixed-whole (floor time))
186          (mixed-rest (- time mixed-whole))  ; float or fraction component
187          (sec mixed-whole)
188          (usec (floor (* 1000000 mixed-rest))))
189     (cons (inexact->exact sec) (inexact->exact usec))))
190
191 (define (time-segment-right-format time)
192   "Ensure TIME is in the right format.
193
194 The right format means (second . microsecond).
195 If an integer, will convert appropriately."
196   ;; TODO: add floating point / rational number support.
197   (match time
198     ;; time is already a cons of second and microsecnd
199     (((? integer? s) . (? integer? u)) time)
200     ;; time was just an integer (just the second)
201     ((? integer? _) (cons time 0))
202     ((or (? rational? _) (? inexact? _))
203      (time-from-float-or-fraction time))
204     (_ (throw 'invalid-time "Invalid time" time))))
205
206 (define* (make-time-segment time #:optional (queue (make-q)))
207   "Make a time segment of TIME and QUEUE
208
209 No automatic conversion is done, so you might have to
210 run (time-segment-right-format) first."
211   (make-time-segment-intern time queue))
212
213 (define (time< time1 time2)
214   "Check if TIME1 is less than TIME2"
215   (cond ((< (car time1)
216             (car time2))
217          #t)
218         ((and (= (car time1)
219                  (car time2))
220               (< (cdr time1)
221                  (cdr time2)))
222          #t)
223         (else #f)))
224
225 (define (time= time1 time2)
226   "Check whether TIME1 and TIME2 are equivalent"
227   (and (= (car time1) (car time2))
228        (= (cdr time1) (cdr time2))))
229
230 (define (time<= time1 time2)
231   "Check if TIME1 is less than or equal to TIME2"
232   (or (time< time1 time2)
233       (time= time1 time2)))
234
235
236 (define-record-type <time-delta>
237   (make-time-delta-intern sec usec)
238   time-delta?
239   (sec time-delta-sec)
240   (usec time-delta-usec))
241
242 (define* (make-time-delta time)
243   "Make a <time-delta> of SEC seconds and USEC microseconds.
244
245 This is used primarily so the agenda can recognize RUN-REQUEST objects
246 which are meant to delay computation"
247   (match (time-segment-right-format time)
248     ((sec . usec)
249      (make-time-delta-intern sec usec))))
250
251 (define tdelta make-time-delta)
252
253 (define (time-carry-correct time)
254   "Corrects/handles time microsecond carry.
255 Will produce (0 . 0) instead of a negative number, if needed."
256   (cond ((>= (cdr time) 1000000)
257          (cons
258           (+ (car time) 1)
259           (- (cdr time) 1000000)))
260         ((< (cdr time) 0)
261          (if (= (car time) 0)
262              '(0 0)
263              (cons
264               (- (car time) 1)
265               (+ (cdr time) 1000000))))
266         (else time)))
267
268 (define (time-delta+ time time-delta)
269   "Increment a TIME by the value of TIME-DELTA"
270   (time-carry-correct
271    (cons (+ (car time) (time-delta-sec time-delta))
272          (+ (cdr time) (time-delta-usec time-delta)))))
273
274 (define (time-minus time1 time2)
275   "Subtract TIME2 from TIME1"
276   (time-carry-correct
277    (cons (- (car time1) (car time2))
278          (- (cdr time1) (cdr time2)))))
279
280 (define (time-plus time1 time2)
281   "Add TIME1 and TIME2"
282   (time-carry-correct
283    (cons (+ (car time1) (car time2))
284          (+ (cdr time1) (cdr time2)))))
285
286
287 (define-record-type <schedule>
288   (make-schedule-intern segments)
289   schedule?
290   (segments schedule-segments set-schedule-segments!))
291
292 (define* (make-schedule #:optional segments)
293   "Make a schedule, optionally pre-composed of SEGMENTS"
294   (make-schedule-intern (or segments '())))
295
296 (define (schedule-soonest-time schedule)
297   "Return a cons of (sec . usec) for next time segement, or #f if none"
298   (let ((segments (schedule-segments schedule)))
299     (if (eq? segments '())
300         #f
301         (time-segment-time (car segments)))))
302
303 ;; TODO: This code is reasonably easy to read but it
304 ;;   mutates AND is worst case of O(n) in both space and time :(
305 ;;   but at least it'll be reasonably easy to refactor to
306 ;;   a more functional setup?
307 (define (schedule-add! schedule time proc)
308   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
309   (let ((time (time-segment-right-format time)))
310     (define (new-time-segment)
311       (let ((new-segment
312              (make-time-segment time)))
313         (enq! (time-segment-queue new-segment) proc)
314         new-segment))
315     (define (loop segments)
316       (define (segment-equals-time? segment)
317         (time= time (time-segment-time segment)))
318
319       (define (segment-more-than-time? segment)
320         (time< time (time-segment-time segment)))
321
322       ;; We could switch this out to be more mutate'y
323       ;; and avoid the O(n) of space... is that over-optimizing?
324       (match segments
325         ;; If we're at the end of the list, time to make a new
326         ;; segment...
327         ('() (cons (new-time-segment) '()))
328         ;; If the segment's time is exactly our time, good news
329         ;; everyone!  Let's append our stuff to its queue
330         (((? segment-equals-time? first) rest ...)
331          (enq! (time-segment-queue first) proc)
332          segments)
333         ;; If the first segment is more than our time,
334         ;; ours belongs before this one, so add it and
335         ;; start consing our way back
336         (((? segment-more-than-time? first) rest ...)
337          (cons (new-time-segment) segments))
338         ;; Otherwise, build up recursive result
339         ((first rest ... )
340          (cons first (loop rest)))))
341     (set-schedule-segments!
342      schedule
343      (loop (schedule-segments schedule)))))
344
345 (define (schedule-empty? schedule)
346   "Check if the SCHEDULE is currently empty"
347   (eq? (schedule-segments schedule) '()))
348
349 (define (schedule-segments-split schedule time)
350   "Does a multiple value return of time segments before/at and after TIME"
351   (let ((time (time-segment-right-format time)))
352     (define (segment-is-now? segment)
353       (time= (time-segment-time segment) time))
354     (define (segment-is-before-now? segment)
355       (time< (time-segment-time segment) time))
356
357     (let loop ((segments-before '())
358                (segments-left (schedule-segments schedule)))
359       (match segments-left
360         ;; end of the line, return
361         ('()
362          (values (reverse segments-before) '()))
363
364         ;; It's right now, so time to stop, but include this one in before
365         ;; but otherwise return
366         (((? segment-is-now? first) rest ...)
367          (values (reverse (cons first segments-before)) rest))
368
369         ;; This is prior or at now, so add it and keep going
370         (((? segment-is-before-now? first) rest ...)
371          (loop (cons first segments-before) rest))
372
373         ;; Otherwise it's past now, just return what we have
374         (segments-after
375          (values segments-before segments-after))))))
376
377 (define (schedule-extract-until! schedule time)
378   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
379   (receive (segments-before segments-after)
380       (schedule-segments-split schedule time)
381     (set-schedule-segments! schedule segments-after)
382     segments-before))
383
384 (define (add-segments-contents-to-queue! segments queue)
385   (for-each
386    (lambda (segment)
387      (let ((seg-queue (time-segment-queue segment)))
388        (while (not (q-empty? seg-queue))
389          (enq! queue (deq! seg-queue)))))
390    segments))
391
392
393 \f
394 ;;; Request to run stuff
395 ;;; ====================
396
397 (define-record-type <run-request>
398   (make-run-request proc when)
399   run-request?
400   (proc run-request-proc)
401   (when run-request-when))
402
403 (define* (run-it proc #:optional when)
404   "Make a request to run PROC (possibly at WHEN)"
405   (make-run-request proc when))
406
407 (define-syntax-rule (wrap body ...)
408   "Wrap contents in a procedure"
409   (lambda ()
410     body ...))
411
412 (define-syntax-rule (wrap-apply body)
413   "Wrap possibly multi-value function in a procedure, applies all arguments"
414   (lambda args
415     (apply body args)))
416
417
418 ;; @@: Do we really want `body ...' here?
419 ;;   what about just `body'?
420 (define-syntax-rule (run body ...)
421   "Run everything in BODY but wrap in a convenient procedure"
422   (make-run-request (wrap body ...) #f))
423
424 (define-syntax-rule (run-at body ... when)
425   "Run BODY at WHEN"
426   (make-run-request (wrap body ...) when))
427
428 ;; @@: Is it okay to overload the term "delay" like this?
429 ;;   Would `run-in' be better?
430 (define-syntax-rule (run-delay body ... delay-time)
431   "Run BODY at DELAY-TIME time from now"
432   (make-run-request (wrap body ...) (tdelta delay-time)))
433
434
435 \f
436 ;;; Asynchronous escape to run things
437 ;;; =================================
438
439 (define-syntax-rule (8sync-abort-to-prompt async-request)
440   (abort-to-prompt (current-agenda-prompt)
441                    async-request))
442
443 ;; Async port request and run-request meta-requests
444 (define (make-async-request proc)
445   "Wrap PROC in an async-request
446
447 The purpose of this is to make sure that users don't accidentally
448 return the wrong thing via (8sync) and trip themselves up."
449   (cons '*async-request* proc))
450
451 (define (setup-async-request resume-kont async-request)
452   "Complete an async request for agenda-run-once's continuation handling"
453   (match async-request
454     (('*async-request* . async-setup-proc)
455      (async-setup-proc resume-kont))
456     ;; TODO: deliver more helpful errors depending on what the user
457     ;;   returned
458     (_ (throw 'invalid-async-request
459               "Invalid request passed back via an (8sync) procedure."
460               async-request))))
461
462 (define-record-type <wrapped-exception>
463   (make-wrapped-exception key args stacks)
464   wrapped-exception?
465   (key wrapped-exception-key)
466   (args wrapped-exception-args)
467   (stacks wrapped-exception-stacks))
468
469 (define-syntax-rule (propagate-%async-exceptions body)
470   (let ((body-result body))
471     (if (wrapped-exception? body-result)
472         (throw '8sync-caught-error
473                (wrapped-exception-key body-result)
474                (wrapped-exception-args body-result)
475                (wrapped-exception-stacks body-result))
476         body-result)))
477
478 (define-syntax-rule (8sync-run body ...)
479   (8sync-run-at body ... #f))
480
481 (define-syntax-rule (8sync-run-at body ... when)
482   (propagate-%async-exceptions
483    (8sync-abort-to-prompt
484     ;; Send an asynchronous request to apply a continuation to the
485     ;; following function, then handle that as a request to the agenda
486     (make-async-request
487      (lambda (kont)
488        ;; We're making a run request
489        (make-run-request
490         ;; Wrapping the following execution to run...
491         (wrap
492          ;; Once we get the result from the inner part, we'll resume
493          ;; this continuation, but first
494          ;; @@: Is this running immediately, or queueing the result
495          ;;   after evaluation for the next agenda tick?  It looks
496          ;;   like evaluating immediately.  Is that what we want?
497          (kont
498           ;; Any unhandled errors are caught
499           (let ((exception-stack #f))
500             (catch #t
501               ;; Run the actual code the user requested
502               (lambda ()
503                 body ...)
504               ;; If something bad happened and we didn't catch it,
505               ;; we'll wrap it up in such a way that the continuation
506               ;; can address it
507               (lambda (key . args)
508                 (cond
509                  ((eq? key '8sync-caught-error)
510                   (match args
511                     ((orig-key orig-args orig-stacks)
512                      (make-wrapped-exception
513                       orig-key orig-args
514                       (cons exception-stack orig-stacks)))))
515                  (else
516                   (make-wrapped-exception key args
517                                           (list exception-stack)))))
518               (lambda _
519                 (set! exception-stack (make-stack #t 1 0)))))))
520         when))))))
521
522 (define-syntax-rule (8sync-run-delay body ... delay-time)
523   (8sync-run-at body ... (tdelta delay-time)))
524
525 (define-syntax-rule (8sync-delay args ...)
526   (8sync-run-delay args ...))
527
528 ;; TODO: Write (%run-immediately)
529
530 (define-syntax-rule (8sync body)
531   "Run body asynchronously but ignore its result...
532 forge ahead in our current function!"
533   (8sync-abort-to-prompt
534    (make-async-request
535     (lambda (kont)
536       (list (make-run-request
537              ;; What's with returning #f to kont?
538              ;; Otherwise we sometimes get errors like
539              ;; "Zero values returned to single-valued continuation""
540              (wrap (kont #f)) #f)
541             (make-run-request (lambda () body) #f))))))
542
543 ;; This is sugar... and could probably be considerably
544 ;; simplified and optimized.  But whatever.
545 (define-syntax-rule (8sleep time)
546   (8sync-delay 'no-op time))
547
548 ;; Voluntarily yield execution
549 (define (yield)  ; @@: should this be define-inlinable?
550   "Voluntarily yield execution to the scheduler."
551   (8sync-abort-to-prompt
552    (make-async-request
553     (lambda (kont)
554       (make-run-request (lambda () (kont #f)) #f)))))
555
556 \f
557 ;;; Execution of agenda, and current agenda
558 ;;; =======================================
559
560 (define %current-agenda (make-parameter #f))
561
562 (define (update-agenda-from-select! agenda)
563   "Potentially (select) on ports specified in agenda, adding items to queue.
564
565 Also handles sleeping when all we have to do is wait on the schedule."
566   (define (hash-keys hash)
567     (hash-map->list (lambda (k v) k) hash))
568   (define (get-wait-time)
569     ;; TODO: we need to figure this out based on whether there's anything
570     ;;   in the queue, and if not, how long till the next scheduled item
571     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
572       (cond 
573        ((not (q-empty? (agenda-queue agenda)))
574         (cons 0 0))
575        (soonest-time    ; ie, the agenda is non-empty
576         (let* ((current-time (agenda-time agenda)))
577           (if (time<= soonest-time current-time)
578               ;; Well there's something due so let's select
579               ;; (this avoids a (possible?) race condition chance)
580               (cons 0 0)
581               (time-minus soonest-time current-time))))
582        (else
583         (cons #f #f)))))
584   (define (do-select)
585     ;; TODO: support usecond wait time too
586     (match (get-wait-time)
587       ((sec . usec)
588        (catch 'system-error
589          (lambda ()
590            (select (hash-keys (agenda-read-port-map agenda))
591                    (hash-keys (agenda-write-port-map agenda))
592                    '()
593                    sec usec))
594          (lambda (key . rest-args)
595            (match rest-args
596              ((_ _ _ (EINTR))
597               '(() () ()))
598              (_ (error "Unhandled error in select!" key rest-args))))))))
599   (define (get-procs-to-run)
600     (define (ports->procs ports port-map)
601       (lambda (initial-procs)
602         (fold
603          (lambda (port prev)
604            (define proc (hashq-ref port-map port))
605            ;; Now that we've selected on this port, it can be removed
606            (hashq-remove! port-map port)
607            (cons proc prev))
608          initial-procs
609          ports)))
610     (match (do-select)
611       ((read-ports write-ports except-ports) ; except-ports not used
612        ((compose (ports->procs
613                   read-ports
614                   (agenda-read-port-map agenda))
615                  (ports->procs
616                   write-ports
617                   (agenda-write-port-map agenda)))
618         '()))))
619   (define (update-agenda)
620     (let ((procs-to-run (get-procs-to-run))
621           (q (agenda-queue agenda)))
622       (for-each
623        (lambda (proc)
624          (enq! q proc))
625        procs-to-run))
626     agenda)
627   (define (ports-to-select?)
628     (define (has-items? selector)
629       ;; @@: O(n)
630       ;;    ... we could use hash-for-each and a continuation to jump
631       ;;    out with a #t at first indication of an item
632       (not (= (hash-count (const #t)
633                           (selector agenda))
634               0)))
635     (or (has-items? agenda-read-port-map)
636         (has-items? agenda-write-port-map)))
637
638   (if (or (ports-to-select?)
639           ;; select doubles as sleep...
640           (not (schedule-empty? (agenda-schedule agenda)))) 
641       (update-agenda)
642       agenda))
643
644 (define-record-type <read-request>
645   (make-read-request port proc)
646   read-request?
647   (port read-request-port)
648   (proc read-request-proc))
649
650 (define-record-type <write-request>
651   (make-write-request port proc)
652   write-request?
653   (port write-request-port)
654   (proc write-request-proc))
655
656 (define (agenda-handle-read-request! agenda read-request)
657   "Handle <read-request>, which is a request to add this port to the poll/select
658 on suspendable ports."
659   (hashq-set! (agenda-read-port-map agenda)
660               (read-request-port read-request)
661               (read-request-proc read-request)))
662
663 (define (agenda-handle-write-request! agenda write-request)
664   (hashq-set! (agenda-write-port-map agenda)
665               (write-request-port write-request)
666               (write-request-proc write-request)))
667
668 (define (stop-on-nothing-to-do agenda)
669   (and (q-empty? (agenda-queue agenda))
670        (schedule-empty? (agenda-schedule agenda))
671        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
672        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
673
674
675 (define* (start-agenda agenda
676                        #:key
677                        ;; @@: Should we make stop-on-nothing-to-do
678                        ;;   the default stop-condition?
679                        (stop-condition stop-on-nothing-to-do)
680                        (get-time gettimeofday)
681                        (handle-ports update-agenda-from-select!)
682                        ;; For live hacking madness, etc
683                        (post-run-hook #f))
684   ;; TODO: Document fields
685   "Start up the AGENDA"
686   (let loop ((agenda agenda))
687     (let ((agenda   
688            ;; @@: Hm, maybe here would be a great place to handle
689            ;;   select'ing on ports.
690            ;;   We could compose over agenda-run-once and agenda-read-ports
691            (agenda-run-once agenda)))
692       ;; @@: This relies on mutation at present on the queue, in the rare
693       ;;   event it's used.  If we ever switch to something more immutable,
694       ;;   it should return a new modified agenda instead.
695       (if post-run-hook
696           (post-run-hook agenda))
697       (if (and stop-condition (stop-condition agenda))
698           'done
699           (let* ((agenda
700                   ;; We have to update the time after ports handled, too
701                   ;; because it may have changed after a select
702                   (set-field
703                    (handle-ports
704                     ;; Adjust the agenda's time just in time
705                     ;; We do this here rather than in agenda-run-once to make
706                     ;; agenda-run-once's behavior fairly predictable
707                     (set-field agenda (agenda-time) (get-time)))
708                    (agenda-time) (get-time))))
709             ;; Update the agenda's current queue based on
710             ;; currently applicable time segments
711             (add-segments-contents-to-queue!
712              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
713              (agenda-queue agenda))
714             (loop agenda))))))
715
716
717 (define (print-error-and-continue key . args)
718   "Frequently used as pre-unwind-handler for agenda"
719   (cond
720    ((eq? key '8sync-caught-error)
721     (match args
722       ((orig-key orig-args stacks)
723        (display "\n*** Caught async exception. ***\n")
724        (format (current-error-port)
725                "* Original key '~s and arguments: ~s *\n"
726                orig-key orig-args)
727        (display "* Caught stacks below (ending with original) *\n\n")
728        (for-each
729         (lambda (s)
730           (display-backtrace s (current-error-port))
731           (newline (current-error-port)))
732         stacks))))
733    (else
734     (format (current-error-port)
735             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
736             key args)
737     (display-backtrace (make-stack #t 1 0)
738                        (current-error-port))
739     (newline (current-error-port)))))
740
741 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
742                                      body ...)
743   (if (or catch-handler pre-unwind-handler)
744       (catch
745         #t
746         (lambda ()
747           body ...)
748         (or catch-handler (lambda _ #f))
749         (or pre-unwind-handler (lambda _ #f)))
750       (begin body ...)))
751
752 (define (wait-for-readable port)
753   (8sync-abort-to-prompt
754    (make-async-request
755     (lambda (kont)
756       (make-read-request port (wrap (kont #f)))))))
757
758 (define (wait-for-writable port)
759   (8sync-abort-to-prompt
760    (make-async-request
761     (lambda (kont)
762       (make-write-request port (wrap (kont #f)))))))
763
764 (define (agenda-run-once agenda)
765   "Run once through the agenda, and produce a new agenda
766 based on the results"
767   (define (call-proc proc)
768     (call-with-prompt
769      (agenda-prompt-tag agenda)
770      (lambda ()
771        (parameterize ((%current-agenda agenda)
772                       ;; @@: Couldn't we just parameterize this at the start of
773                       ;;   the agenda...?
774                       (current-read-waiter wait-for-readable)
775                       (current-write-waiter wait-for-writable))
776          (maybe-catch-all
777           ((agenda-catch-handler agenda)
778            (agenda-pre-unwind-handler agenda))
779           (proc))))
780      (lambda (kont async-request)
781        (setup-async-request kont async-request))))
782
783   (let ((queue (agenda-queue agenda))
784         (next-queue (make-q)))
785     (while (not (q-empty? queue))
786       (let* ((proc (q-pop! queue))
787              (proc-result (call-proc proc))
788              (enqueue
789               (lambda (run-request)
790                 (define (schedule-at! time proc)
791                   (schedule-add! (agenda-schedule agenda) time proc))
792                 (let ((request-time (run-request-when run-request)))
793                   (match request-time
794                     ((? time-delta? time-delta)
795                      (let ((time (time-delta+ (agenda-time agenda)
796                                               time-delta)))
797                        (schedule-at! time (run-request-proc run-request))))
798                     ((? integer? sec)
799                      (let ((time (cons sec 0)))
800                        (schedule-at! time (run-request-proc run-request))))
801                     (((? integer? sec) . (? integer? usec))
802                      (schedule-at! request-time (run-request-proc run-request)))
803                     (#f
804                      (enq! next-queue (run-request-proc run-request))))))))
805         (define (handle-individual result)
806           ;; @@: Could maybe optimize by switching to an explicit cond...
807           (match result
808             ((? run-request? new-proc)
809              (enqueue new-proc))
810             ((? read-request? read-request)
811              (agenda-handle-read-request! agenda read-request))
812             ((? write-request? write-request)
813              (agenda-handle-write-request! agenda write-request))
814             ;; do nothing
815             ;; Remember, we don't throw an error here because procedures can
816             ;; return a run request, eg with run-it, at the end of their
817             ;; evaluation to keep looping.
818             ;; @@: Though is this really a useful feature?
819             (_ #f)))
820         ;; @@: We might support delay-wrapped procedures here
821         (match proc-result
822           ((results ...)
823            (for-each handle-individual results))
824           (one-result (handle-individual one-result)))))
825     ;; TODO: Alternately, we could return the next-queue
826     ;;   along with changes to be added to the schedule here?
827     ;; Return new agenda, with next queue set
828     (set-field agenda (agenda-queue) next-queue)))