4a126a4e1fddbcf140748f62b7b9861f7d5e03e2
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (ice-9 q)
23   #:use-module (ice-9 match)
24   #:use-module (ice-9 receive)
25   #:use-module (ice-9 suspendable-ports)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <schedule>
37             make-schedule schedule?
38             schedule-add! schedule-empty?
39             schedule-segments
40             schedule-soonest-time
41
42             schedule-segments-split schedule-extract-until!
43             add-segments-contents-to-queue!
44
45             <run-request>
46             make-run-request run-request?
47             run-request-proc run-request-when
48
49             run-it wrap wrap-apply run run-at run-delay
50
51             8sync
52             8sleep 8yield
53             
54             ;; used for introspecting the error, but a method for making
55             ;; is not exposed
56             wrapped-exception?
57             wrapped-exception-key wrapped-exception-args
58             wrapped-exception-stacks
59
60             print-error-and-continue
61
62             stop-on-nothing-to-do
63
64             %current-agenda-prompt
65             run-agenda agenda-run-once!))
66
67 \f
68 ;;; Agenda definition
69 ;;; =================
70
71 ;;; The agenda consists of:
72 ;;;  - a queue of immediate items to handle
73 ;;;  - sheduled future events to be added to a future queue
74 ;;;  - a tag by which running processes can escape for some asynchronous
75 ;;;    operation (from which they can be returned later)
76 ;;;  - a mapping of ports to various handler procedures
77 ;;;
78 ;;; @@: Is this next part deprecated?
79 ;;; The goal, maybe eventually, is for this all to be immutable and functional.
80 ;;; However, we aren't there yet.  Some tricky things:
81 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
82 ;;;  - Need to use immutable queues (ijp's pfds library?)
83 ;;;  - Modeling reading from ports as something repeatable,
84 ;;;    and with reasonable separation from functional components?
85
86 ;; TODO: Tear out the immutable agenda aspect until we're actually ready
87 ;;   to use it.
88 (define-record-type <agenda>
89   (make-agenda-intern queue prompt-tag
90                       read-port-map write-port-map
91                       schedule catch-handler pre-unwind-handler)
92   agenda?
93   (queue agenda-queue set-agenda-queue!)
94   (prompt-tag agenda-prompt-tag)
95   (read-port-map agenda-read-port-map)
96   (write-port-map agenda-write-port-map)
97   (schedule agenda-schedule)
98   (catch-handler agenda-catch-handler)
99   (pre-unwind-handler agenda-pre-unwind-handler))
100
101 (define (make-async-prompt-tag)
102   "Make an async prompt tag for an agenda.
103
104 Generally done automatically for the user through (make-agenda)."
105   (make-prompt-tag "prompt"))
106
107 (define* (make-agenda #:key
108                       (queue (make-q))
109                       (prompt (make-prompt-tag))
110                       (read-port-map (make-hash-table))
111                       (write-port-map (make-hash-table))
112                       (schedule (make-schedule))
113                       (catch-handler #f)
114                       (pre-unwind-handler print-error-and-continue))
115   ;; TODO: document arguments
116   "Make a fresh agenda."
117   (make-agenda-intern queue prompt
118                       read-port-map write-port-map
119                       schedule catch-handler pre-unwind-handler))
120
121 ;; helper for making queues for an agenda
122 (define (list->q lst)
123   "Makes a queue composed of LST items"
124   (let ((q (make-q)))
125     (for-each
126      (lambda (x)
127        (enq! q x))
128      lst)
129     q))
130
131 (define (make-q* . args)
132   "Makes a queue and populates it with this invocation's ARGS"
133   (list->q args))
134
135 \f
136 ;;; Schedule
137 ;;; ========
138
139 ;;; This is where we handle timed events for the future
140
141 ;; This section totally borrows from the ideas in SICP
142 ;; <3 <3 <3
143
144 ;; NOTE: time is a cons of (seconds . microseconds)
145
146 (define-record-type <time-segment>
147   (make-time-segment-intern time queue)
148   time-segment?
149   (time time-segment-time)
150   (queue time-segment-queue))
151
152 (define* (make-time-segment time #:optional (queue (make-q)))
153   "Make a time segment of TIME and QUEUE
154
155 No automatic conversion is done, so you might have to
156 run (time-segment-right-format) first."
157   (make-time-segment-intern time queue))
158
159 (define (time< time1 time2)
160   "Check if TIME1 is less than TIME2"
161   (cond ((< (car time1)
162             (car time2))
163          #t)
164         ((and (= (car time1)
165                  (car time2))
166               (< (cdr time1)
167                  (cdr time2)))
168          #t)
169         (else #f)))
170
171 (define (time= time1 time2)
172   "Check whether TIME1 and TIME2 are equivalent"
173   (and (= (car time1) (car time2))
174        (= (cdr time1) (cdr time2))))
175
176 (define (time<= time1 time2)
177   "Check if TIME1 is less than or equal to TIME2"
178   (or (time< time1 time2)
179       (time= time1 time2)))
180
181 ;; @@: Maybe we should use floor/ here?
182 (define (time-carry-correct time)
183   "Corrects/handles time microsecond carry.
184 Will produce (0 . 0) instead of a negative number, if needed."
185   (cond ((>= (cdr time) 1000000)
186          (cons
187           (+ (car time) 1)
188           (- (cdr time) 1000000)))
189         ((< (cdr time) 0)
190          (if (= (car time) 0)
191              '(0 0)
192              (cons
193               (- (car time) 1)
194               (+ (cdr time) 1000000))))
195         (else time)))
196
197 (define (time-minus time1 time2)
198   "Subtract TIME2 from TIME1"
199   (time-carry-correct
200    (cons (- (car time1) (car time2))
201          (- (cdr time1) (cdr time2)))))
202
203 ;; @@: Unused?
204 (define (time-plus time1 time2)
205   "Add TIME1 and TIME2"
206   (time-carry-correct
207    (cons (+ (car time1) (car time2))
208          (+ (cdr time1) (cdr time2)))))
209
210 (define-record-type <schedule>
211   (make-schedule-intern segments)
212   schedule?
213   (segments schedule-segments set-schedule-segments!))
214
215 (define* (make-schedule #:optional segments)
216   "Make a schedule, optionally pre-composed of SEGMENTS"
217   (make-schedule-intern (or segments '())))
218
219 (define (schedule-soonest-time schedule)
220   "Return a cons of (sec . usec) for next time segement, or #f if none"
221   (let ((segments (schedule-segments schedule)))
222     (if (eq? segments '())
223         #f
224         (time-segment-time (car segments)))))
225
226 ;; TODO: This code is reasonably easy to read but it
227 ;;   mutates AND is worst case of O(n) in both space and time :(
228 ;;   but at least it'll be reasonably easy to refactor to
229 ;;   a more functional setup?
230 (define (schedule-add! schedule time proc)
231   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
232   (define (new-time-segment)
233     (let ((new-segment
234            (make-time-segment time)))
235       (enq! (time-segment-queue new-segment) proc)
236       new-segment))
237   (define (loop segments)
238     (define (segment-equals-time? segment)
239       (time= time (time-segment-time segment)))
240
241     (define (segment-more-than-time? segment)
242       (time< time (time-segment-time segment)))
243
244     ;; We could switch this out to be more mutate'y
245     ;; and avoid the O(n) of space... is that over-optimizing?
246     (match segments
247       ;; If we're at the end of the list, time to make a new
248       ;; segment...
249       ('() (cons (new-time-segment) '()))
250       ;; If the segment's time is exactly our time, good news
251       ;; everyone!  Let's append our stuff to its queue
252       (((? segment-equals-time? first) rest ...)
253        (enq! (time-segment-queue first) proc)
254        segments)
255       ;; If the first segment is more than our time,
256       ;; ours belongs before this one, so add it and
257       ;; start consing our way back
258       (((? segment-more-than-time? first) rest ...)
259        (cons (new-time-segment) segments))
260       ;; Otherwise, build up recursive result
261       ((first rest ... )
262        (cons first (loop rest)))))
263   (set-schedule-segments!
264    schedule
265    (loop (schedule-segments schedule))))
266
267 (define (schedule-empty? schedule)
268   "Check if the SCHEDULE is currently empty"
269   (eq? (schedule-segments schedule) '()))
270
271 (define (schedule-segments-split schedule time)
272   "Does a multiple value return of time segments before/at and after TIME"
273   (define (segment-is-now? segment)
274     (time= (time-segment-time segment) time))
275   (define (segment-is-before-now? segment)
276     (time< (time-segment-time segment) time))
277
278   (let loop ((segments-before '())
279              (segments-left (schedule-segments schedule)))
280     (match segments-left
281       ;; end of the line, return
282       ('()
283        (values (reverse segments-before) '()))
284
285       ;; It's right now, so time to stop, but include this one in before
286       ;; but otherwise return
287       (((? segment-is-now? first) rest ...)
288        (values (reverse (cons first segments-before)) rest))
289
290       ;; This is prior or at now, so add it and keep going
291       (((? segment-is-before-now? first) rest ...)
292        (loop (cons first segments-before) rest))
293
294       ;; Otherwise it's past now, just return what we have
295       (segments-after
296        (values segments-before segments-after)))))
297
298 (define (schedule-extract-until! schedule time)
299   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
300   (receive (segments-before segments-after)
301       (schedule-segments-split schedule time)
302     (set-schedule-segments! schedule segments-after)
303     segments-before))
304
305 (define (add-segments-contents-to-queue! segments queue)
306   (for-each
307    (lambda (segment)
308      (let ((seg-queue (time-segment-queue segment)))
309        (while (not (q-empty? seg-queue))
310          (enq! queue (deq! seg-queue)))))
311    segments))
312
313
314 \f
315 ;;; Request to run stuff
316 ;;; ====================
317
318 (define-record-type <run-request>
319   (make-run-request proc when)
320   run-request?
321   (proc run-request-proc)
322   (when run-request-when))
323
324 (define* (run-it proc #:optional when)
325   "Make a request to run PROC (possibly at WHEN)"
326   (make-run-request proc when))
327
328 (define-syntax-rule (wrap body ...)
329   "Wrap contents in a procedure"
330   (lambda ()
331     body ...))
332
333 (define-syntax-rule (wrap-apply body)
334   "Wrap possibly multi-value function in a procedure, applies all arguments"
335   (lambda args
336     (apply body args)))
337
338
339 ;; @@: Do we really want `body ...' here?
340 ;;   what about just `body'?
341 (define-syntax-rule (run body ...)
342   "Run everything in BODY but wrap in a convenient procedure"
343   (make-run-request (wrap body ...) #f))
344
345 (define-syntax-rule (run-at body ... when)
346   "Run BODY at WHEN"
347   (make-run-request (wrap body ...) when))
348
349 (define-syntax-rule (run-delay body ... delay-time)
350   "Run BODY at DELAY-TIME time from now"
351   (make-run-request (wrap body ...) (delayed-time delay-time)))
352
353 ;;; Procedures that are delimited continuations being resumed are
354 ;;; wrapped in a <kontinue>.  This is so that we don't accidentally
355 ;;; wrap in another catch statement every time we resume them, which
356 ;;; would be a drag.
357
358 (define-record-type <kontinue>
359   (kontinue kont)
360   kontinue?
361   (kont kontinue-kont))
362
363
364 \f
365 ;;; Asynchronous escape to run things
366 ;;; =================================
367
368 (define-syntax-rule (8sync-abort-to-prompt async-request)
369   (abort-to-prompt (%current-agenda-prompt)
370                    async-request))
371
372 ;; Async port request and run-request meta-requests
373 (define (make-async-request proc)
374   "Wrap PROC in an async-request
375
376 The purpose of this is to make sure that users don't accidentally
377 return the wrong thing via (8sync) and trip themselves up."
378   (cons '*async-request* proc))
379
380 (define (setup-async-request resume-kont async-request)
381   "Complete an async request for agenda-run-once's continuation handling"
382   (match async-request
383     (('*async-request* . async-setup-proc)
384      (async-setup-proc resume-kont))
385     ;; TODO: deliver more helpful errors depending on what the user
386     ;;   returned
387     (_ (throw 'invalid-async-request
388               "Invalid request passed back via an (8sync) procedure."
389               async-request))))
390
391 (define-syntax-rule (8sync body ...)
392   "Run body asynchronously but ignore its result...
393 forge ahead in our current function!"
394   (8sync-abort-to-prompt
395    (make-async-request
396     (lambda (kont)
397       (list (make-run-request
398              (kontinue kont) #f)
399             (make-run-request (lambda () body ...) #f))))))
400
401 (define (delayed-time in-secs)
402   "Calculate a cons of '(sec . usec) of IN-SECS from current time"
403   (define cur-time (gettimeofday))
404   (define cur-secs (car cur-time))
405   (define cur-usecs (cdr cur-time))
406   (define (convert-non-integer)
407     (define next-time-in-usecs
408       (+ (* (+ in-secs cur-secs) ; add our seconds to current seconds
409             1000000)             ; and turn into usecs
410          cur-usecs))             ; then add in current usecs
411     ;; convert into sec / usec pair
412     (receive (secs usecs)
413         (floor/ next-time-in-usecs 1000000)
414       (cons secs (floor usecs))))
415   (define (convert-integer)
416     (cons (+ in-secs cur-secs) cur-usecs))
417   (if (integer? in-secs)
418       (convert-integer)
419       (convert-non-integer)))
420
421 ;; TODO: Rewrite when we move to this being just `sleep'.
422 (define (8sleep secs)
423   "Like sleep, but asynchronous."
424   (8sync-abort-to-prompt
425    (make-async-request
426     (lambda (kont)
427       (make-run-request (kontinue kont) (delayed-time secs))))))
428
429 ;; Voluntarily yield execution
430 (define (8yield)
431   "Voluntarily yield execution to the scheduler."
432   (8sync-abort-to-prompt
433    (make-async-request
434     (lambda (kont)
435       (make-run-request (kontinue kont) #f)))))
436
437 \f
438 ;;; Execution of agenda, and current agenda
439 ;;; =======================================
440
441 (define %current-agenda-prompt (make-parameter #f))
442
443 (define (update-agenda-from-select! agenda)
444   "Potentially (select) on ports specified in agenda, adding items to queue.
445
446 Also handles sleeping when all we have to do is wait on the schedule."
447   (define (hash-keys hash)
448     (hash-map->list (lambda (k v) k) hash))
449   (define (get-wait-time)
450     ;; TODO: we need to figure this out based on whether there's anything
451     ;;   in the queue, and if not, how long till the next scheduled item
452     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
453       (cond 
454        ((not (q-empty? (agenda-queue agenda)))
455         (cons 0 0))
456        (soonest-time    ; ie, the agenda is non-empty
457         (let* ((current-time (gettimeofday)))
458           (if (time<= soonest-time current-time)
459               ;; Well there's something due so let's select
460               ;; (this avoids a (possible?) race condition chance)
461               (cons 0 0)
462               (time-minus soonest-time current-time))))
463        (else
464         (cons #f #f)))))
465   (define (do-select)
466     ;; TODO: support usecond wait time too
467     (match (get-wait-time)
468       ((sec . usec)
469        (catch 'system-error
470          (lambda ()
471            (select (hash-keys (agenda-read-port-map agenda))
472                    (hash-keys (agenda-write-port-map agenda))
473                    '()
474                    sec usec))
475          (lambda (key . rest-args)
476            (match rest-args
477              ((_ _ _ (EINTR))
478               '(() () ()))
479              (_ (error "Unhandled error in select!" key rest-args))))))))
480   (define (get-procs-to-run)
481     (define (ports->procs ports port-map)
482       (lambda (initial-procs)
483         (fold
484          (lambda (port prev)
485            (define proc (hashq-ref port-map port))
486            ;; Now that we've selected on this port, it can be removed
487            (hashq-remove! port-map port)
488            (cons proc prev))
489          initial-procs
490          ports)))
491     (match (do-select)
492       ((read-ports write-ports except-ports) ; except-ports not used
493        ((compose (ports->procs
494                   read-ports
495                   (agenda-read-port-map agenda))
496                  (ports->procs
497                   write-ports
498                   (agenda-write-port-map agenda)))
499         '()))))
500   (define (update-agenda)
501     (let ((procs-to-run (get-procs-to-run))
502           (q (agenda-queue agenda)))
503       (for-each
504        (lambda (proc)
505          (enq! q proc))
506        procs-to-run))
507     agenda)
508   (define (ports-to-select?)
509     (define (has-items? selector)
510       ;; @@: O(n)
511       ;;    ... we could use hash-for-each and a continuation to jump
512       ;;    out with a #t at first indication of an item
513       (not (= (hash-count (const #t)
514                           (selector agenda))
515               0)))
516     (or (has-items? agenda-read-port-map)
517         (has-items? agenda-write-port-map)))
518
519   (if (or (ports-to-select?)
520           ;; select doubles as sleep...
521           (not (schedule-empty? (agenda-schedule agenda)))) 
522       (update-agenda)
523       agenda))
524
525 (define-record-type <read-request>
526   (make-read-request port proc)
527   read-request?
528   (port read-request-port)
529   (proc read-request-proc))
530
531 (define-record-type <write-request>
532   (make-write-request port proc)
533   write-request?
534   (port write-request-port)
535   (proc write-request-proc))
536
537 (define (agenda-handle-read-request! agenda read-request)
538   "Handle <read-request>, which is a request to add this port to the poll/select
539 on suspendable ports."
540   (hashq-set! (agenda-read-port-map agenda)
541               (read-request-port read-request)
542               (read-request-proc read-request)))
543
544 (define (agenda-handle-write-request! agenda write-request)
545   (hashq-set! (agenda-write-port-map agenda)
546               (write-request-port write-request)
547               (write-request-proc write-request)))
548
549 (define (stop-on-nothing-to-do agenda)
550   (and (q-empty? (agenda-queue agenda))
551        (schedule-empty? (agenda-schedule agenda))
552        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
553        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
554
555
556 (define* (run-agenda agenda
557                      #:key (stop-condition stop-on-nothing-to-do)
558                      ;; For live hacking madness, etc
559                      (post-run-hook #f))
560   ;; TODO: Document fields
561   "Start up the AGENDA"
562   (install-suspendable-ports!)
563   (parameterize ((%current-agenda-prompt (agenda-prompt-tag agenda))
564                  (current-read-waiter wait-for-readable)
565                  (current-write-waiter wait-for-writable))
566     (while (not (stop-condition agenda))
567       (agenda-run-once! agenda)
568       (update-agenda-from-select! agenda)
569       (add-segments-contents-to-queue!
570        (schedule-extract-until! (agenda-schedule agenda) (gettimeofday))
571        (agenda-queue agenda))
572       (if post-run-hook
573           (post-run-hook agenda))))
574   'done)
575
576 (define (print-error-and-continue key . args)
577   "Frequently used as pre-unwind-handler for agenda"
578   (cond
579    ((eq? key '8sync-caught-error)
580     (match args
581       ((orig-key orig-args stacks)
582        (display "\n*** Caught async exception. ***\n")
583        (format (current-error-port)
584                "* Original key '~s and arguments: ~s *\n"
585                orig-key orig-args)
586        (display "* Caught stacks below (ending with original) *\n\n")
587        (for-each
588         (lambda (s)
589           (display-backtrace s (current-error-port))
590           (newline (current-error-port)))
591         stacks))))
592    (else
593     (format (current-error-port)
594             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
595             key args)
596     (display-backtrace (make-stack #t 1 0)
597                        (current-error-port))
598     (newline (current-error-port)))))
599
600 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
601                                      body ...)
602   (if (or catch-handler pre-unwind-handler)
603       (catch
604         #t
605         (lambda ()
606           body ...)
607         (or catch-handler (lambda _ #f))
608         (or pre-unwind-handler (lambda _ #f)))
609       (begin body ...)))
610
611 (define (wait-for-readable port)
612   (8sync-abort-to-prompt
613    (make-async-request
614     (lambda (kont)
615       (make-read-request port (wrap (kont #f)))))))
616
617 (define (wait-for-writable port)
618   (8sync-abort-to-prompt
619    (make-async-request
620     (lambda (kont)
621       (make-write-request port (wrap (kont #f)))))))
622
623 (define (agenda-run-once! agenda)
624   "Run once through the agenda, and produce a new agenda
625 based on the results"
626   ;; @@: Maybe rename proc -> run-this ?
627   (define (call-proc proc)
628     (call-with-prompt
629      (agenda-prompt-tag agenda)
630      (lambda ()
631        (if (kontinue? proc)
632            ;; Resume continuation.
633            ;; We need to pass in #f, otherwise we sometimes get errors like
634            ;; "Zero values returned to single-valued continuation""
635            ((kontinue-kont proc) #f)
636            ;; Otherwise call the procedure and set up error catching.
637            (maybe-catch-all
638             ((agenda-catch-handler agenda)
639              (agenda-pre-unwind-handler agenda))
640             (proc))))
641      (lambda (kont async-request)
642        (setup-async-request kont async-request))))
643
644   (let ((queue (agenda-queue agenda))
645         (next-queue (make-q)))
646     (while (not (q-empty? queue))
647       (let* ((proc (q-pop! queue))
648              (proc-result (call-proc proc))
649              (enqueue
650               (lambda (run-request)
651                 (let ((request-time (run-request-when run-request)))
652                   (if request-time
653                       (schedule-add! (agenda-schedule agenda) request-time
654                                      (run-request-proc run-request))
655                       (enq! next-queue (run-request-proc run-request)))))))
656         (define (handle-individual result)
657           ;; @@: Could maybe optimize by switching to an explicit cond...
658           (match result
659             ((? run-request? new-proc)
660              (enqueue new-proc))
661             ((? read-request? read-request)
662              (agenda-handle-read-request! agenda read-request))
663             ((? write-request? write-request)
664              (agenda-handle-write-request! agenda write-request))
665             ;; do nothing
666             ;; Remember, we don't throw an error here because procedures can
667             ;; return a run request, eg with run-it, at the end of their
668             ;; evaluation to keep looping.
669             ;; @@: Though is this really a useful feature?
670             (_ #f)))
671         ;; @@: We might support delay-wrapped procedures here
672         (match proc-result
673           ((results ...)
674            (for-each handle-individual results))
675           (one-result (handle-individual one-result)))))
676     (set-agenda-queue! agenda next-queue)))