c417a40e28fdad25cf811d796a92381d4efa71e8
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map agenda-except-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <time-segment>
37             make-time-segment time-segment?
38             time-segment-time time-segment-queue
39
40             time< time= time<= time-delta+
41             time-minus time-plus
42
43             <time-delta>
44             make-time-delta tdelta time-delta?
45             time-delta-sec time-delta-usec
46
47             <schedule>
48             make-schedule schedule?
49             schedule-add! schedule-empty?
50             schedule-segments
51             schedule-soonest-time
52
53             schedule-segments-split schedule-extract-until!
54             add-segments-contents-to-queue!
55
56             <run-request>
57             make-run-request run-request?
58             run-request-proc run-request-when
59
60             <port-request>
61             make-port-request port-request port-request?
62             port-request-port
63             port-request-read port-request-write port-request-except
64
65             <port-remove-request>
66             make-port-remove-request port-remove-request port-remove-request?
67             port-remove-request-port
68
69             run-it wrap wrap-apply run run-at run-delay
70
71             8sync 8sync-delay
72             8sync-run 8sync-run-at 8sync-run-delay
73             8sync-port 8sync-port-remove
74             8sync-nowait
75             
76             catch-8sync
77
78             ;; used for introspecting the error, but a method for making
79             ;; is not exposed
80             wrapped-exception?
81             wrapped-exception-key wrapped-exception-args
82             wrapped-exception-stacks
83
84             print-error-and-continue
85
86             stop-on-nothing-to-do
87
88             %current-agenda
89             start-agenda agenda-run-once))
90
91 ;; @@: Using immutable agendas here, so wouldn't it make sense to
92 ;;   replace this queue stuff with using pfds based immutable queues?
93
94 \f
95 ;;; Agenda definition
96 ;;; =================
97
98 ;;; The agenda consists of:
99 ;;;  - a queue of immediate items to handle
100 ;;;  - sheduled future events to be added to a future queue
101 ;;;  - a tag by which running processes can escape for some asynchronous
102 ;;;    operation (from which they can be returned later)
103 ;;;  - a mapping of ports to various handler procedures
104 ;;;
105 ;;; The goal, eventually, is for this all to be immutable and functional.
106 ;;; However, we aren't there yet.  Some tricky things:
107 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
108 ;;;  - Need to use immutable queues (ijp's pfds library?)
109 ;;;  - Modeling reading from ports as something repeatable,
110 ;;;    and with reasonable separation from functional components?
111
112 (define-immutable-record-type <agenda>
113   (make-agenda-intern queue prompt-tag
114                       read-port-map write-port-map except-port-map
115                       schedule time catch-handler pre-unwind-handler)
116   agenda?
117   (queue agenda-queue)
118   (prompt-tag agenda-prompt-tag)
119   (read-port-map agenda-read-port-map)
120   (write-port-map agenda-write-port-map)
121   (except-port-map agenda-except-port-map)
122   (schedule agenda-schedule)
123   (time agenda-time)
124   (catch-handler agenda-catch-handler)
125   (pre-unwind-handler agenda-pre-unwind-handler))
126
127 (define (make-async-prompt-tag)
128   "Make an async prompt tag for an agenda.
129
130 Generally done automatically for the user through (make-agenda)."
131   (make-prompt-tag "prompt"))
132
133 (define* (make-agenda #:key
134                       (queue (make-q))
135                       (prompt (make-prompt-tag))
136                       (read-port-map (make-hash-table))
137                       (write-port-map (make-hash-table))
138                       (except-port-map (make-hash-table))
139                       (schedule (make-schedule))
140                       (time (gettimeofday))
141                       (catch-handler #f)
142                       (pre-unwind-handler print-error-and-continue))
143   ;; TODO: document arguments
144   "Make a fresh agenda."
145   (make-agenda-intern queue prompt
146                       read-port-map write-port-map except-port-map
147                       schedule time
148                       catch-handler pre-unwind-handler))
149
150 (define (current-agenda-prompt)
151   "Get the prompt for the current agenda; signal an error if there isn't one."
152   (let ((current-agenda (%current-agenda)))
153     (if (not current-agenda)
154         (throw
155          'no-current-agenda
156          "Can't get current agenda prompt if there's no current agenda!")
157         (agenda-prompt-tag current-agenda))))
158
159 ;; helper for making queues for an agenda
160 (define (list->q lst)
161   "Makes a queue composed of LST items"
162   (let ((q (make-q)))
163     (for-each
164      (lambda (x)
165        (enq! q x))
166      lst)
167     q))
168
169 (define (make-q* . args)
170   "Makes a queue and populates it with this invocation's ARGS"
171   (list->q args))
172
173 \f
174 ;;; Schedule
175 ;;; ========
176
177 ;;; This is where we handle timed events for the future
178
179 ;; This section totally borrows from the ideas in SICP
180 ;; <3 <3 <3
181
182 ;; NOTE: time is a cons of (seconds . microseconds)
183
184 (define-record-type <time-segment>
185   (make-time-segment-intern time queue)
186   time-segment?
187   (time time-segment-time)
188   (queue time-segment-queue))
189
190 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
191 ;;   Maybe double check and switch to that?  (Thanks amz3!)
192
193 (define (time-from-float-or-fraction time)
194   "Produce a (sec . usec) pair from TIME, a float or fraction"
195   (let* ((mixed-whole (floor time))
196          (mixed-rest (- time mixed-whole))  ; float or fraction component
197          (sec mixed-whole)
198          (usec (floor (* 1000000 mixed-rest))))
199     (cons (inexact->exact sec) (inexact->exact usec))))
200
201 (define (time-segment-right-format time)
202   "Ensure TIME is in the right format.
203
204 The right format means (second . microsecond).
205 If an integer, will convert appropriately."
206   ;; TODO: add floating point / rational number support.
207   (match time
208     ;; time is already a cons of second and microsecnd
209     (((? integer? s) . (? integer? u)) time)
210     ;; time was just an integer (just the second)
211     ((? integer? _) (cons time 0))
212     ((or (? rational? _) (? inexact? _))
213      (time-from-float-or-fraction time))
214     (_ (throw 'invalid-time "Invalid time" time))))
215
216 (define* (make-time-segment time #:optional (queue (make-q)))
217   "Make a time segment of TIME and QUEUE
218
219 No automatic conversion is done, so you might have to
220 run (time-segment-right-format) first."
221   (make-time-segment-intern time queue))
222
223 (define (time< time1 time2)
224   "Check if TIME1 is less than TIME2"
225   (cond ((< (car time1)
226             (car time2))
227          #t)
228         ((and (= (car time1)
229                  (car time2))
230               (< (cdr time1)
231                  (cdr time2)))
232          #t)
233         (else #f)))
234
235 (define (time= time1 time2)
236   "Check whether TIME1 and TIME2 are equivalent"
237   (and (= (car time1) (car time2))
238        (= (cdr time1) (cdr time2))))
239
240 (define (time<= time1 time2)
241   "Check if TIME1 is less than or equal to TIME2"
242   (or (time< time1 time2)
243       (time= time1 time2)))
244
245
246 (define-record-type <time-delta>
247   (make-time-delta-intern sec usec)
248   time-delta?
249   (sec time-delta-sec)
250   (usec time-delta-usec))
251
252 (define* (make-time-delta time)
253   "Make a <time-delta> of SEC seconds and USEC microseconds.
254
255 This is used primarily so the agenda can recognize RUN-REQUEST objects
256 which are meant to delay computation"
257   (match (time-segment-right-format time)
258     ((sec . usec)
259      (make-time-delta-intern sec usec))))
260
261 (define tdelta make-time-delta)
262
263 (define (time-carry-correct time)
264   "Corrects/handles time microsecond carry.
265 Will produce (0 . 0) instead of a negative number, if needed."
266   (cond ((>= (cdr time) 1000000)
267          (cons
268           (+ (car time) 1)
269           (- (cdr time) 1000000)))
270         ((< (cdr time) 0)
271          (if (= (car time) 0)
272              '(0 0)
273              (cons
274               (- (car time) 1)
275               (+ (cdr time) 1000000))))
276         (else time)))
277
278 (define (time-delta+ time time-delta)
279   "Increment a TIME by the value of TIME-DELTA"
280   (time-carry-correct
281    (cons (+ (car time) (time-delta-sec time-delta))
282          (+ (cdr time) (time-delta-usec time-delta)))))
283
284 (define (time-minus time1 time2)
285   "Subtract TIME2 from TIME1"
286   (time-carry-correct
287    (cons (- (car time1) (car time2))
288          (- (cdr time1) (cdr time2)))))
289
290 (define (time-plus time1 time2)
291   "Add TIME1 and TIME2"
292   (time-carry-correct
293    (cons (+ (car time1) (car time2))
294          (+ (cdr time1) (cdr time2)))))
295
296
297 (define-record-type <schedule>
298   (make-schedule-intern segments)
299   schedule?
300   (segments schedule-segments set-schedule-segments!))
301
302 (define* (make-schedule #:optional segments)
303   "Make a schedule, optionally pre-composed of SEGMENTS"
304   (make-schedule-intern (or segments '())))
305
306 (define (schedule-soonest-time schedule)
307   "Return a cons of (sec . usec) for next time segement, or #f if none"
308   (let ((segments (schedule-segments schedule)))
309     (if (eq? segments '())
310         #f
311         (time-segment-time (car segments)))))
312
313 ;; TODO: This code is reasonably easy to read but it
314 ;;   mutates AND is worst case of O(n) in both space and time :(
315 ;;   but at least it'll be reasonably easy to refactor to
316 ;;   a more functional setup?
317 (define (schedule-add! schedule time proc)
318   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
319   (let ((time (time-segment-right-format time)))
320     (define (new-time-segment)
321       (let ((new-segment
322              (make-time-segment time)))
323         (enq! (time-segment-queue new-segment) proc)
324         new-segment))
325     (define (loop segments)
326       (define (segment-equals-time? segment)
327         (time= time (time-segment-time segment)))
328
329       (define (segment-more-than-time? segment)
330         (time< time (time-segment-time segment)))
331
332       ;; We could switch this out to be more mutate'y
333       ;; and avoid the O(n) of space... is that over-optimizing?
334       (match segments
335         ;; If we're at the end of the list, time to make a new
336         ;; segment...
337         ('() (cons (new-time-segment) '()))
338         ;; If the segment's time is exactly our time, good news
339         ;; everyone!  Let's append our stuff to its queue
340         (((? segment-equals-time? first) rest ...)
341          (enq! (time-segment-queue first) proc)
342          segments)
343         ;; If the first segment is more than our time,
344         ;; ours belongs before this one, so add it and
345         ;; start consing our way back
346         (((? segment-more-than-time? first) rest ...)
347          (cons (new-time-segment) segments))
348         ;; Otherwise, build up recursive result
349         ((first rest ... )
350          (cons first (loop rest)))))
351     (set-schedule-segments!
352      schedule
353      (loop (schedule-segments schedule)))))
354
355 (define (schedule-empty? schedule)
356   "Check if the SCHEDULE is currently empty"
357   (eq? (schedule-segments schedule) '()))
358
359 (define (schedule-segments-split schedule time)
360   "Does a multiple value return of time segments before/at and after TIME"
361   (let ((time (time-segment-right-format time)))
362     (define (segment-is-now? segment)
363       (time= (time-segment-time segment) time))
364     (define (segment-is-before-now? segment)
365       (time< (time-segment-time segment) time))
366
367     (let loop ((segments-before '())
368                (segments-left (schedule-segments schedule)))
369       (match segments-left
370         ;; end of the line, return
371         ('()
372          (values (reverse segments-before) '()))
373
374         ;; It's right now, so time to stop, but include this one in before
375         ;; but otherwise return
376         (((? segment-is-now? first) rest ...)
377          (values (reverse (cons first segments-before)) rest))
378
379         ;; This is prior or at now, so add it and keep going
380         (((? segment-is-before-now? first) rest ...)
381          (loop (cons first segments-before) rest))
382
383         ;; Otherwise it's past now, just return what we have
384         (segments-after
385          (values segments-before segments-after))))))
386
387 (define (schedule-extract-until! schedule time)
388   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
389   (receive (segments-before segments-after)
390       (schedule-segments-split schedule time)
391     (set-schedule-segments! schedule segments-after)
392     segments-before))
393
394 (define (add-segments-contents-to-queue! segments queue)
395   (for-each
396    (lambda (segment)
397      (let ((seg-queue (time-segment-queue segment)))
398        (while (not (q-empty? seg-queue))
399          (enq! queue (deq! seg-queue)))))
400    segments))
401
402
403 \f
404 ;;; Request to run stuff
405 ;;; ====================
406
407 (define-record-type <run-request>
408   (make-run-request proc when)
409   run-request?
410   (proc run-request-proc)
411   (when run-request-when))
412
413 (define* (run-it proc #:optional when)
414   "Make a request to run PROC (possibly at WHEN)"
415   (make-run-request proc when))
416
417 (define-syntax-rule (wrap body ...)
418   "Wrap contents in a procedure"
419   (lambda ()
420     body ...))
421
422 (define-syntax-rule (wrap-apply body)
423   "Wrap possibly multi-value function in a procedure, applies all arguments"
424   (lambda args
425     (apply body args)))
426
427
428 ;; @@: Do we really want `body ...' here?
429 ;;   what about just `body'?
430 (define-syntax-rule (run body ...)
431   "Run everything in BODY but wrap in a convenient procedure"
432   (make-run-request (wrap body ...) #f))
433
434 (define-syntax-rule (run-at body ... when)
435   "Run BODY at WHEN"
436   (make-run-request (wrap body ...) when))
437
438 ;; @@: Is it okay to overload the term "delay" like this?
439 ;;   Would `run-in' be better?
440 (define-syntax-rule (run-delay body ... delay-time)
441   "Run BODY at DELAY-TIME time from now"
442   (make-run-request (wrap body ...) (tdelta delay-time)))
443
444
445 ;; A request to set up a port with at least one of read, write, except
446 ;; handling processes
447
448 (define-record-type <port-request>
449   (make-port-request-intern port read write except)
450   port-request?
451   (port port-request-port)
452   (read port-request-read)
453   (write port-request-write)
454   (except port-request-except))
455
456 (define* (make-port-request port #:key read write except)
457   (if (not (or read write except))
458       (throw 'no-port-handler-given "No port handler given.\n"))
459   (make-port-request-intern port read write except))
460
461 (define port-request make-port-request)
462
463 (define-record-type <port-remove-request>
464   (make-port-remove-request port)
465   port-remove-request?
466   (port port-remove-request-port))
467
468 (define port-remove-request make-port-remove-request)
469
470
471 \f
472 ;;; Asynchronous escape to run things
473 ;;; =================================
474
475 (define-syntax-rule (8sync-abort-to-prompt async-request)
476   (abort-to-prompt (current-agenda-prompt)
477                    async-request))
478
479 ;; Async port request and run-request meta-requests
480 (define (make-async-request proc)
481   "Wrap PROC in an async-request
482
483 The purpose of this is to make sure that users don't accidentally
484 return the wrong thing via (8sync) and trip themselves up."
485   (cons '*async-request* proc))
486
487 (define (setup-async-request resume-kont async-request)
488   "Complete an async request for agenda-run-once's continuation handling"
489   (match async-request
490     (('*async-request* . async-setup-proc)
491      (async-setup-proc resume-kont))
492     ;; TODO: deliver more helpful errors depending on what the user
493     ;;   returned
494     (_ (throw 'invalid-async-request
495               "Invalid request passed back via an (8sync) procedure."
496               async-request))))
497
498 (define-record-type <wrapped-exception>
499   (make-wrapped-exception key args stacks)
500   wrapped-exception?
501   (key wrapped-exception-key)
502   (args wrapped-exception-args)
503   (stacks wrapped-exception-stacks))
504
505 (define-syntax-rule (propagate-%async-exceptions body)
506   (let ((body-result body))
507     (if (wrapped-exception? body-result)
508         (throw '8sync-caught-error
509                (wrapped-exception-key body-result)
510                (wrapped-exception-args body-result)
511                (wrapped-exception-stacks body-result))
512         body-result)))
513
514 (define-syntax 8sync
515   (syntax-rules ()
516     "Run BODY asynchronously (8synchronously?) at a prompt, then return.
517
518 Possibly specify WHEN as the second argument."
519     ((8sync body)
520      (8sync-run body))
521     ((8sync body when)
522      (8sync-run-at body when))))
523
524 (define-syntax-rule (8sync-run body ...)
525   (8sync-run-at body ... #f))
526
527 (define-syntax-rule (8sync-run-at body ... when)
528   (propagate-%async-exceptions
529    (8sync-abort-to-prompt
530     ;; Send an asynchronous request to apply a continuation to the
531     ;; following function, then handle that as a request to the agenda
532     (make-async-request
533      (lambda (kont)
534        ;; We're making a run request
535        (make-run-request
536         ;; Wrapping the following execution to run...
537         (wrap
538          ;; Once we get the result from the inner part, we'll resume
539          ;; this continuation, but first
540          ;; @@: Is this running immediately, or queueing the result
541          ;;   after evaluation for the next agenda tick?  It looks
542          ;;   like evaluating immediately.  Is that what we want?
543          (kont
544           ;; Any unhandled errors are caught
545           (let ((exception-stack #f))
546             (catch #t
547               ;; Run the actual code the user requested
548               (lambda ()
549                 body ...)
550               ;; If something bad happened and we didn't catch it,
551               ;; we'll wrap it up in such a way that the continuation
552               ;; can address it
553               (lambda (key . args)
554                 (cond
555                  ((eq? key '8sync-caught-error)
556                   (match args
557                     ((orig-key orig-args orig-stacks)
558                      (make-wrapped-exception
559                       orig-key orig-args
560                       (cons exception-stack orig-stacks)))))
561                  (else
562                   (make-wrapped-exception key args
563                                           (list exception-stack)))))
564               (lambda _
565                 (set! exception-stack (make-stack #t 1 0)))))))
566         when))))))
567
568 (define-syntax-rule (8sync-run-delay body ... delay-time)
569   (8sync-run-at body ... (tdelta delay-time)))
570
571 (define-syntax-rule (8sync-delay args ...)
572   (8sync-run-delay args ...))
573
574 (define-syntax-rule (8sync-port port port-request-args ...)
575   (8sync-abort-to-prompt
576    (make-async-request
577     (lambda (kont)
578       (list (make-port-request port port-request-args ...)
579             (make-run-request
580              ;; What's with returning #f to kont?
581              ;; Otherwise we sometimes get errors like
582              ;; "Zero values returned to single-valued continuation""
583              (wrap (kont #f)) #f))))))
584
585 (define-syntax-rule (8sync-port-remove port)
586   (8sync-abort-to-prompt
587    (make-async-request
588     (lambda (kont)
589       (list (make-port-remove-request port)
590             (make-run-request
591              ;; See comment in 8sync-port
592              (wrap (kont #f)) #f))))))
593
594
595 ;; TODO: Write (%run-immediately)
596
597 (define-syntax-rule (8sync-nowait body)
598   "Run body asynchronously but ignore its result...
599 forge ahead in our current function!"
600   (8sync-abort-to-prompt
601    (make-async-request
602     (lambda (kont)
603       (list (make-run-request
604              ;; See comment in 8sync-port
605              (wrap (kont #f)) #f)
606             (make-run-request (lambda () body) #f))))))
607
608 (define-syntax-rule (catch-8sync exp (handler-key handler) ...)
609   (catch '8sync-caught-error
610     (lambda ()
611       exp)
612     (lambda (_ orig-key orig-args orig-stacks)
613       (cond
614        ((or (eq? handler-key #t)
615             (eq? orig-key handler-key))
616         (apply handler orig-stacks orig-args)) ...
617        (else (raise '8sync-caught-error
618                     orig-key orig-args orig-stacks))))))
619
620
621 \f
622 ;;; Execution of agenda, and current agenda
623 ;;; =======================================
624
625 (define %current-agenda (make-parameter #f))
626
627 (define (update-agenda-from-select! agenda)
628   "Potentially (select) on ports specified in agenda, adding items to queue.
629
630 Also handles sleeping when all we have to do is wait on the schedule."
631   (define (hash-keys hash)
632     (hash-map->list (lambda (k v) k) hash))
633   (define (get-wait-time)
634     ;; TODO: we need to figure this out based on whether there's anything
635     ;;   in the queue, and if not, how long till the next scheduled item
636     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
637       (cond 
638        ((not (q-empty? (agenda-queue agenda)))
639         (cons 0 0))
640        (soonest-time    ; ie, the agenda is non-empty
641         (let* ((current-time (agenda-time agenda)))
642           (if (time<= soonest-time current-time)
643               ;; Well there's something due so let's select
644               ;; (this avoids a (possible?) race condition chance)
645               (cons 0 0)
646               (time-minus soonest-time current-time))))
647        (else
648         (cons #f #f)))))
649   (define (do-select)
650     ;; TODO: support usecond wait time too
651     (match (get-wait-time)
652       ((sec . usec)
653        (catch 'system-error
654          (lambda ()
655            (select (hash-keys (agenda-read-port-map agenda))
656                    (hash-keys (agenda-write-port-map agenda))
657                    (hash-keys (agenda-except-port-map agenda))
658                    sec usec))
659          (lambda (key . rest-args)
660            (match rest-args
661              ((_ _ _ (EINTR))
662               '(() () ()))
663              (_ (error "Unhandled error in select!" key rest-args))))))))
664   (define (get-procs-to-run)
665     (define (ports->procs ports port-map)
666       (lambda (initial-procs)
667         (fold
668          (lambda (port prev)
669            (cons (lambda ()
670                    ((hash-ref port-map port) port))
671                  prev))
672          initial-procs
673          ports)))
674     (match (do-select)
675       ((read-ports write-ports except-ports)
676        ;; @@: Come on, we can do better than append ;P
677        ((compose (ports->procs
678                   read-ports
679                   (agenda-read-port-map agenda))
680                  (ports->procs
681                   write-ports
682                   (agenda-write-port-map agenda))
683                  (ports->procs
684                   except-ports
685                   (agenda-except-port-map agenda)))
686         '()))))
687   (define (update-agenda)
688     (let ((procs-to-run (get-procs-to-run))
689           (q (agenda-queue agenda)))
690       (for-each
691        (lambda (proc)
692          (enq! q proc))
693        procs-to-run))
694     agenda)
695   (define (ports-to-select?)
696     (define (has-items? selector)
697       ;; @@: O(n)
698       ;;    ... we could use hash-for-each and a continuation to jump
699       ;;    out with a #t at first indication of an item
700       (not (= (hash-count (const #t)
701                           (selector agenda))
702               0)))
703     (or (has-items? agenda-read-port-map)
704         (has-items? agenda-write-port-map)
705         (has-items? agenda-except-port-map)))
706
707   (if (or (ports-to-select?)
708           ;; select doubles as sleep...
709           (not (schedule-empty? (agenda-schedule agenda)))) 
710       (update-agenda)
711       agenda))
712
713 (define (agenda-handle-port-request! agenda port-request)
714   "Update an agenda for a port-request"
715   (define (handle-selector request-selector port-map-selector)
716     (if (request-selector port-request)
717         ;; @@: Should we remove if #f?
718         (hash-set! (port-map-selector agenda)
719                    (port-request-port port-request)
720                    (request-selector port-request))))
721   (handle-selector port-request-read agenda-read-port-map)
722   (handle-selector port-request-write agenda-write-port-map)
723   (handle-selector port-request-except agenda-except-port-map))
724
725
726 (define (agenda-handle-port-remove-request! agenda port-remove-request)
727   "Update an agenda for a port-remove-request"
728   (let ((port (port-remove-request-port port-remove-request)))
729     (hash-remove! (agenda-read-port-map agenda) port)
730     (hash-remove! (agenda-write-port-map agenda) port)
731     (hash-remove! (agenda-except-port-map agenda) port)))
732
733
734 (define (stop-on-nothing-to-do agenda)
735   (and (q-empty? (agenda-queue agenda))
736        (schedule-empty? (agenda-schedule agenda))
737        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
738        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))
739        (= 0 (hash-count (const #t) (agenda-except-port-map agenda)))))
740
741
742 (define* (start-agenda agenda
743                        #:key
744                        ;; @@: Should we make stop-on-nothing-to-do
745                        ;;   the default stop-condition?
746                        (stop-condition stop-on-nothing-to-do)
747                        (get-time gettimeofday)
748                        (handle-ports update-agenda-from-select!))
749   ;; TODO: Document fields
750   "Start up the AGENDA"
751   (let loop ((agenda agenda))
752     (let ((agenda   
753            ;; @@: Hm, maybe here would be a great place to handle
754            ;;   select'ing on ports.
755            ;;   We could compose over agenda-run-once and agenda-read-ports
756            (agenda-run-once agenda)))
757       (if (and stop-condition (stop-condition agenda))
758           'done
759           (let* ((agenda
760                   ;; We have to update the time after ports handled, too
761                   ;; because it may have changed after a select
762                   (set-field
763                    (handle-ports
764                     ;; Adjust the agenda's time just in time
765                     ;; We do this here rather than in agenda-run-once to make
766                     ;; agenda-run-once's behavior fairly predictable
767                     (set-field agenda (agenda-time) (get-time)))
768                    (agenda-time) (get-time))))
769             ;; Update the agenda's current queue based on
770             ;; currently applicable time segments
771             (add-segments-contents-to-queue!
772              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
773              (agenda-queue agenda))
774             (loop agenda))))))
775
776
777 (define (print-error-and-continue key . args)
778   "Frequently used as pre-unwind-handler for agenda"
779   (cond
780    ((eq? key '8sync-caught-error)
781     (match args
782       ((orig-key orig-args stacks)
783        (display "\n*** Caught async exception. ***\n")
784        (format (current-error-port)
785                "* Original key '~s and arguments: ~s *\n"
786                orig-key orig-args)
787        (display "* Caught stacks below (ending with original) *\n\n")
788        (for-each
789         (lambda (s)
790           (display-backtrace s (current-error-port))
791           (newline (current-error-port)))
792         stacks))))
793    (else
794     (format (current-error-port)
795             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
796             key args)
797     (display-backtrace (make-stack #t 1 0)
798                        (current-error-port))
799     (newline (current-error-port)))))
800
801 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
802                                      body ...)
803   (if (or catch-handler pre-unwind-handler)
804       (catch
805         #t
806         (lambda ()
807           body ...)
808         (or catch-handler (lambda _ #f))
809         (or pre-unwind-handler (lambda _ #f)))
810       (begin body ...)))
811
812 (define (agenda-run-once agenda)
813   "Run once through the agenda, and produce a new agenda
814 based on the results"
815   (define (call-proc proc)
816     (call-with-prompt
817      (agenda-prompt-tag agenda)
818      (lambda ()
819        (parameterize ((%current-agenda agenda))
820          (maybe-catch-all
821           ((agenda-catch-handler agenda)
822            (agenda-pre-unwind-handler agenda))
823           (proc))))
824      (lambda (kont async-request)
825        (setup-async-request kont async-request))))
826
827   (let ((queue (agenda-queue agenda))
828         (next-queue (make-q)))
829     (while (not (q-empty? queue))
830       (let* ((proc (q-pop! queue))
831              (proc-result (call-proc proc))
832              (enqueue
833               (lambda (run-request)
834                 (define (schedule-at! time proc)
835                   (schedule-add! (agenda-schedule agenda) time proc))
836                 (let ((request-time (run-request-when run-request)))
837                   (match request-time
838                     ((? time-delta? time-delta)
839                      (let ((time (time-delta+ (agenda-time agenda)
840                                               time-delta)))
841                        (schedule-at! time (run-request-proc run-request))))
842                     ((? integer? sec)
843                      (let ((time (cons sec 0)))
844                        (schedule-at! time (run-request-proc run-request))))
845                     (((? integer? sec) . (? integer? usec))
846                      (schedule-at! request-time (run-request-proc run-request)))
847                     (#f
848                      (enq! next-queue (run-request-proc run-request))))))))
849         (define (handle-individual result)
850           ;; @@: Could maybe optimize by switching to an explicit cond...
851           (match result
852             ((? run-request? new-proc)
853              (enqueue new-proc))
854             ((? port-request? port-request)
855              (agenda-handle-port-request! agenda port-request))
856             ((? port-remove-request? port-remove-request)
857              (agenda-handle-port-remove-request! agenda port-remove-request))
858             ;; do nothing
859             (_ #f)))
860         ;; @@: We might support delay-wrapped procedures here
861         (match proc-result
862           ((results ...)
863            (for-each handle-individual results))
864           (one-result (handle-individual one-result)))))
865     ;; TODO: Alternately, we could return the next-queue
866     ;;   along with changes to be added to the schedule here?
867     ;; Return new agenda, with next queue set
868     (set-field agenda (agenda-queue) next-queue)))