doc: Removing a sentence which made things tricky
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map agenda-except-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <time-segment>
37             make-time-segment time-segment?
38             time-segment-time time-segment-queue
39
40             time< time= time<= time-delta+
41             time-minus time-plus
42
43             <time-delta>
44             make-time-delta tdelta time-delta?
45             time-delta-sec time-delta-usec
46
47             <schedule>
48             make-schedule schedule?
49             schedule-add! schedule-empty?
50             schedule-segments
51             schedule-soonest-time
52
53             schedule-segments-split schedule-extract-until!
54             add-segments-contents-to-queue!
55
56             <run-request>
57             make-run-request run-request?
58             run-request-proc run-request-when
59
60             <port-request>
61             make-port-request port-request port-request?
62             port-request-port
63             port-request-read port-request-write port-request-except
64
65             <port-remove-request>
66             make-port-remove-request port-remove-request port-remove-request?
67             port-remove-request-port
68
69             run-it wrap wrap-apply run run-at run-delay
70
71             %8sync %8sync-delay
72             %8sync-run %8sync-run-at %8sync-run-delay
73             %8sync-port %8sync-port-remove
74             
75             catch-8sync catch-%8sync
76
77             ;; used for introspecting the error, but a method for making
78             ;; is not exposed
79             wrapped-exception?
80             wrapped-exception-key wrapped-exception-args
81             wrapped-exception-stacks
82
83             print-error-and-continue
84
85             stop-on-nothing-to-do
86
87             %current-agenda
88             start-agenda agenda-run-once))
89
90 ;; @@: Using immutable agendas here, so wouldn't it make sense to
91 ;;   replace this queue stuff with using pfds based immutable queues?
92
93 \f
94 ;;; Agenda definition
95 ;;; =================
96
97 ;;; The agenda consists of:
98 ;;;  - a queue of immediate items to handle
99 ;;;  - sheduled future events to be added to a future queue
100 ;;;  - a tag by which running processes can escape for some asynchronous
101 ;;;    operation (from which they can be returned later)
102 ;;;  - a mapping of ports to various handler procedures
103 ;;;
104 ;;; The goal, eventually, is for this all to be immutable and functional.
105 ;;; However, we aren't there yet.  Some tricky things:
106 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
107 ;;;  - Need to use immutable queues (ijp's pfds library?)
108 ;;;  - Modeling reading from ports as something repeatable,
109 ;;;    and with reasonable separation from functional components?
110
111 (define-immutable-record-type <agenda>
112   (make-agenda-intern queue prompt-tag
113                       read-port-map write-port-map except-port-map
114                       schedule time catch-handler pre-unwind-handler)
115   agenda?
116   (queue agenda-queue)
117   (prompt-tag agenda-prompt-tag)
118   (read-port-map agenda-read-port-map)
119   (write-port-map agenda-write-port-map)
120   (except-port-map agenda-except-port-map)
121   (schedule agenda-schedule)
122   (time agenda-time)
123   (catch-handler agenda-catch-handler)
124   (pre-unwind-handler agenda-pre-unwind-handler))
125
126 (define (make-async-prompt-tag)
127   "Make an async prompt tag for an agenda.
128
129 Generally done automatically for the user through (make-agenda)."
130   (make-prompt-tag "prompt"))
131
132 (define* (make-agenda #:key
133                       (queue (make-q))
134                       (prompt (make-prompt-tag))
135                       (read-port-map (make-hash-table))
136                       (write-port-map (make-hash-table))
137                       (except-port-map (make-hash-table))
138                       (schedule (make-schedule))
139                       (time (gettimeofday))
140                       (catch-handler #f)
141                       (pre-unwind-handler #f))
142   ;; TODO: document arguments
143   "Make a fresh agenda."
144   (make-agenda-intern queue prompt
145                       read-port-map write-port-map except-port-map
146                       schedule time
147                       catch-handler pre-unwind-handler))
148
149 (define (current-agenda-prompt)
150   "Get the prompt for the current agenda; signal an error if there isn't one."
151   (let ((current-agenda (%current-agenda)))
152     (if (not current-agenda)
153         (throw
154          'no-current-agenda
155          "Can't get current agenda prompt if there's no current agenda!")
156         (agenda-prompt-tag current-agenda))))
157
158 ;; helper for making queues for an agenda
159 (define (list->q lst)
160   "Makes a queue composed of LST items"
161   (let ((q (make-q)))
162     (for-each
163      (lambda (x)
164        (enq! q x))
165      lst)
166     q))
167
168 (define (make-q* . args)
169   "Makes a queue and populates it with this invocation's ARGS"
170   (list->q args))
171
172 \f
173 ;;; Schedule
174 ;;; ========
175
176 ;;; This is where we handle timed events for the future
177
178 ;; This section totally borrows from the ideas in SICP
179 ;; <3 <3 <3
180
181 ;; NOTE: time is a cons of (seconds . microseconds)
182
183 (define-record-type <time-segment>
184   (make-time-segment-intern time queue)
185   time-segment?
186   (time time-segment-time)
187   (queue time-segment-queue))
188
189 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
190 ;;   Maybe double check and switch to that?  (Thanks amz3!)
191
192 (define (time-from-float-or-fraction time)
193   "Produce a (sec . usec) pair from TIME, a float or fraction"
194   (let* ((mixed-whole (floor time))
195          (mixed-rest (- time mixed-whole))  ; float or fraction component
196          (sec mixed-whole)
197          (usec (floor (* 1000000 mixed-rest))))
198     (cons (inexact->exact sec) (inexact->exact usec))))
199
200 (define (time-segment-right-format time)
201   "Ensure TIME is in the right format.
202
203 The right format means (second . microsecond).
204 If an integer, will convert appropriately."
205   ;; TODO: add floating point / rational number support.
206   (match time
207     ;; time is already a cons of second and microsecnd
208     (((? integer? s) . (? integer? u)) time)
209     ;; time was just an integer (just the second)
210     ((? integer? _) (cons time 0))
211     ((or (? rational? _) (? inexact? _))
212      (time-from-float-or-fraction time))
213     (_ (throw 'invalid-time "Invalid time" time))))
214
215 (define* (make-time-segment time #:optional (queue (make-q)))
216   "Make a time segment of TIME and QUEUE
217
218 No automatic conversion is done, so you might have to
219 run (time-segment-right-format) first."
220   (make-time-segment-intern time queue))
221
222 (define (time< time1 time2)
223   "Check if TIME1 is less than TIME2"
224   (cond ((< (car time1)
225             (car time2))
226          #t)
227         ((and (= (car time1)
228                  (car time2))
229               (< (cdr time1)
230                  (cdr time2)))
231          #t)
232         (else #f)))
233
234 (define (time= time1 time2)
235   "Check whether TIME1 and TIME2 are equivalent"
236   (and (= (car time1) (car time2))
237        (= (cdr time1) (cdr time2))))
238
239 (define (time<= time1 time2)
240   "Check if TIME1 is less than or equal to TIME2"
241   (or (time< time1 time2)
242       (time= time1 time2)))
243
244
245 (define-record-type <time-delta>
246   (make-time-delta-intern sec usec)
247   time-delta?
248   (sec time-delta-sec)
249   (usec time-delta-usec))
250
251 (define* (make-time-delta time)
252   "Make a <time-delta> of SEC seconds and USEC microseconds.
253
254 This is used primarily so the agenda can recognize RUN-REQUEST objects
255 which are meant to delay computation"
256   (match (time-segment-right-format time)
257     ((sec . usec)
258      (make-time-delta-intern sec usec))))
259
260 (define tdelta make-time-delta)
261
262 (define (time-carry-correct time)
263   "Corrects/handles time microsecond carry.
264 Will produce (0 . 0) instead of a negative number, if needed."
265   (cond ((>= (cdr time) 1000000)
266          (cons
267           (+ (car time) 1)
268           (- (cdr time) 1000000)))
269         ((< (cdr time) 0)
270          (if (= (car time) 0)
271              '(0 0)
272              (cons
273               (- (car time) 1)
274               (+ (cdr time) 1000000))))
275         (else time)))
276
277 (define (time-delta+ time time-delta)
278   "Increment a TIME by the value of TIME-DELTA"
279   (time-carry-correct
280    (cons (+ (car time) (time-delta-sec time-delta))
281          (+ (cdr time) (time-delta-usec time-delta)))))
282
283 (define (time-minus time1 time2)
284   "Subtract TIME2 from TIME1"
285   (time-carry-correct
286    (cons (- (car time1) (car time2))
287          (- (cdr time1) (cdr time2)))))
288
289 (define (time-plus time1 time2)
290   "Add TIME1 and TIME2"
291   (time-carry-correct
292    (cons (+ (car time1) (car time2))
293          (+ (cdr time1) (cdr time2)))))
294
295
296 (define-record-type <schedule>
297   (make-schedule-intern segments)
298   schedule?
299   (segments schedule-segments set-schedule-segments!))
300
301 (define* (make-schedule #:optional segments)
302   "Make a schedule, optionally pre-composed of SEGMENTS"
303   (make-schedule-intern (or segments '())))
304
305 (define (schedule-soonest-time schedule)
306   "Return a cons of (sec . usec) for next time segement, or #f if none"
307   (let ((segments (schedule-segments schedule)))
308     (if (eq? segments '())
309         #f
310         (time-segment-time (car segments)))))
311
312 ;; TODO: This code is reasonably easy to read but it
313 ;;   mutates AND is worst case of O(n) in both space and time :(
314 ;;   but at least it'll be reasonably easy to refactor to
315 ;;   a more functional setup?
316 (define (schedule-add! schedule time proc)
317   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
318   (let ((time (time-segment-right-format time)))
319     (define (new-time-segment)
320       (let ((new-segment
321              (make-time-segment time)))
322         (enq! (time-segment-queue new-segment) proc)
323         new-segment))
324     (define (loop segments)
325       (define (segment-equals-time? segment)
326         (time= time (time-segment-time segment)))
327
328       (define (segment-more-than-time? segment)
329         (time< time (time-segment-time segment)))
330
331       ;; We could switch this out to be more mutate'y
332       ;; and avoid the O(n) of space... is that over-optimizing?
333       (match segments
334         ;; If we're at the end of the list, time to make a new
335         ;; segment...
336         ('() (cons (new-time-segment) '()))
337         ;; If the segment's time is exactly our time, good news
338         ;; everyone!  Let's append our stuff to its queue
339         (((? segment-equals-time? first) rest ...)
340          (enq! (time-segment-queue first) proc)
341          segments)
342         ;; If the first segment is more than our time,
343         ;; ours belongs before this one, so add it and
344         ;; start consing our way back
345         (((? segment-more-than-time? first) rest ...)
346          (cons (new-time-segment) segments))
347         ;; Otherwise, build up recursive result
348         ((first rest ... )
349          (cons first (loop rest)))))
350     (set-schedule-segments!
351      schedule
352      (loop (schedule-segments schedule)))))
353
354 (define (schedule-empty? schedule)
355   "Check if the SCHEDULE is currently empty"
356   (eq? (schedule-segments schedule) '()))
357
358 (define (schedule-segments-split schedule time)
359   "Does a multiple value return of time segments before/at and after TIME"
360   (let ((time (time-segment-right-format time)))
361     (define (segment-is-now? segment)
362       (time= (time-segment-time segment) time))
363     (define (segment-is-before-now? segment)
364       (time< (time-segment-time segment) time))
365
366     (let loop ((segments-before '())
367                (segments-left (schedule-segments schedule)))
368       (match segments-left
369         ;; end of the line, return
370         ('()
371          (values (reverse segments-before) '()))
372
373         ;; It's right now, so time to stop, but include this one in before
374         ;; but otherwise return
375         (((? segment-is-now? first) rest ...)
376          (values (reverse (cons first segments-before)) rest))
377
378         ;; This is prior or at now, so add it and keep going
379         (((? segment-is-before-now? first) rest ...)
380          (loop (cons first segments-before) rest))
381
382         ;; Otherwise it's past now, just return what we have
383         (segments-after
384          (values segments-before segments-after))))))
385
386 (define (schedule-extract-until! schedule time)
387   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
388   (receive (segments-before segments-after)
389       (schedule-segments-split schedule time)
390     (set-schedule-segments! schedule segments-after)
391     segments-before))
392
393 (define (add-segments-contents-to-queue! segments queue)
394   (for-each
395    (lambda (segment)
396      (let ((seg-queue (time-segment-queue segment)))
397        (while (not (q-empty? seg-queue))
398          (enq! queue (deq! seg-queue)))))
399    segments))
400
401
402 \f
403 ;;; Request to run stuff
404 ;;; ====================
405
406 (define-record-type <run-request>
407   (make-run-request proc when)
408   run-request?
409   (proc run-request-proc)
410   (when run-request-when))
411
412 (define* (run-it proc #:optional when)
413   "Make a request to run PROC (possibly at WHEN)"
414   (make-run-request proc when))
415
416 (define-syntax-rule (wrap body ...)
417   "Wrap contents in a procedure"
418   (lambda ()
419     body ...))
420
421 (define-syntax-rule (wrap-apply body)
422   "Wrap possibly multi-value function in a procedure, applies all arguments"
423   (lambda args
424     (apply body args)))
425
426
427 ;; @@: Do we really want `body ...' here?
428 ;;   what about just `body'?
429 (define-syntax-rule (run body ...)
430   "Run everything in BODY but wrap in a convenient procedure"
431   (make-run-request (wrap body ...) #f))
432
433 (define-syntax-rule (run-at body ... when)
434   "Run BODY at WHEN"
435   (make-run-request (wrap body ...) when))
436
437 ;; @@: Is it okay to overload the term "delay" like this?
438 ;;   Would `run-in' be better?
439 (define-syntax-rule (run-delay body ... delay-time)
440   "Run BODY at DELAY-TIME time from now"
441   (make-run-request (wrap body ...) (tdelta delay-time)))
442
443
444 ;; A request to set up a port with at least one of read, write, except
445 ;; handling processes
446
447 (define-record-type <port-request>
448   (make-port-request-intern port read write except)
449   port-request?
450   (port port-request-port)
451   (read port-request-read)
452   (write port-request-write)
453   (except port-request-except))
454
455 (define* (make-port-request port #:key read write except)
456   (if (not (or read write except))
457       (throw 'no-port-handler-given "No port handler given.\n"))
458   (make-port-request-intern port read write except))
459
460 (define port-request make-port-request)
461
462 (define-record-type <port-remove-request>
463   (make-port-remove-request port)
464   port-remove-request?
465   (port port-remove-request-port))
466
467 (define port-remove-request make-port-remove-request)
468
469
470 \f
471 ;;; Asynchronous escape to run things
472 ;;; =================================
473
474 (define-syntax-rule (%8sync-abort-to-prompt async-request)
475   (abort-to-prompt (current-agenda-prompt)
476                    async-request))
477
478 ;; Async port request and run-request meta-requests
479 (define (make-async-request proc)
480   "Wrap PROC in an async-request
481
482 The purpose of this is to make sure that users don't accidentally
483 return the wrong thing via (%8sync) and trip themselves up."
484   (cons '*async-request* proc))
485
486 (define (setup-async-request resume-kont async-request)
487   "Complete an async request for agenda-run-once's continuation handling"
488   (match async-request
489     (('*async-request* . async-setup-proc)
490      (async-setup-proc resume-kont))
491     ;; TODO: deliver more helpful errors depending on what the user
492     ;;   returned
493     (_ (throw 'invalid-async-request
494               "Invalid request passed back via an (%8sync) procedure."
495               async-request))))
496
497 (define-record-type <wrapped-exception>
498   (make-wrapped-exception key args stacks)
499   wrapped-exception?
500   (key wrapped-exception-key)
501   (args wrapped-exception-args)
502   (stacks wrapped-exception-stacks))
503
504 (define-syntax-rule (propagate-%async-exceptions body)
505   (let ((body-result body))
506     (if (wrapped-exception? body-result)
507         (throw '8sync-caught-error
508                (wrapped-exception-key body-result)
509                (wrapped-exception-args body-result)
510                (wrapped-exception-stacks body-result))
511         body-result)))
512
513 (define-syntax %8sync
514   (syntax-rules ()
515     "Run BODY asynchronously (8synchronously?) at a prompt, then return.
516
517 Possibly specify WHEN as the second argument."
518     ((%8sync body)
519      (%8sync-run body))
520     ((%8sync body when)
521      (%8sync-run-at body when))))
522
523 (define-syntax-rule (%8sync-run body ...)
524   (%8sync-run-at body ... #f))
525
526 (define-syntax-rule (%8sync-run-at body ... when)
527   (propagate-%async-exceptions
528    (%8sync-abort-to-prompt
529     ;; Send an asynchronous request to apply a continuation to the
530     ;; following function, then handle that as a request to the agenda
531     (make-async-request
532      (lambda (kont)
533        ;; We're making a run request
534        (make-run-request
535         ;; Wrapping the following execution to run...
536         (wrap
537          ;; Once we get the result from the inner part, we'll resume
538          ;; this continuation, but first
539          ;; @@: Is this running immediately, or queueing the result
540          ;;   after evaluation for the next agenda tick?  It looks
541          ;;   like evaluating immediately.  Is that what we want?
542          (kont
543           ;; Any unhandled errors are caught
544           (let ((exception-stack #f))
545             (catch #t
546               ;; Run the actual code the user requested
547               (lambda ()
548                 body ...)
549               ;; If something bad happened and we didn't catch it,
550               ;; we'll wrap it up in such a way that the continuation
551               ;; can address it
552               (lambda (key . args)
553                 (cond
554                  ((eq? key '8sync-caught-error)
555                   (match args
556                     ((orig-key orig-args orig-stacks)
557                      (make-wrapped-exception
558                       orig-key orig-args
559                       (cons exception-stack orig-stacks)))))
560                  (else
561                   (make-wrapped-exception key args
562                                           (list exception-stack)))))
563               (lambda _
564                 (set! exception-stack (make-stack #t 1 0)))))))
565         when))))))
566
567 (define-syntax-rule (%8sync-run-delay body ... delay-time)
568   (%8sync-run-at body ... (tdelta delay-time)))
569
570 (define-syntax-rule (%8sync-delay args ...)
571   (%8sync-run-delay args ...))
572
573 (define-syntax-rule (%8sync-port port port-request-args ...)
574   (%8sync-abort-to-prompt
575    (make-async-request
576     (lambda (kont)
577       (list (make-port-request port port-request-args ...)
578             (make-run-request
579              ;; What's with returning #f to kont?
580              ;; Otherwise we sometimes get errors like
581              ;; "Zero values returned to single-valued continuation""
582              (wrap (kont #f)) #f))))))
583
584 (define-syntax-rule (%8sync-port-remove port)
585   (%8sync-abort-to-prompt
586    (make-async-request
587     (lambda (kont)
588       (list (make-port-remove-request port)
589             (make-run-request
590              ;; See comment in %8sync-port
591              (wrap (kont #f)) #f))))))
592
593
594 ;; TODO: Write (%run-immediately)
595
596 (define-syntax-rule (%8sync-immediate body)
597   "Run body asynchronously but ignore its result...
598 forge ahead in our current function!"
599   (%8sync-abort-to-prompt
600    (make-async-request
601     (lambda (kont)
602       (list (make-run-request
603              ;; See comment in %8sync-port
604              (wrap (kont #f)) #f)
605             (make-run-request body #f))))))
606
607 (define-syntax-rule (catch-8sync exp (handler-key handler) ...)
608   (catch '8sync-caught-error
609     (lambda ()
610       exp)
611     (lambda (_ orig-key orig-args orig-stacks)
612       (cond
613        ((or (eq? handler-key #t)
614             (eq? orig-key handler-key))
615         (apply handler orig-stacks orig-args)) ...
616        (else (raise '8sync-caught-error
617                     orig-key orig-args orig-stacks))))))
618
619 ;; Alias...?
620 (define-syntax-rule (catch-%8sync rest ...)
621   (catch-8sync rest ...))
622
623
624 \f
625 ;;; Execution of agenda, and current agenda
626 ;;; =======================================
627
628 (define %current-agenda (make-parameter #f))
629
630 (define (update-agenda-from-select! agenda)
631   "Potentially (select) on ports specified in agenda, adding items to queue.
632
633 Also handles sleeping when all we have to do is wait on the schedule."
634   (define (hash-keys hash)
635     (hash-map->list (lambda (k v) k) hash))
636   (define (get-wait-time)
637     ;; TODO: we need to figure this out based on whether there's anything
638     ;;   in the queue, and if not, how long till the next scheduled item
639     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
640       (cond 
641        ((not (q-empty? (agenda-queue agenda)))
642         (cons 0 0))
643        (soonest-time    ; ie, the agenda is non-empty
644         (let* ((current-time (agenda-time agenda)))
645           (if (time<= soonest-time current-time)
646               ;; Well there's something due so let's select
647               ;; (this avoids a (possible?) race condition chance)
648               (cons 0 0)
649               (time-minus soonest-time current-time))))
650        (else
651         (cons #f #f)))))
652   (define (do-select)
653     ;; TODO: support usecond wait time too
654     (match (get-wait-time)
655       ((sec . usec)
656        (catch 'system-error
657          (lambda ()
658            (select (hash-keys (agenda-read-port-map agenda))
659                    (hash-keys (agenda-write-port-map agenda))
660                    (hash-keys (agenda-except-port-map agenda))
661                    sec usec))
662          (lambda (key . rest-args)
663            (match rest-args
664              ((_ _ _ (EINTR))
665               '(() () ()))
666              (_ (error "Unhandled error in select!" key rest-args))))))))
667   (define (get-procs-to-run)
668     (define (ports->procs ports port-map)
669       (lambda (initial-procs)
670         (fold
671          (lambda (port prev)
672            (cons (lambda ()
673                    ((hash-ref port-map port) port))
674                  prev))
675          initial-procs
676          ports)))
677     (match (do-select)
678       ((read-ports write-ports except-ports)
679        ;; @@: Come on, we can do better than append ;P
680        ((compose (ports->procs
681                   read-ports
682                   (agenda-read-port-map agenda))
683                  (ports->procs
684                   write-ports
685                   (agenda-write-port-map agenda))
686                  (ports->procs
687                   except-ports
688                   (agenda-except-port-map agenda)))
689         '()))))
690   (define (update-agenda)
691     (let ((procs-to-run (get-procs-to-run))
692           (q (agenda-queue agenda)))
693       (for-each
694        (lambda (proc)
695          (enq! q proc))
696        procs-to-run))
697     agenda)
698   (define (ports-to-select?)
699     (define (has-items? selector)
700       ;; @@: O(n)
701       ;;    ... we could use hash-for-each and a continuation to jump
702       ;;    out with a #t at first indication of an item
703       (not (= (hash-count (const #t)
704                           (selector agenda))
705               0)))
706     (or (has-items? agenda-read-port-map)
707         (has-items? agenda-write-port-map)
708         (has-items? agenda-except-port-map)))
709
710   (if (or (ports-to-select?)
711           ;; select doubles as sleep...
712           (not (schedule-empty? (agenda-schedule agenda)))) 
713       (update-agenda)
714       agenda))
715
716 (define (agenda-handle-port-request! agenda port-request)
717   "Update an agenda for a port-request"
718   (define (handle-selector request-selector port-map-selector)
719     (if (request-selector port-request)
720         ;; @@: Should we remove if #f?
721         (hash-set! (port-map-selector agenda)
722                    (port-request-port port-request)
723                    (request-selector port-request))))
724   (handle-selector port-request-read agenda-read-port-map)
725   (handle-selector port-request-write agenda-write-port-map)
726   (handle-selector port-request-except agenda-except-port-map))
727
728
729 (define (agenda-handle-port-remove-request! agenda port-remove-request)
730   "Update an agenda for a port-remove-request"
731   (let ((port (port-remove-request-port port-remove-request)))
732     (hash-remove! (agenda-read-port-map agenda) port)
733     (hash-remove! (agenda-write-port-map agenda) port)
734     (hash-remove! (agenda-except-port-map agenda) port)))
735
736
737 (define (stop-on-nothing-to-do agenda)
738   (and (q-empty? (agenda-queue agenda))
739        (schedule-empty? (agenda-schedule agenda))
740        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
741        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))
742        (= 0 (hash-count (const #t) (agenda-except-port-map agenda)))))
743
744
745 (define* (start-agenda agenda
746                        #:key
747                        ;; @@: Should we make stop-on-nothing-to-do
748                        ;;   the default stop-condition?
749                        stop-condition
750                        (get-time gettimeofday)
751                        (handle-ports update-agenda-from-select!))
752   ;; TODO: Document fields
753   "Start up the AGENDA"
754   (let loop ((agenda agenda))
755     (let ((agenda   
756            ;; @@: Hm, maybe here would be a great place to handle
757            ;;   select'ing on ports.
758            ;;   We could compose over agenda-run-once and agenda-read-ports
759            (agenda-run-once agenda)))
760       (if (and stop-condition (stop-condition agenda))
761           'done
762           (let* ((agenda
763                   ;; We have to update the time after ports handled, too
764                   ;; because it may have changed after a select
765                   (set-field
766                    (handle-ports
767                     ;; Adjust the agenda's time just in time
768                     ;; We do this here rather than in agenda-run-once to make
769                     ;; agenda-run-once's behavior fairly predictable
770                     (set-field agenda (agenda-time) (get-time)))
771                    (agenda-time) (get-time))))
772             ;; Update the agenda's current queue based on
773             ;; currently applicable time segments
774             (add-segments-contents-to-queue!
775              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
776              (agenda-queue agenda))
777             (loop agenda))))))
778
779
780 (define (print-error-and-continue key . args)
781   "Frequently used as pre-unwind-handler for agenda"
782   (cond
783    ((eq? key '8sync-caught-error)
784     (match args
785       ((orig-key orig-args stacks)
786        (display "\n*** Caught async exception. ***\n")
787        (format (current-error-port)
788                "* Original key '~s and arguments: ~s *\n"
789                orig-key orig-args)
790        (display "* Caught stacks below (ending with original) *\n\n")
791        (for-each
792         (lambda (s)
793           (display-backtrace s (current-error-port))
794           (newline (current-error-port)))
795         stacks))))
796    (else
797     (format (current-error-port)
798             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
799             key args)
800     (display-backtrace (make-stack #t 1 0)
801                        (current-error-port))
802     (newline (current-error-port)))))
803
804 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
805                                      body ...)
806   (if (or catch-handler pre-unwind-handler)
807       (catch
808         #t
809         (lambda ()
810           body ...)
811         (or catch-handler (lambda _ #f))
812         (or pre-unwind-handler (lambda _ #f)))
813       (begin body ...)))
814
815 (define (agenda-run-once agenda)
816   "Run once through the agenda, and produce a new agenda
817 based on the results"
818   (define (call-proc proc)
819     (call-with-prompt
820      (agenda-prompt-tag agenda)
821      (lambda ()
822        (parameterize ((%current-agenda agenda))
823          (maybe-catch-all
824           ((agenda-catch-handler agenda)
825            (agenda-pre-unwind-handler agenda))
826           (proc))))
827      (lambda (kont async-request)
828        (setup-async-request kont async-request))))
829
830   (let ((queue (agenda-queue agenda))
831         (next-queue (make-q)))
832     (while (not (q-empty? queue))
833       (let* ((proc (q-pop! queue))
834              (proc-result (call-proc proc))
835              (enqueue
836               (lambda (run-request)
837                 (define (schedule-at! time proc)
838                   (schedule-add! (agenda-schedule agenda) time proc))
839                 (let ((request-time (run-request-when run-request)))
840                   (match request-time
841                     ((? time-delta? time-delta)
842                      (let ((time (time-delta+ (agenda-time agenda)
843                                               time-delta)))
844                        (schedule-at! time (run-request-proc run-request))))
845                     ((? integer? sec)
846                      (let ((time (cons sec 0)))
847                        (schedule-at! time (run-request-proc run-request))))
848                     (((? integer? sec) . (? integer? usec))
849                      (schedule-at! request-time (run-request-proc run-request)))
850                     (#f
851                      (enq! next-queue (run-request-proc run-request))))))))
852         (define (handle-individual result)
853           ;; @@: Could maybe optimize by switching to an explicit cond...
854           (match result
855             ((? run-request? new-proc)
856              (enqueue new-proc))
857             ((? port-request? port-request)
858              (agenda-handle-port-request! agenda port-request))
859             ((? port-remove-request? port-remove-request)
860              (agenda-handle-port-remove-request! agenda port-remove-request))
861             ;; do nothing
862             (_ #f)))
863         ;; @@: We might support delay-wrapped procedures here
864         (match proc-result
865           ((results ...)
866            (for-each handle-individual results))
867           (one-result (handle-individual one-result)))))
868     ;; TODO: Alternately, we could return the next-queue
869     ;;   along with changes to be added to the schedule here?
870     ;; Return new agenda, with next queue set
871     (set-field agenda (agenda-queue) next-queue)))