agenda: Add "8sleep" sugar.
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map agenda-except-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <time-segment>
37             make-time-segment time-segment?
38             time-segment-time time-segment-queue
39
40             time< time= time<= time-delta+
41             time-minus time-plus
42
43             <time-delta>
44             make-time-delta tdelta time-delta?
45             time-delta-sec time-delta-usec
46
47             <schedule>
48             make-schedule schedule?
49             schedule-add! schedule-empty?
50             schedule-segments
51             schedule-soonest-time
52
53             schedule-segments-split schedule-extract-until!
54             add-segments-contents-to-queue!
55
56             <run-request>
57             make-run-request run-request?
58             run-request-proc run-request-when
59
60             <port-request>
61             make-port-request port-request port-request?
62             port-request-port
63             port-request-read port-request-write port-request-except
64
65             <port-remove-request>
66             make-port-remove-request port-remove-request port-remove-request?
67             port-remove-request-port
68
69             run-it wrap wrap-apply run run-at run-delay
70
71             8sync 8sync-delay
72             8sync-run 8sync-run-at 8sync-run-delay
73             8sync-port 8sync-port-remove
74             8sync-nowait
75             8sleep
76             
77             catch-8sync
78
79             ;; used for introspecting the error, but a method for making
80             ;; is not exposed
81             wrapped-exception?
82             wrapped-exception-key wrapped-exception-args
83             wrapped-exception-stacks
84
85             print-error-and-continue
86
87             stop-on-nothing-to-do
88
89             %current-agenda
90             start-agenda agenda-run-once))
91
92 ;; @@: Using immutable agendas here, so wouldn't it make sense to
93 ;;   replace this queue stuff with using pfds based immutable queues?
94
95 \f
96 ;;; Agenda definition
97 ;;; =================
98
99 ;;; The agenda consists of:
100 ;;;  - a queue of immediate items to handle
101 ;;;  - sheduled future events to be added to a future queue
102 ;;;  - a tag by which running processes can escape for some asynchronous
103 ;;;    operation (from which they can be returned later)
104 ;;;  - a mapping of ports to various handler procedures
105 ;;;
106 ;;; The goal, eventually, is for this all to be immutable and functional.
107 ;;; However, we aren't there yet.  Some tricky things:
108 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
109 ;;;  - Need to use immutable queues (ijp's pfds library?)
110 ;;;  - Modeling reading from ports as something repeatable,
111 ;;;    and with reasonable separation from functional components?
112
113 (define-immutable-record-type <agenda>
114   (make-agenda-intern queue prompt-tag
115                       read-port-map write-port-map except-port-map
116                       schedule time catch-handler pre-unwind-handler)
117   agenda?
118   (queue agenda-queue)
119   (prompt-tag agenda-prompt-tag)
120   (read-port-map agenda-read-port-map)
121   (write-port-map agenda-write-port-map)
122   (except-port-map agenda-except-port-map)
123   (schedule agenda-schedule)
124   (time agenda-time)
125   (catch-handler agenda-catch-handler)
126   (pre-unwind-handler agenda-pre-unwind-handler))
127
128 (define (make-async-prompt-tag)
129   "Make an async prompt tag for an agenda.
130
131 Generally done automatically for the user through (make-agenda)."
132   (make-prompt-tag "prompt"))
133
134 (define* (make-agenda #:key
135                       (queue (make-q))
136                       (prompt (make-prompt-tag))
137                       (read-port-map (make-hash-table))
138                       (write-port-map (make-hash-table))
139                       (except-port-map (make-hash-table))
140                       (schedule (make-schedule))
141                       (time (gettimeofday))
142                       (catch-handler #f)
143                       (pre-unwind-handler print-error-and-continue))
144   ;; TODO: document arguments
145   "Make a fresh agenda."
146   (make-agenda-intern queue prompt
147                       read-port-map write-port-map except-port-map
148                       schedule time
149                       catch-handler pre-unwind-handler))
150
151 (define (current-agenda-prompt)
152   "Get the prompt for the current agenda; signal an error if there isn't one."
153   (let ((current-agenda (%current-agenda)))
154     (if (not current-agenda)
155         (throw
156          'no-current-agenda
157          "Can't get current agenda prompt if there's no current agenda!")
158         (agenda-prompt-tag current-agenda))))
159
160 ;; helper for making queues for an agenda
161 (define (list->q lst)
162   "Makes a queue composed of LST items"
163   (let ((q (make-q)))
164     (for-each
165      (lambda (x)
166        (enq! q x))
167      lst)
168     q))
169
170 (define (make-q* . args)
171   "Makes a queue and populates it with this invocation's ARGS"
172   (list->q args))
173
174 \f
175 ;;; Schedule
176 ;;; ========
177
178 ;;; This is where we handle timed events for the future
179
180 ;; This section totally borrows from the ideas in SICP
181 ;; <3 <3 <3
182
183 ;; NOTE: time is a cons of (seconds . microseconds)
184
185 (define-record-type <time-segment>
186   (make-time-segment-intern time queue)
187   time-segment?
188   (time time-segment-time)
189   (queue time-segment-queue))
190
191 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
192 ;;   Maybe double check and switch to that?  (Thanks amz3!)
193
194 (define (time-from-float-or-fraction time)
195   "Produce a (sec . usec) pair from TIME, a float or fraction"
196   (let* ((mixed-whole (floor time))
197          (mixed-rest (- time mixed-whole))  ; float or fraction component
198          (sec mixed-whole)
199          (usec (floor (* 1000000 mixed-rest))))
200     (cons (inexact->exact sec) (inexact->exact usec))))
201
202 (define (time-segment-right-format time)
203   "Ensure TIME is in the right format.
204
205 The right format means (second . microsecond).
206 If an integer, will convert appropriately."
207   ;; TODO: add floating point / rational number support.
208   (match time
209     ;; time is already a cons of second and microsecnd
210     (((? integer? s) . (? integer? u)) time)
211     ;; time was just an integer (just the second)
212     ((? integer? _) (cons time 0))
213     ((or (? rational? _) (? inexact? _))
214      (time-from-float-or-fraction time))
215     (_ (throw 'invalid-time "Invalid time" time))))
216
217 (define* (make-time-segment time #:optional (queue (make-q)))
218   "Make a time segment of TIME and QUEUE
219
220 No automatic conversion is done, so you might have to
221 run (time-segment-right-format) first."
222   (make-time-segment-intern time queue))
223
224 (define (time< time1 time2)
225   "Check if TIME1 is less than TIME2"
226   (cond ((< (car time1)
227             (car time2))
228          #t)
229         ((and (= (car time1)
230                  (car time2))
231               (< (cdr time1)
232                  (cdr time2)))
233          #t)
234         (else #f)))
235
236 (define (time= time1 time2)
237   "Check whether TIME1 and TIME2 are equivalent"
238   (and (= (car time1) (car time2))
239        (= (cdr time1) (cdr time2))))
240
241 (define (time<= time1 time2)
242   "Check if TIME1 is less than or equal to TIME2"
243   (or (time< time1 time2)
244       (time= time1 time2)))
245
246
247 (define-record-type <time-delta>
248   (make-time-delta-intern sec usec)
249   time-delta?
250   (sec time-delta-sec)
251   (usec time-delta-usec))
252
253 (define* (make-time-delta time)
254   "Make a <time-delta> of SEC seconds and USEC microseconds.
255
256 This is used primarily so the agenda can recognize RUN-REQUEST objects
257 which are meant to delay computation"
258   (match (time-segment-right-format time)
259     ((sec . usec)
260      (make-time-delta-intern sec usec))))
261
262 (define tdelta make-time-delta)
263
264 (define (time-carry-correct time)
265   "Corrects/handles time microsecond carry.
266 Will produce (0 . 0) instead of a negative number, if needed."
267   (cond ((>= (cdr time) 1000000)
268          (cons
269           (+ (car time) 1)
270           (- (cdr time) 1000000)))
271         ((< (cdr time) 0)
272          (if (= (car time) 0)
273              '(0 0)
274              (cons
275               (- (car time) 1)
276               (+ (cdr time) 1000000))))
277         (else time)))
278
279 (define (time-delta+ time time-delta)
280   "Increment a TIME by the value of TIME-DELTA"
281   (time-carry-correct
282    (cons (+ (car time) (time-delta-sec time-delta))
283          (+ (cdr time) (time-delta-usec time-delta)))))
284
285 (define (time-minus time1 time2)
286   "Subtract TIME2 from TIME1"
287   (time-carry-correct
288    (cons (- (car time1) (car time2))
289          (- (cdr time1) (cdr time2)))))
290
291 (define (time-plus time1 time2)
292   "Add TIME1 and TIME2"
293   (time-carry-correct
294    (cons (+ (car time1) (car time2))
295          (+ (cdr time1) (cdr time2)))))
296
297
298 (define-record-type <schedule>
299   (make-schedule-intern segments)
300   schedule?
301   (segments schedule-segments set-schedule-segments!))
302
303 (define* (make-schedule #:optional segments)
304   "Make a schedule, optionally pre-composed of SEGMENTS"
305   (make-schedule-intern (or segments '())))
306
307 (define (schedule-soonest-time schedule)
308   "Return a cons of (sec . usec) for next time segement, or #f if none"
309   (let ((segments (schedule-segments schedule)))
310     (if (eq? segments '())
311         #f
312         (time-segment-time (car segments)))))
313
314 ;; TODO: This code is reasonably easy to read but it
315 ;;   mutates AND is worst case of O(n) in both space and time :(
316 ;;   but at least it'll be reasonably easy to refactor to
317 ;;   a more functional setup?
318 (define (schedule-add! schedule time proc)
319   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
320   (let ((time (time-segment-right-format time)))
321     (define (new-time-segment)
322       (let ((new-segment
323              (make-time-segment time)))
324         (enq! (time-segment-queue new-segment) proc)
325         new-segment))
326     (define (loop segments)
327       (define (segment-equals-time? segment)
328         (time= time (time-segment-time segment)))
329
330       (define (segment-more-than-time? segment)
331         (time< time (time-segment-time segment)))
332
333       ;; We could switch this out to be more mutate'y
334       ;; and avoid the O(n) of space... is that over-optimizing?
335       (match segments
336         ;; If we're at the end of the list, time to make a new
337         ;; segment...
338         ('() (cons (new-time-segment) '()))
339         ;; If the segment's time is exactly our time, good news
340         ;; everyone!  Let's append our stuff to its queue
341         (((? segment-equals-time? first) rest ...)
342          (enq! (time-segment-queue first) proc)
343          segments)
344         ;; If the first segment is more than our time,
345         ;; ours belongs before this one, so add it and
346         ;; start consing our way back
347         (((? segment-more-than-time? first) rest ...)
348          (cons (new-time-segment) segments))
349         ;; Otherwise, build up recursive result
350         ((first rest ... )
351          (cons first (loop rest)))))
352     (set-schedule-segments!
353      schedule
354      (loop (schedule-segments schedule)))))
355
356 (define (schedule-empty? schedule)
357   "Check if the SCHEDULE is currently empty"
358   (eq? (schedule-segments schedule) '()))
359
360 (define (schedule-segments-split schedule time)
361   "Does a multiple value return of time segments before/at and after TIME"
362   (let ((time (time-segment-right-format time)))
363     (define (segment-is-now? segment)
364       (time= (time-segment-time segment) time))
365     (define (segment-is-before-now? segment)
366       (time< (time-segment-time segment) time))
367
368     (let loop ((segments-before '())
369                (segments-left (schedule-segments schedule)))
370       (match segments-left
371         ;; end of the line, return
372         ('()
373          (values (reverse segments-before) '()))
374
375         ;; It's right now, so time to stop, but include this one in before
376         ;; but otherwise return
377         (((? segment-is-now? first) rest ...)
378          (values (reverse (cons first segments-before)) rest))
379
380         ;; This is prior or at now, so add it and keep going
381         (((? segment-is-before-now? first) rest ...)
382          (loop (cons first segments-before) rest))
383
384         ;; Otherwise it's past now, just return what we have
385         (segments-after
386          (values segments-before segments-after))))))
387
388 (define (schedule-extract-until! schedule time)
389   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
390   (receive (segments-before segments-after)
391       (schedule-segments-split schedule time)
392     (set-schedule-segments! schedule segments-after)
393     segments-before))
394
395 (define (add-segments-contents-to-queue! segments queue)
396   (for-each
397    (lambda (segment)
398      (let ((seg-queue (time-segment-queue segment)))
399        (while (not (q-empty? seg-queue))
400          (enq! queue (deq! seg-queue)))))
401    segments))
402
403
404 \f
405 ;;; Request to run stuff
406 ;;; ====================
407
408 (define-record-type <run-request>
409   (make-run-request proc when)
410   run-request?
411   (proc run-request-proc)
412   (when run-request-when))
413
414 (define* (run-it proc #:optional when)
415   "Make a request to run PROC (possibly at WHEN)"
416   (make-run-request proc when))
417
418 (define-syntax-rule (wrap body ...)
419   "Wrap contents in a procedure"
420   (lambda ()
421     body ...))
422
423 (define-syntax-rule (wrap-apply body)
424   "Wrap possibly multi-value function in a procedure, applies all arguments"
425   (lambda args
426     (apply body args)))
427
428
429 ;; @@: Do we really want `body ...' here?
430 ;;   what about just `body'?
431 (define-syntax-rule (run body ...)
432   "Run everything in BODY but wrap in a convenient procedure"
433   (make-run-request (wrap body ...) #f))
434
435 (define-syntax-rule (run-at body ... when)
436   "Run BODY at WHEN"
437   (make-run-request (wrap body ...) when))
438
439 ;; @@: Is it okay to overload the term "delay" like this?
440 ;;   Would `run-in' be better?
441 (define-syntax-rule (run-delay body ... delay-time)
442   "Run BODY at DELAY-TIME time from now"
443   (make-run-request (wrap body ...) (tdelta delay-time)))
444
445
446 ;; A request to set up a port with at least one of read, write, except
447 ;; handling processes
448
449 (define-record-type <port-request>
450   (make-port-request-intern port read write except)
451   port-request?
452   (port port-request-port)
453   (read port-request-read)
454   (write port-request-write)
455   (except port-request-except))
456
457 (define* (make-port-request port #:key read write except)
458   (if (not (or read write except))
459       (throw 'no-port-handler-given "No port handler given.\n"))
460   (make-port-request-intern port read write except))
461
462 (define port-request make-port-request)
463
464 (define-record-type <port-remove-request>
465   (make-port-remove-request port)
466   port-remove-request?
467   (port port-remove-request-port))
468
469 (define port-remove-request make-port-remove-request)
470
471
472 \f
473 ;;; Asynchronous escape to run things
474 ;;; =================================
475
476 (define-syntax-rule (8sync-abort-to-prompt async-request)
477   (abort-to-prompt (current-agenda-prompt)
478                    async-request))
479
480 ;; Async port request and run-request meta-requests
481 (define (make-async-request proc)
482   "Wrap PROC in an async-request
483
484 The purpose of this is to make sure that users don't accidentally
485 return the wrong thing via (8sync) and trip themselves up."
486   (cons '*async-request* proc))
487
488 (define (setup-async-request resume-kont async-request)
489   "Complete an async request for agenda-run-once's continuation handling"
490   (match async-request
491     (('*async-request* . async-setup-proc)
492      (async-setup-proc resume-kont))
493     ;; TODO: deliver more helpful errors depending on what the user
494     ;;   returned
495     (_ (throw 'invalid-async-request
496               "Invalid request passed back via an (8sync) procedure."
497               async-request))))
498
499 (define-record-type <wrapped-exception>
500   (make-wrapped-exception key args stacks)
501   wrapped-exception?
502   (key wrapped-exception-key)
503   (args wrapped-exception-args)
504   (stacks wrapped-exception-stacks))
505
506 (define-syntax-rule (propagate-%async-exceptions body)
507   (let ((body-result body))
508     (if (wrapped-exception? body-result)
509         (throw '8sync-caught-error
510                (wrapped-exception-key body-result)
511                (wrapped-exception-args body-result)
512                (wrapped-exception-stacks body-result))
513         body-result)))
514
515 (define-syntax 8sync
516   (syntax-rules ()
517     "Run BODY asynchronously (8synchronously?) at a prompt, then return.
518
519 Possibly specify WHEN as the second argument."
520     ((8sync body)
521      (8sync-run body))
522     ((8sync body when)
523      (8sync-run-at body when))))
524
525 (define-syntax-rule (8sync-run body ...)
526   (8sync-run-at body ... #f))
527
528 (define-syntax-rule (8sync-run-at body ... when)
529   (propagate-%async-exceptions
530    (8sync-abort-to-prompt
531     ;; Send an asynchronous request to apply a continuation to the
532     ;; following function, then handle that as a request to the agenda
533     (make-async-request
534      (lambda (kont)
535        ;; We're making a run request
536        (make-run-request
537         ;; Wrapping the following execution to run...
538         (wrap
539          ;; Once we get the result from the inner part, we'll resume
540          ;; this continuation, but first
541          ;; @@: Is this running immediately, or queueing the result
542          ;;   after evaluation for the next agenda tick?  It looks
543          ;;   like evaluating immediately.  Is that what we want?
544          (kont
545           ;; Any unhandled errors are caught
546           (let ((exception-stack #f))
547             (catch #t
548               ;; Run the actual code the user requested
549               (lambda ()
550                 body ...)
551               ;; If something bad happened and we didn't catch it,
552               ;; we'll wrap it up in such a way that the continuation
553               ;; can address it
554               (lambda (key . args)
555                 (cond
556                  ((eq? key '8sync-caught-error)
557                   (match args
558                     ((orig-key orig-args orig-stacks)
559                      (make-wrapped-exception
560                       orig-key orig-args
561                       (cons exception-stack orig-stacks)))))
562                  (else
563                   (make-wrapped-exception key args
564                                           (list exception-stack)))))
565               (lambda _
566                 (set! exception-stack (make-stack #t 1 0)))))))
567         when))))))
568
569 (define-syntax-rule (8sync-run-delay body ... delay-time)
570   (8sync-run-at body ... (tdelta delay-time)))
571
572 (define-syntax-rule (8sync-delay args ...)
573   (8sync-run-delay args ...))
574
575 (define-syntax-rule (8sync-port port port-request-args ...)
576   (8sync-abort-to-prompt
577    (make-async-request
578     (lambda (kont)
579       (list (make-port-request port port-request-args ...)
580             (make-run-request
581              ;; What's with returning #f to kont?
582              ;; Otherwise we sometimes get errors like
583              ;; "Zero values returned to single-valued continuation""
584              (wrap (kont #f)) #f))))))
585
586 (define-syntax-rule (8sync-port-remove port)
587   (8sync-abort-to-prompt
588    (make-async-request
589     (lambda (kont)
590       (list (make-port-remove-request port)
591             (make-run-request
592              ;; See comment in 8sync-port
593              (wrap (kont #f)) #f))))))
594
595
596 ;; TODO: Write (%run-immediately)
597
598 (define-syntax-rule (8sync-nowait body)
599   "Run body asynchronously but ignore its result...
600 forge ahead in our current function!"
601   (8sync-abort-to-prompt
602    (make-async-request
603     (lambda (kont)
604       (list (make-run-request
605              ;; See comment in 8sync-port
606              (wrap (kont #f)) #f)
607             (make-run-request (lambda () body) #f))))))
608
609 (define-syntax-rule (catch-8sync exp (handler-key handler) ...)
610   (catch '8sync-caught-error
611     (lambda ()
612       exp)
613     (lambda (_ orig-key orig-args orig-stacks)
614       (cond
615        ((or (eq? handler-key #t)
616             (eq? orig-key handler-key))
617         (apply handler orig-stacks orig-args)) ...
618        (else (raise '8sync-caught-error
619                     orig-key orig-args orig-stacks))))))
620
621 ;; This is sugar... and could probably be considerably
622 ;; simplified and optimized.  But whatever.
623 (define-syntax-rule (8sleep time)
624   (8sync-delay 'no-op time))
625
626
627 \f
628 ;;; Execution of agenda, and current agenda
629 ;;; =======================================
630
631 (define %current-agenda (make-parameter #f))
632
633 (define (update-agenda-from-select! agenda)
634   "Potentially (select) on ports specified in agenda, adding items to queue.
635
636 Also handles sleeping when all we have to do is wait on the schedule."
637   (define (hash-keys hash)
638     (hash-map->list (lambda (k v) k) hash))
639   (define (get-wait-time)
640     ;; TODO: we need to figure this out based on whether there's anything
641     ;;   in the queue, and if not, how long till the next scheduled item
642     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
643       (cond 
644        ((not (q-empty? (agenda-queue agenda)))
645         (cons 0 0))
646        (soonest-time    ; ie, the agenda is non-empty
647         (let* ((current-time (agenda-time agenda)))
648           (if (time<= soonest-time current-time)
649               ;; Well there's something due so let's select
650               ;; (this avoids a (possible?) race condition chance)
651               (cons 0 0)
652               (time-minus soonest-time current-time))))
653        (else
654         (cons #f #f)))))
655   (define (do-select)
656     ;; TODO: support usecond wait time too
657     (match (get-wait-time)
658       ((sec . usec)
659        (catch 'system-error
660          (lambda ()
661            (select (hash-keys (agenda-read-port-map agenda))
662                    (hash-keys (agenda-write-port-map agenda))
663                    (hash-keys (agenda-except-port-map agenda))
664                    sec usec))
665          (lambda (key . rest-args)
666            (match rest-args
667              ((_ _ _ (EINTR))
668               '(() () ()))
669              (_ (error "Unhandled error in select!" key rest-args))))))))
670   (define (get-procs-to-run)
671     (define (ports->procs ports port-map)
672       (lambda (initial-procs)
673         (fold
674          (lambda (port prev)
675            (cons (lambda ()
676                    ((hash-ref port-map port) port))
677                  prev))
678          initial-procs
679          ports)))
680     (match (do-select)
681       ((read-ports write-ports except-ports)
682        ;; @@: Come on, we can do better than append ;P
683        ((compose (ports->procs
684                   read-ports
685                   (agenda-read-port-map agenda))
686                  (ports->procs
687                   write-ports
688                   (agenda-write-port-map agenda))
689                  (ports->procs
690                   except-ports
691                   (agenda-except-port-map agenda)))
692         '()))))
693   (define (update-agenda)
694     (let ((procs-to-run (get-procs-to-run))
695           (q (agenda-queue agenda)))
696       (for-each
697        (lambda (proc)
698          (enq! q proc))
699        procs-to-run))
700     agenda)
701   (define (ports-to-select?)
702     (define (has-items? selector)
703       ;; @@: O(n)
704       ;;    ... we could use hash-for-each and a continuation to jump
705       ;;    out with a #t at first indication of an item
706       (not (= (hash-count (const #t)
707                           (selector agenda))
708               0)))
709     (or (has-items? agenda-read-port-map)
710         (has-items? agenda-write-port-map)
711         (has-items? agenda-except-port-map)))
712
713   (if (or (ports-to-select?)
714           ;; select doubles as sleep...
715           (not (schedule-empty? (agenda-schedule agenda)))) 
716       (update-agenda)
717       agenda))
718
719 (define (agenda-handle-port-request! agenda port-request)
720   "Update an agenda for a port-request"
721   (define (handle-selector request-selector port-map-selector)
722     (if (request-selector port-request)
723         ;; @@: Should we remove if #f?
724         (hash-set! (port-map-selector agenda)
725                    (port-request-port port-request)
726                    (request-selector port-request))))
727   (handle-selector port-request-read agenda-read-port-map)
728   (handle-selector port-request-write agenda-write-port-map)
729   (handle-selector port-request-except agenda-except-port-map))
730
731
732 (define (agenda-handle-port-remove-request! agenda port-remove-request)
733   "Update an agenda for a port-remove-request"
734   (let ((port (port-remove-request-port port-remove-request)))
735     (hash-remove! (agenda-read-port-map agenda) port)
736     (hash-remove! (agenda-write-port-map agenda) port)
737     (hash-remove! (agenda-except-port-map agenda) port)))
738
739
740 (define (stop-on-nothing-to-do agenda)
741   (and (q-empty? (agenda-queue agenda))
742        (schedule-empty? (agenda-schedule agenda))
743        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
744        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))
745        (= 0 (hash-count (const #t) (agenda-except-port-map agenda)))))
746
747
748 (define* (start-agenda agenda
749                        #:key
750                        ;; @@: Should we make stop-on-nothing-to-do
751                        ;;   the default stop-condition?
752                        (stop-condition stop-on-nothing-to-do)
753                        (get-time gettimeofday)
754                        (handle-ports update-agenda-from-select!)
755                        ;; For live hacking madness, etc
756                        (post-run-hook #f))
757   ;; TODO: Document fields
758   "Start up the AGENDA"
759   (let loop ((agenda agenda))
760     (let ((agenda   
761            ;; @@: Hm, maybe here would be a great place to handle
762            ;;   select'ing on ports.
763            ;;   We could compose over agenda-run-once and agenda-read-ports
764            (agenda-run-once agenda)))
765       ;; @@: This relies on mutation at present on the queue, in the rare
766       ;;   event it's used.  If we ever switch to something more immutable,
767       ;;   it should return a new modified agenda instead.
768       (if post-run-hook
769           (post-run-hook agenda))
770       (if (and stop-condition (stop-condition agenda))
771           'done
772           (let* ((agenda
773                   ;; We have to update the time after ports handled, too
774                   ;; because it may have changed after a select
775                   (set-field
776                    (handle-ports
777                     ;; Adjust the agenda's time just in time
778                     ;; We do this here rather than in agenda-run-once to make
779                     ;; agenda-run-once's behavior fairly predictable
780                     (set-field agenda (agenda-time) (get-time)))
781                    (agenda-time) (get-time))))
782             ;; Update the agenda's current queue based on
783             ;; currently applicable time segments
784             (add-segments-contents-to-queue!
785              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
786              (agenda-queue agenda))
787             (loop agenda))))))
788
789
790 (define (print-error-and-continue key . args)
791   "Frequently used as pre-unwind-handler for agenda"
792   (cond
793    ((eq? key '8sync-caught-error)
794     (match args
795       ((orig-key orig-args stacks)
796        (display "\n*** Caught async exception. ***\n")
797        (format (current-error-port)
798                "* Original key '~s and arguments: ~s *\n"
799                orig-key orig-args)
800        (display "* Caught stacks below (ending with original) *\n\n")
801        (for-each
802         (lambda (s)
803           (display-backtrace s (current-error-port))
804           (newline (current-error-port)))
805         stacks))))
806    (else
807     (format (current-error-port)
808             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
809             key args)
810     (display-backtrace (make-stack #t 1 0)
811                        (current-error-port))
812     (newline (current-error-port)))))
813
814 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
815                                      body ...)
816   (if (or catch-handler pre-unwind-handler)
817       (catch
818         #t
819         (lambda ()
820           body ...)
821         (or catch-handler (lambda _ #f))
822         (or pre-unwind-handler (lambda _ #f)))
823       (begin body ...)))
824
825 (define (agenda-run-once agenda)
826   "Run once through the agenda, and produce a new agenda
827 based on the results"
828   (define (call-proc proc)
829     (call-with-prompt
830      (agenda-prompt-tag agenda)
831      (lambda ()
832        (parameterize ((%current-agenda agenda))
833          (maybe-catch-all
834           ((agenda-catch-handler agenda)
835            (agenda-pre-unwind-handler agenda))
836           (proc))))
837      (lambda (kont async-request)
838        (setup-async-request kont async-request))))
839
840   (let ((queue (agenda-queue agenda))
841         (next-queue (make-q)))
842     (while (not (q-empty? queue))
843       (let* ((proc (q-pop! queue))
844              (proc-result (call-proc proc))
845              (enqueue
846               (lambda (run-request)
847                 (define (schedule-at! time proc)
848                   (schedule-add! (agenda-schedule agenda) time proc))
849                 (let ((request-time (run-request-when run-request)))
850                   (match request-time
851                     ((? time-delta? time-delta)
852                      (let ((time (time-delta+ (agenda-time agenda)
853                                               time-delta)))
854                        (schedule-at! time (run-request-proc run-request))))
855                     ((? integer? sec)
856                      (let ((time (cons sec 0)))
857                        (schedule-at! time (run-request-proc run-request))))
858                     (((? integer? sec) . (? integer? usec))
859                      (schedule-at! request-time (run-request-proc run-request)))
860                     (#f
861                      (enq! next-queue (run-request-proc run-request))))))))
862         (define (handle-individual result)
863           ;; @@: Could maybe optimize by switching to an explicit cond...
864           (match result
865             ((? run-request? new-proc)
866              (enqueue new-proc))
867             ((? port-request? port-request)
868              (agenda-handle-port-request! agenda port-request))
869             ((? port-remove-request? port-remove-request)
870              (agenda-handle-port-remove-request! agenda port-remove-request))
871             ;; do nothing
872             (_ #f)))
873         ;; @@: We might support delay-wrapped procedures here
874         (match proc-result
875           ((results ...)
876            (for-each handle-individual results))
877           (one-result (handle-individual one-result)))))
878     ;; TODO: Alternately, we could return the next-queue
879     ;;   along with changes to be added to the schedule here?
880     ;; Return new agenda, with next queue set
881     (set-field agenda (agenda-queue) next-queue)))