eba0139fa2268f526430ae73f00285ab0e215e4b
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 q)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 receive)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map agenda-except-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <time-segment>
37             make-time-segment time-segment?
38             time-segment-time time-segment-queue
39
40             time< time= time<= time-delta+
41             time-minus time-plus
42
43             <time-delta>
44             make-time-delta tdelta time-delta?
45             time-delta-sec time-delta-usec
46
47             <schedule>
48             make-schedule schedule?
49             schedule-add! schedule-empty?
50             schedule-segments
51             schedule-soonest-time
52
53             schedule-segments-split schedule-extract-until!
54             add-segments-contents-to-queue!
55
56             %8sync
57
58             <run-request>
59             make-run-request run-request?
60             run-request-proc run-request-when
61
62             <port-request>
63             make-port-request port-request port-request?
64             port-request-port
65             port-request-read port-request-write port-request-except
66
67             <port-remove-request>
68             make-port-remove-request port-remove-request port-remove-request?
69             port-remove-request-port
70
71             run-it wrap wrap-apply run run-at run-delay
72
73             %run %run-at %run-delay %port-request 
74             %8sync-run %8sync-run-at %8sync-run-delay %8sync-port
75             
76             catch-8sync catch-%8sync
77
78             ;; used for introspecting the error, but a method for making
79             ;; is not exposed
80             wrapped-exception?
81             wrapped-exception-key wrapped-exception-args
82             wrapped-exception-stacks
83
84             print-error-and-continue
85
86             stop-on-nothing-to-do
87
88             %current-agenda
89             start-agenda agenda-run-once))
90
91 ;; @@: Using immutable agendas here, so wouldn't it make sense to
92 ;;   replace this queue stuff with using pfds based immutable queues?
93
94 \f
95 ;;; Agenda definition
96 ;;; =================
97
98 ;;; The agenda consists of:
99 ;;;  - a queue of immediate items to handle
100 ;;;  - sheduled future events to be added to a future queue
101 ;;;  - a tag by which running processes can escape for some asynchronous
102 ;;;    operation (from which they can be returned later)
103 ;;;  - a mapping of ports to various handler procedures
104 ;;;
105 ;;; The goal, eventually, is for this all to be immutable and functional.
106 ;;; However, we aren't there yet.  Some tricky things:
107 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
108 ;;;  - Need to use immutable queues (ijp's pfds library?)
109 ;;;  - Modeling reading from ports as something repeatable,
110 ;;;    and with reasonable separation from functional components?
111
112 (define-immutable-record-type <agenda>
113   (make-agenda-intern queue prompt-tag
114                       read-port-map write-port-map except-port-map
115                       schedule time catch-handler pre-unwind-handler)
116   agenda?
117   (queue agenda-queue)
118   (prompt-tag agenda-prompt-tag)
119   (read-port-map agenda-read-port-map)
120   (write-port-map agenda-write-port-map)
121   (except-port-map agenda-except-port-map)
122   (schedule agenda-schedule)
123   (time agenda-time)
124   (catch-handler agenda-catch-handler)
125   (pre-unwind-handler agenda-pre-unwind-handler))
126
127 (define (make-async-prompt-tag)
128   "Make an async prompt tag for an agenda.
129
130 Generally done automatically for the user through (make-agenda)."
131   (make-prompt-tag "prompt"))
132
133 (define* (make-agenda #:key
134                       (queue (make-q))
135                       (prompt (make-prompt-tag))
136                       (read-port-map (make-hash-table))
137                       (write-port-map (make-hash-table))
138                       (except-port-map (make-hash-table))
139                       (schedule (make-schedule))
140                       (time (gettimeofday))
141                       (catch-handler #f)
142                       (pre-unwind-handler #f))
143   ;; TODO: document arguments
144   "Make a fresh agenda."
145   (make-agenda-intern queue prompt
146                       read-port-map write-port-map except-port-map
147                       schedule time
148                       catch-handler pre-unwind-handler))
149
150 (define (current-agenda-prompt)
151   "Get the prompt for the current agenda; signal an error if there isn't one."
152   (let ((current-agenda (%current-agenda)))
153     (if (not current-agenda)
154         (throw
155          'no-current-agenda
156          "Can't get current agenda prompt if there's no current agenda!")
157         (agenda-prompt-tag current-agenda))))
158
159 ;; helper for making queues for an agenda
160 (define (list->q lst)
161   "Makes a queue composed of LST items"
162   (let ((q (make-q)))
163     (for-each
164      (lambda (x)
165        (enq! q x))
166      lst)
167     q))
168
169 (define (make-q* . args)
170   "Makes a queue and populates it with this invocation's ARGS"
171   (list->q args))
172
173 \f
174 ;;; Schedule
175 ;;; ========
176
177 ;;; This is where we handle timed events for the future
178
179 ;; This section totally borrows from the ideas in SICP
180 ;; <3 <3 <3
181
182 ;; NOTE: time is a cons of (seconds . microseconds)
183
184 (define-record-type <time-segment>
185   (make-time-segment-intern time queue)
186   time-segment?
187   (time time-segment-time)
188   (queue time-segment-queue))
189
190 ;; @@: This seems to be the same as srfi-18's seconds->time procedure?
191 ;;   Maybe double check and switch to that?  (Thanks amz3!)
192
193 (define (time-from-float-or-fraction time)
194   "Produce a (sec . usec) pair from TIME, a float or fraction"
195   (let* ((mixed-whole (floor time))
196          (mixed-rest (- time mixed-whole))  ; float or fraction component
197          (sec mixed-whole)
198          (usec (floor (* 1000000 mixed-rest))))
199     (cons (inexact->exact sec) (inexact->exact usec))))
200
201 (define (time-segment-right-format time)
202   "Ensure TIME is in the right format.
203
204 The right format means (second . microsecond).
205 If an integer, will convert appropriately."
206   ;; TODO: add floating point / rational number support.
207   (match time
208     ;; time is already a cons of second and microsecnd
209     (((? integer? s) . (? integer? u)) time)
210     ;; time was just an integer (just the second)
211     ((? integer? _) (cons time 0))
212     ((or (? rational? _) (? inexact? _))
213      (time-from-float-or-fraction time))
214     (_ (throw 'invalid-time "Invalid time" time))))
215
216 (define* (make-time-segment time #:optional (queue (make-q)))
217   "Make a time segment of TIME and QUEUE
218
219 No automatic conversion is done, so you might have to
220 run (time-segment-right-format) first."
221   (make-time-segment-intern time queue))
222
223 (define (time< time1 time2)
224   "Check if TIME1 is less than TIME2"
225   (cond ((< (car time1)
226             (car time2))
227          #t)
228         ((and (= (car time1)
229                  (car time2))
230               (< (cdr time1)
231                  (cdr time2)))
232          #t)
233         (else #f)))
234
235 (define (time= time1 time2)
236   "Check whether TIME1 and TIME2 are equivalent"
237   (and (= (car time1) (car time2))
238        (= (cdr time1) (cdr time2))))
239
240 (define (time<= time1 time2)
241   "Check if TIME1 is less than or equal to TIME2"
242   (or (time< time1 time2)
243       (time= time1 time2)))
244
245
246 (define-record-type <time-delta>
247   (make-time-delta-intern sec usec)
248   time-delta?
249   (sec time-delta-sec)
250   (usec time-delta-usec))
251
252 (define* (make-time-delta time)
253   "Make a <time-delta> of SEC seconds and USEC microseconds.
254
255 This is used primarily so the agenda can recognize RUN-REQUEST objects
256 which are meant to delay computation"
257   (match (time-segment-right-format time)
258     ((sec . usec)
259      (make-time-delta-intern sec usec))))
260
261 (define tdelta make-time-delta)
262
263 (define (time-carry-correct time)
264   "Corrects/handles time microsecond carry.
265 Will produce (0 . 0) instead of a negative number, if needed."
266   (cond ((>= (cdr time) 1000000)
267          (cons
268           (+ (car time) 1)
269           (- (cdr time) 1000000)))
270         ((< (cdr time) 0)
271          (if (= (car time) 0)
272              '(0 0)
273              (cons
274               (- (car time) 1)
275               (+ (cdr time) 1000000))))
276         (else time)))
277
278 (define (time-delta+ time time-delta)
279   "Increment a TIME by the value of TIME-DELTA"
280   (time-carry-correct
281    (cons (+ (car time) (time-delta-sec time-delta))
282          (+ (cdr time) (time-delta-usec time-delta)))))
283
284 (define (time-minus time1 time2)
285   "Subtract TIME2 from TIME1"
286   (time-carry-correct
287    (cons (- (car time1) (car time2))
288          (- (cdr time1) (cdr time2)))))
289
290 (define (time-plus time1 time2)
291   "Add TIME1 and TIME2"
292   (time-carry-correct
293    (cons (+ (car time1) (car time2))
294          (+ (cdr time1) (cdr time2)))))
295
296
297 (define-record-type <schedule>
298   (make-schedule-intern segments)
299   schedule?
300   (segments schedule-segments set-schedule-segments!))
301
302 (define* (make-schedule #:optional segments)
303   "Make a schedule, optionally pre-composed of SEGMENTS"
304   (make-schedule-intern (or segments '())))
305
306 (define (schedule-soonest-time schedule)
307   "Return a cons of (sec . usec) for next time segement, or #f if none"
308   (let ((segments (schedule-segments schedule)))
309     (if (eq? segments '())
310         #f
311         (time-segment-time (car segments)))))
312
313 ;; TODO: This code is reasonably easy to read but it
314 ;;   mutates AND is worst case of O(n) in both space and time :(
315 ;;   but at least it'll be reasonably easy to refactor to
316 ;;   a more functional setup?
317 (define (schedule-add! schedule time proc)
318   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
319   (let ((time (time-segment-right-format time)))
320     (define (new-time-segment)
321       (let ((new-segment
322              (make-time-segment time)))
323         (enq! (time-segment-queue new-segment) proc)
324         new-segment))
325     (define (loop segments)
326       (define (segment-equals-time? segment)
327         (time= time (time-segment-time segment)))
328
329       (define (segment-more-than-time? segment)
330         (time< time (time-segment-time segment)))
331
332       ;; We could switch this out to be more mutate'y
333       ;; and avoid the O(n) of space... is that over-optimizing?
334       (match segments
335         ;; If we're at the end of the list, time to make a new
336         ;; segment...
337         ('() (cons (new-time-segment) '()))
338         ;; If the segment's time is exactly our time, good news
339         ;; everyone!  Let's append our stuff to its queue
340         (((? segment-equals-time? first) rest ...)
341          (enq! (time-segment-queue first) proc)
342          segments)
343         ;; If the first segment is more than our time,
344         ;; ours belongs before this one, so add it and
345         ;; start consing our way back
346         (((? segment-more-than-time? first) rest ...)
347          (cons (new-time-segment) segments))
348         ;; Otherwise, build up recursive result
349         ((first rest ... )
350          (cons first (loop rest)))))
351     (set-schedule-segments!
352      schedule
353      (loop (schedule-segments schedule)))))
354
355 (define (schedule-empty? schedule)
356   "Check if the SCHEDULE is currently empty"
357   (eq? (schedule-segments schedule) '()))
358
359 (define (schedule-segments-split schedule time)
360   "Does a multiple value return of time segments before/at and after TIME"
361   (let ((time (time-segment-right-format time)))
362     (define (segment-is-now? segment)
363       (time= (time-segment-time segment) time))
364     (define (segment-is-before-now? segment)
365       (time< (time-segment-time segment) time))
366
367     (let loop ((segments-before '())
368                (segments-left (schedule-segments schedule)))
369       (match segments-left
370         ;; end of the line, return
371         ('()
372          (values (reverse segments-before) '()))
373
374         ;; It's right now, so time to stop, but include this one in before
375         ;; but otherwise return
376         (((? segment-is-now? first) rest ...)
377          (values (reverse (cons first segments-before)) rest))
378
379         ;; This is prior or at now, so add it and keep going
380         (((? segment-is-before-now? first) rest ...)
381          (loop (cons first segments-before) rest))
382
383         ;; Otherwise it's past now, just return what we have
384         (segments-after
385          (values segments-before segments-after))))))
386
387 (define (schedule-extract-until! schedule time)
388   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
389   (receive (segments-before segments-after)
390       (schedule-segments-split schedule time)
391     (set-schedule-segments! schedule segments-after)
392     segments-before))
393
394 (define (add-segments-contents-to-queue! segments queue)
395   (for-each
396    (lambda (segment)
397      (let ((seg-queue (time-segment-queue segment)))
398        (while (not (q-empty? seg-queue))
399          (enq! queue (deq! seg-queue)))))
400    segments))
401
402
403 \f
404 ;;; Request to run stuff
405 ;;; ====================
406
407 (define-record-type <run-request>
408   (make-run-request proc when)
409   run-request?
410   (proc run-request-proc)
411   (when run-request-when))
412
413 (define* (run-it proc #:optional when)
414   "Make a request to run PROC (possibly at WHEN)"
415   (make-run-request proc when))
416
417 (define-syntax-rule (wrap body ...)
418   "Wrap contents in a procedure"
419   (lambda ()
420     body ...))
421
422 (define-syntax-rule (wrap-apply body)
423   "Wrap possibly multi-value function in a procedure, applies all arguments"
424   (lambda args
425     (apply body args)))
426
427
428 ;; @@: Do we really want `body ...' here?
429 ;;   what about just `body'?
430 (define-syntax-rule (run body ...)
431   "Run everything in BODY but wrap in a convenient procedure"
432   (make-run-request (wrap body ...) #f))
433
434 (define-syntax-rule (run-at body ... when)
435   "Run BODY at WHEN"
436   (make-run-request (wrap body ...) when))
437
438 ;; @@: Is it okay to overload the term "delay" like this?
439 ;;   Would `run-in' be better?
440 (define-syntax-rule (run-delay body ... delay-time)
441   "Run BODY at DELAY-TIME time from now"
442   (make-run-request (wrap body ...) (tdelta delay-time)))
443
444
445 ;; A request to set up a port with at least one of read, write, except
446 ;; handling processes
447
448 (define-record-type <port-request>
449   (make-port-request-intern port read write except)
450   port-request?
451   (port port-request-port)
452   (read port-request-read)
453   (write port-request-write)
454   (except port-request-except))
455
456 (define* (make-port-request port #:key read write except)
457   (if (not (or read write except))
458       (throw 'no-port-handler-given "No port handler given.\n"))
459   (make-port-request-intern port read write except))
460
461 (define port-request make-port-request)
462
463 (define-record-type <port-remove-request>
464   (make-port-remove-request port)
465   port-remove-request?
466   (port port-remove-request-port))
467
468 (define port-remove-request make-port-remove-request)
469
470
471 \f
472 ;;; Asynchronous escape to run things
473 ;;; =================================
474
475 (define-syntax-rule (%8sync async-request)
476   "Run BODY asynchronously at a prompt, passing args to make-future.
477
478 Runs things asynchronously (8synchronously?)"
479   (propagate-%async-exceptions
480    (abort-to-prompt (current-agenda-prompt)
481                     async-request)))
482
483 ;; Async port request and run-request meta-requests
484 (define (make-async-request proc)
485   "Wrap PROC in an async-request
486
487 The purpose of this is to make sure that users don't accidentally
488 return the wrong thing via (%8sync) and trip themselves up."
489   (cons '*async-request* proc))
490
491 (define (setup-async-request resume-kont async-request)
492   "Complete an async request for agenda-run-once's continuation handling"
493   (match async-request
494     (('*async-request* . async-setup-proc)
495      (async-setup-proc resume-kont))
496     ;; TODO: deliver more helpful errors depending on what the user
497     ;;   returned
498     (_ (throw 'invalid-async-request
499               "Invalid request passed back via an (%8sync) procedure."
500               async-request))))
501
502 (define-record-type <wrapped-exception>
503   (make-wrapped-exception key args stacks)
504   wrapped-exception?
505   (key wrapped-exception-key)
506   (args wrapped-exception-args)
507   (stacks wrapped-exception-stacks))
508
509 (define-syntax-rule (propagate-%async-exceptions body)
510   (let ((body-result body))
511     (if (wrapped-exception? body-result)
512         (throw '8sync-caught-error
513                (wrapped-exception-key body-result)
514                (wrapped-exception-args body-result)
515                (wrapped-exception-stacks body-result))
516         body-result)))
517
518 (define-syntax-rule (%run body ...)
519   (%run-at body ... #f))
520
521 (define-syntax-rule (%run-at body ... when)
522   ;; Send an asynchronous request to apply a continuation to the
523   ;; following function, then handle that as a request to the agenda
524   (make-async-request
525    (lambda (kont)
526      ;; We're making a run request
527      (make-run-request
528       ;; Wrapping the following execution to run...
529       (wrap
530        ;; Once we get the result from the inner part, we'll resume
531        ;; this continuation, but first
532        ;; @@: Is this running immediately, or queueing the result
533        ;;   after evaluation for the next agenda tick?  It looks
534        ;;   like evaluating immediately.  Is that what we want?
535        (kont
536         ;; Any unhandled errors are caught
537         (let ((exception-stack #f))
538           (catch #t
539             ;; Run the actual code the user requested
540             (lambda ()
541               body ...)
542             ;; If something bad happened and we didn't catch it,
543             ;; we'll wrap it up in such a way that the continuation
544             ;; can address it
545             (lambda (key . args)
546               (cond
547                ((eq? key '8sync-caught-error)
548                 (match args
549                   ((orig-key orig-args orig-stacks)
550                    (make-wrapped-exception
551                     orig-key orig-args
552                     (cons exception-stack orig-stacks)))))
553                (else
554                 (make-wrapped-exception key args
555                                         (list exception-stack)))))
556             (lambda _
557               (set! exception-stack (make-stack #t 1 0)))))))
558       when))))
559
560 (define-syntax-rule (%run-delay body ... delay-time)
561   (%run-at body ... (tdelta delay-time)))
562
563 (define-syntax-rule (%port-request port port-request-args ...)
564   (make-async-request
565    (lambda (kont)
566      (list (make-port-request port port-request-args ...)
567            (make-run-request kont)))))
568
569 (define-syntax-rule (%port-remove-request port)
570   (make-async-request
571    (lambda (kont)
572      (list (make-port-remove-request port)
573            (make-run-request kont)))))
574
575
576 ;; Sugar
577 (define-syntax-rule (%8sync-run rest ...)
578   "Sugar for (%8sync (%run ...))"
579   (%8sync (%run rest ...)))
580
581 (define-syntax-rule (%8sync-run-at rest ...)
582   "Sugar for (%8sync (%run-at ...))"
583   (%8sync (%run-at rest ...)))
584
585 (define-syntax-rule (%8sync-run-delay rest ...)
586   "Sugar for (%8sync (%run-delay ...))"
587   (%8sync (%run-delay rest ...)))
588
589 (define-syntax-rule (%8sync-port rest ...)
590   "Sugar for (%8sync (%port-request ...))"
591   (%8sync (%port-request rest ...)))
592
593
594 ;; TODO: Write (%run-immediately)
595
596 ;; TODO
597 (define-syntax-rule (%run-with-return return body ...)
598   (make-async-request
599    (lambda (kont)
600      (let ((return kont))
601        (lambda ()
602          body ...)))))
603
604 (define-syntax-rule (catch-8sync exp (handler-key handler) ...)
605   (catch '8sync-caught-error
606     (lambda ()
607       exp)
608     (lambda (_ orig-key orig-args orig-stacks)
609       (cond
610        ((or (eq? handler-key #t)
611             (eq? orig-key handler-key))
612         (apply handler orig-stacks orig-args)) ...
613        (else (raise '8sync-caught-error
614                     orig-key orig-args orig-stacks))))))
615
616 ;; Alias...?
617 (define-syntax-rule (catch-%8sync rest ...)
618   (catch-8sync rest ...))
619
620
621 \f
622 ;;; Execution of agenda, and current agenda
623 ;;; =======================================
624
625 (define %current-agenda (make-parameter #f))
626
627 (define (update-agenda-from-select! agenda)
628   "Potentially (select) on ports specified in agenda, adding items to queue.
629
630 Also handles sleeping when all we have to do is wait on the schedule."
631   (define (hash-keys hash)
632     (hash-map->list (lambda (k v) k) hash))
633   (define (get-wait-time)
634     ;; TODO: we need to figure this out based on whether there's anything
635     ;;   in the queue, and if not, how long till the next scheduled item
636     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
637       (cond 
638        ((not (q-empty? (agenda-queue agenda)))
639         (cons 0 0))
640        (soonest-time    ; ie, the agenda is non-empty
641         (let* ((current-time (agenda-time agenda)))
642           (if (time<= soonest-time current-time)
643               ;; Well there's something due so let's select
644               ;; (this avoids a (possible?) race condition chance)
645               (cons 0 0)
646               (time-minus soonest-time current-time))))
647        (else
648         (cons #f #f)))))
649   (define (do-select)
650     ;; TODO: support usecond wait time too
651     (match (get-wait-time)
652       ((sec . usec)
653        (catch 'system-error
654          (lambda ()
655            (select (hash-keys (agenda-read-port-map agenda))
656                    (hash-keys (agenda-write-port-map agenda))
657                    (hash-keys (agenda-except-port-map agenda))
658                    sec usec))
659          (lambda (key . rest-args)
660            (match rest-args
661              ((_ _ _ (EINTR))
662               '(() () ()))
663              (_ (error "Unhandled error in select!" key rest-args))))))))
664   (define (get-procs-to-run)
665     (define (ports->procs ports port-map)
666       (lambda (initial-procs)
667         (fold
668          (lambda (port prev)
669            (cons (lambda ()
670                    ((hash-ref port-map port) port))
671                  prev))
672          initial-procs
673          ports)))
674     (match (do-select)
675       ((read-ports write-ports except-ports)
676        ;; @@: Come on, we can do better than append ;P
677        ((compose (ports->procs
678                   read-ports
679                   (agenda-read-port-map agenda))
680                  (ports->procs
681                   write-ports
682                   (agenda-write-port-map agenda))
683                  (ports->procs
684                   except-ports
685                   (agenda-except-port-map agenda)))
686         '()))))
687   (define (update-agenda)
688     (let ((procs-to-run (get-procs-to-run))
689           (q (agenda-queue agenda)))
690       (for-each
691        (lambda (proc)
692          (enq! q proc))
693        procs-to-run))
694     agenda)
695   (define (ports-to-select?)
696     (define (has-items? selector)
697       ;; @@: O(n)
698       ;;    ... we could use hash-for-each and a continuation to jump
699       ;;    out with a #t at first indication of an item
700       (not (= (hash-count (const #t)
701                           (selector agenda))
702               0)))
703     (or (has-items? agenda-read-port-map)
704         (has-items? agenda-write-port-map)
705         (has-items? agenda-except-port-map)))
706
707   (if (or (ports-to-select?)
708           ;; select doubles as sleep...
709           (not (schedule-empty? (agenda-schedule agenda)))) 
710       (update-agenda)
711       agenda))
712
713 (define (agenda-handle-port-request! agenda port-request)
714   "Update an agenda for a port-request"
715   (define (handle-selector request-selector port-map-selector)
716     (if (request-selector port-request)
717         ;; @@: Should we remove if #f?
718         (hash-set! (port-map-selector agenda)
719                    (port-request-port port-request)
720                    (request-selector port-request))))
721   (handle-selector port-request-read agenda-read-port-map)
722   (handle-selector port-request-write agenda-write-port-map)
723   (handle-selector port-request-except agenda-except-port-map))
724
725
726 (define (agenda-handle-port-remove-request! agenda port-remove-request)
727   "Update an agenda for a port-remove-request"
728   (let ((port (port-remove-request-port port-remove-request)))
729     (hash-remove! (agenda-read-port-map agenda) port)
730     (hash-remove! (agenda-write-port-map agenda) port)
731     (hash-remove! (agenda-except-port-map agenda) port)))
732
733
734 (define (stop-on-nothing-to-do agenda)
735   (and (q-empty? (agenda-queue agenda))
736        (schedule-empty? (agenda-schedule agenda))
737        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
738        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))
739        (= 0 (hash-count (const #t) (agenda-except-port-map agenda)))))
740
741
742 (define* (start-agenda agenda
743                        #:key
744                        ;; @@: Should we make stop-on-nothing-to-do
745                        ;;   the default stop-condition?
746                        stop-condition
747                        (get-time gettimeofday)
748                        (handle-ports update-agenda-from-select!))
749   ;; TODO: Document fields
750   "Start up the AGENDA"
751   (let loop ((agenda agenda))
752     (let ((agenda   
753            ;; @@: Hm, maybe here would be a great place to handle
754            ;;   select'ing on ports.
755            ;;   We could compose over agenda-run-once and agenda-read-ports
756            (agenda-run-once agenda)))
757       (if (and stop-condition (stop-condition agenda))
758           'done
759           (let* ((agenda
760                   ;; We have to update the time after ports handled, too
761                   ;; because it may have changed after a select
762                   (set-field
763                    (handle-ports
764                     ;; Adjust the agenda's time just in time
765                     ;; We do this here rather than in agenda-run-once to make
766                     ;; agenda-run-once's behavior fairly predictable
767                     (set-field agenda (agenda-time) (get-time)))
768                    (agenda-time) (get-time))))
769             ;; Update the agenda's current queue based on
770             ;; currently applicable time segments
771             (add-segments-contents-to-queue!
772              (schedule-extract-until! (agenda-schedule agenda) (agenda-time agenda))
773              (agenda-queue agenda))
774             (loop agenda))))))
775
776
777 (define (print-error-and-continue key . args)
778   "Frequently used as pre-unwind-handler for agenda"
779   (cond
780    ((eq? key '8sync-caught-error)
781     (match args
782       ((orig-key orig-args stacks)
783        (display "\n*** Caught async exception. ***\n")
784        (format (current-error-port)
785                "* Original key '~s and arguments: ~s *\n"
786                orig-key orig-args)
787        (display "* Caught stacks below (ending with original) *\n\n")
788        (for-each
789         (lambda (s)
790           (display-backtrace s (current-error-port))
791           (newline (current-error-port)))
792         stacks))))
793    (else
794     (format (current-error-port)
795             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
796             key args)
797     (display-backtrace (make-stack #t 1 0)
798                        (current-error-port))
799     (newline (current-error-port)))))
800
801 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
802                                      body ...)
803   (if (or catch-handler pre-unwind-handler)
804       (catch
805         #t
806         (lambda ()
807           body ...)
808         (or catch-handler (lambda _ #f))
809         (or pre-unwind-handler (lambda _ #f)))
810       (begin body ...)))
811
812 (define (agenda-run-once agenda)
813   "Run once through the agenda, and produce a new agenda
814 based on the results"
815   (define (call-proc proc)
816     (call-with-prompt
817      (agenda-prompt-tag agenda)
818      (lambda ()
819        (parameterize ((%current-agenda agenda))
820          (maybe-catch-all
821           ((agenda-catch-handler agenda)
822            (agenda-pre-unwind-handler agenda))
823           (proc))))
824      (lambda (kont async-request)
825        (setup-async-request kont async-request))))
826
827   (let ((queue (agenda-queue agenda))
828         (next-queue (make-q)))
829     (while (not (q-empty? queue))
830       (let* ((proc (q-pop! queue))
831              (proc-result (call-proc proc))
832              (enqueue
833               (lambda (run-request)
834                 (define (schedule-at! time proc)
835                   (schedule-add! (agenda-schedule agenda) time proc))
836                 (let ((request-time (run-request-when run-request)))
837                   (match request-time
838                     ((? time-delta? time-delta)
839                      (let ((time (time-delta+ (agenda-time agenda)
840                                               time-delta)))
841                        (schedule-at! time (run-request-proc run-request))))
842                     ((? integer? sec)
843                      (let ((time (cons sec 0)))
844                        (schedule-at! time (run-request-proc run-request))))
845                     (((? integer? sec) . (? integer? usec))
846                      (schedule-at! request-time (run-request-proc run-request)))
847                     (#f
848                      (enq! next-queue (run-request-proc run-request))))))))
849         (define (handle-individual result)
850           ;; @@: Could maybe optimize by switching to an explicit cond...
851           (match result
852             ((? run-request? new-proc)
853              (enqueue new-proc))
854             ((? port-request? port-request)
855              (agenda-handle-port-request! agenda port-request))
856             ((? port-remove-request? port-remove-request)
857              (agenda-handle-port-remove-request! agenda port-remove-request))
858             ;; do nothing
859             (_ #f)))
860         ;; @@: We might support delay-wrapped procedures here
861         (match proc-result
862           ((results ...)
863            (for-each handle-individual results))
864           (one-result (handle-individual one-result)))))
865     ;; TODO: Alternately, we could return the next-queue
866     ;;   along with changes to be added to the schedule here?
867     ;; Return new agenda, with next queue set
868     (set-field agenda (agenda-queue) next-queue)))