0aaae2563e3ea4f9f14f3cf962161fc4625f3865
[8sync.git] / 8sync / agenda.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync agenda)
20   #:use-module (srfi srfi-1)
21   #:use-module (srfi srfi-9)
22   #:use-module (ice-9 q)
23   #:use-module (ice-9 match)
24   #:use-module (ice-9 receive)
25   #:use-module (ice-9 suspendable-ports)
26   #:export (<agenda>
27             make-agenda agenda?
28             agenda-queue agenda-prompt-tag
29             agenda-read-port-map agenda-write-port-map
30             agenda-schedule
31             
32             make-async-prompt-tag
33
34             list->q make-q*
35
36             <schedule>
37             make-schedule schedule?
38             schedule-add! schedule-empty?
39             schedule-segments
40             schedule-soonest-time
41
42             schedule-segments-split schedule-extract-until!
43             add-segments-contents-to-queue!
44
45             <run-request>
46             make-run-request run-request?
47             run-request-proc run-request-when
48
49             run-it wrap wrap-apply run run-at run-delay
50
51             8sync
52             8sleep 8yield
53             
54             ;; used for introspecting the error, but a method for making
55             ;; is not exposed
56             wrapped-exception?
57             wrapped-exception-key wrapped-exception-args
58             wrapped-exception-stacks
59
60             print-error-and-continue
61
62             stop-on-nothing-to-do
63
64             %current-agenda
65             start-agenda agenda-run-once))
66
67 \f
68 ;;; Agenda definition
69 ;;; =================
70
71 ;;; The agenda consists of:
72 ;;;  - a queue of immediate items to handle
73 ;;;  - sheduled future events to be added to a future queue
74 ;;;  - a tag by which running processes can escape for some asynchronous
75 ;;;    operation (from which they can be returned later)
76 ;;;  - a mapping of ports to various handler procedures
77 ;;;
78 ;;; @@: Is this next part deprecated?
79 ;;; The goal, maybe eventually, is for this all to be immutable and functional.
80 ;;; However, we aren't there yet.  Some tricky things:
81 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
82 ;;;  - Need to use immutable queues (ijp's pfds library?)
83 ;;;  - Modeling reading from ports as something repeatable,
84 ;;;    and with reasonable separation from functional components?
85
86 ;; TODO: Tear out the immutable agenda aspect until we're actually ready
87 ;;   to use it.
88 (define-record-type <agenda>
89   (make-agenda-intern queue prompt-tag
90                       read-port-map write-port-map
91                       schedule catch-handler pre-unwind-handler)
92   agenda?
93   (queue agenda-queue set-agenda-queue!)
94   (prompt-tag agenda-prompt-tag)
95   (read-port-map agenda-read-port-map)
96   (write-port-map agenda-write-port-map)
97   (schedule agenda-schedule)
98   (catch-handler agenda-catch-handler)
99   (pre-unwind-handler agenda-pre-unwind-handler))
100
101 (define (make-async-prompt-tag)
102   "Make an async prompt tag for an agenda.
103
104 Generally done automatically for the user through (make-agenda)."
105   (make-prompt-tag "prompt"))
106
107 (define* (make-agenda #:key
108                       (queue (make-q))
109                       (prompt (make-prompt-tag))
110                       (read-port-map (make-hash-table))
111                       (write-port-map (make-hash-table))
112                       (schedule (make-schedule))
113                       (catch-handler #f)
114                       (pre-unwind-handler print-error-and-continue))
115   ;; TODO: document arguments
116   "Make a fresh agenda."
117   (make-agenda-intern queue prompt
118                       read-port-map write-port-map
119                       schedule catch-handler pre-unwind-handler))
120
121 (define (current-agenda-prompt)
122   "Get the prompt for the current agenda; signal an error if there isn't one."
123   (let ((current-agenda (%current-agenda)))
124     (if (not current-agenda)
125         (throw
126          'no-current-agenda
127          "Can't get current agenda prompt if there's no current agenda!")
128         (agenda-prompt-tag current-agenda))))
129
130 ;; helper for making queues for an agenda
131 (define (list->q lst)
132   "Makes a queue composed of LST items"
133   (let ((q (make-q)))
134     (for-each
135      (lambda (x)
136        (enq! q x))
137      lst)
138     q))
139
140 (define (make-q* . args)
141   "Makes a queue and populates it with this invocation's ARGS"
142   (list->q args))
143
144 \f
145 ;;; Schedule
146 ;;; ========
147
148 ;;; This is where we handle timed events for the future
149
150 ;; This section totally borrows from the ideas in SICP
151 ;; <3 <3 <3
152
153 ;; NOTE: time is a cons of (seconds . microseconds)
154
155 (define-record-type <time-segment>
156   (make-time-segment-intern time queue)
157   time-segment?
158   (time time-segment-time)
159   (queue time-segment-queue))
160
161 (define* (make-time-segment time #:optional (queue (make-q)))
162   "Make a time segment of TIME and QUEUE
163
164 No automatic conversion is done, so you might have to
165 run (time-segment-right-format) first."
166   (make-time-segment-intern time queue))
167
168 (define (time< time1 time2)
169   "Check if TIME1 is less than TIME2"
170   (cond ((< (car time1)
171             (car time2))
172          #t)
173         ((and (= (car time1)
174                  (car time2))
175               (< (cdr time1)
176                  (cdr time2)))
177          #t)
178         (else #f)))
179
180 (define (time= time1 time2)
181   "Check whether TIME1 and TIME2 are equivalent"
182   (and (= (car time1) (car time2))
183        (= (cdr time1) (cdr time2))))
184
185 (define (time<= time1 time2)
186   "Check if TIME1 is less than or equal to TIME2"
187   (or (time< time1 time2)
188       (time= time1 time2)))
189
190 ;; @@: Maybe we should use floor/ here?
191 (define (time-carry-correct time)
192   "Corrects/handles time microsecond carry.
193 Will produce (0 . 0) instead of a negative number, if needed."
194   (cond ((>= (cdr time) 1000000)
195          (cons
196           (+ (car time) 1)
197           (- (cdr time) 1000000)))
198         ((< (cdr time) 0)
199          (if (= (car time) 0)
200              '(0 0)
201              (cons
202               (- (car time) 1)
203               (+ (cdr time) 1000000))))
204         (else time)))
205
206 (define (time-minus time1 time2)
207   "Subtract TIME2 from TIME1"
208   (time-carry-correct
209    (cons (- (car time1) (car time2))
210          (- (cdr time1) (cdr time2)))))
211
212 ;; @@: Unused?
213 (define (time-plus time1 time2)
214   "Add TIME1 and TIME2"
215   (time-carry-correct
216    (cons (+ (car time1) (car time2))
217          (+ (cdr time1) (cdr time2)))))
218
219 (define-record-type <schedule>
220   (make-schedule-intern segments)
221   schedule?
222   (segments schedule-segments set-schedule-segments!))
223
224 (define* (make-schedule #:optional segments)
225   "Make a schedule, optionally pre-composed of SEGMENTS"
226   (make-schedule-intern (or segments '())))
227
228 (define (schedule-soonest-time schedule)
229   "Return a cons of (sec . usec) for next time segement, or #f if none"
230   (let ((segments (schedule-segments schedule)))
231     (if (eq? segments '())
232         #f
233         (time-segment-time (car segments)))))
234
235 ;; TODO: This code is reasonably easy to read but it
236 ;;   mutates AND is worst case of O(n) in both space and time :(
237 ;;   but at least it'll be reasonably easy to refactor to
238 ;;   a more functional setup?
239 (define (schedule-add! schedule time proc)
240   "Mutate SCHEDULE, adding PROC at an appropriate time segment for TIME"
241   (define (new-time-segment)
242     (let ((new-segment
243            (make-time-segment time)))
244       (enq! (time-segment-queue new-segment) proc)
245       new-segment))
246   (define (loop segments)
247     (define (segment-equals-time? segment)
248       (time= time (time-segment-time segment)))
249
250     (define (segment-more-than-time? segment)
251       (time< time (time-segment-time segment)))
252
253     ;; We could switch this out to be more mutate'y
254     ;; and avoid the O(n) of space... is that over-optimizing?
255     (match segments
256       ;; If we're at the end of the list, time to make a new
257       ;; segment...
258       ('() (cons (new-time-segment) '()))
259       ;; If the segment's time is exactly our time, good news
260       ;; everyone!  Let's append our stuff to its queue
261       (((? segment-equals-time? first) rest ...)
262        (enq! (time-segment-queue first) proc)
263        segments)
264       ;; If the first segment is more than our time,
265       ;; ours belongs before this one, so add it and
266       ;; start consing our way back
267       (((? segment-more-than-time? first) rest ...)
268        (cons (new-time-segment) segments))
269       ;; Otherwise, build up recursive result
270       ((first rest ... )
271        (cons first (loop rest)))))
272   (set-schedule-segments!
273    schedule
274    (loop (schedule-segments schedule))))
275
276 (define (schedule-empty? schedule)
277   "Check if the SCHEDULE is currently empty"
278   (eq? (schedule-segments schedule) '()))
279
280 (define (schedule-segments-split schedule time)
281   "Does a multiple value return of time segments before/at and after TIME"
282   (define (segment-is-now? segment)
283     (time= (time-segment-time segment) time))
284   (define (segment-is-before-now? segment)
285     (time< (time-segment-time segment) time))
286
287   (let loop ((segments-before '())
288              (segments-left (schedule-segments schedule)))
289     (match segments-left
290       ;; end of the line, return
291       ('()
292        (values (reverse segments-before) '()))
293
294       ;; It's right now, so time to stop, but include this one in before
295       ;; but otherwise return
296       (((? segment-is-now? first) rest ...)
297        (values (reverse (cons first segments-before)) rest))
298
299       ;; This is prior or at now, so add it and keep going
300       (((? segment-is-before-now? first) rest ...)
301        (loop (cons first segments-before) rest))
302
303       ;; Otherwise it's past now, just return what we have
304       (segments-after
305        (values segments-before segments-after)))))
306
307 (define (schedule-extract-until! schedule time)
308   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
309   (receive (segments-before segments-after)
310       (schedule-segments-split schedule time)
311     (set-schedule-segments! schedule segments-after)
312     segments-before))
313
314 (define (add-segments-contents-to-queue! segments queue)
315   (for-each
316    (lambda (segment)
317      (let ((seg-queue (time-segment-queue segment)))
318        (while (not (q-empty? seg-queue))
319          (enq! queue (deq! seg-queue)))))
320    segments))
321
322
323 \f
324 ;;; Request to run stuff
325 ;;; ====================
326
327 (define-record-type <run-request>
328   (make-run-request proc when)
329   run-request?
330   (proc run-request-proc)
331   (when run-request-when))
332
333 (define* (run-it proc #:optional when)
334   "Make a request to run PROC (possibly at WHEN)"
335   (make-run-request proc when))
336
337 (define-syntax-rule (wrap body ...)
338   "Wrap contents in a procedure"
339   (lambda ()
340     body ...))
341
342 (define-syntax-rule (wrap-apply body)
343   "Wrap possibly multi-value function in a procedure, applies all arguments"
344   (lambda args
345     (apply body args)))
346
347
348 ;; @@: Do we really want `body ...' here?
349 ;;   what about just `body'?
350 (define-syntax-rule (run body ...)
351   "Run everything in BODY but wrap in a convenient procedure"
352   (make-run-request (wrap body ...) #f))
353
354 (define-syntax-rule (run-at body ... when)
355   "Run BODY at WHEN"
356   (make-run-request (wrap body ...) when))
357
358 (define-syntax-rule (run-delay body ... delay-time)
359   "Run BODY at DELAY-TIME time from now"
360   (make-run-request (wrap body ...) (delayed-time delay-time)))
361
362
363 \f
364 ;;; Asynchronous escape to run things
365 ;;; =================================
366
367 (define-syntax-rule (8sync-abort-to-prompt async-request)
368   (abort-to-prompt (current-agenda-prompt)
369                    async-request))
370
371 ;; Async port request and run-request meta-requests
372 (define (make-async-request proc)
373   "Wrap PROC in an async-request
374
375 The purpose of this is to make sure that users don't accidentally
376 return the wrong thing via (8sync) and trip themselves up."
377   (cons '*async-request* proc))
378
379 (define (setup-async-request resume-kont async-request)
380   "Complete an async request for agenda-run-once's continuation handling"
381   (match async-request
382     (('*async-request* . async-setup-proc)
383      (async-setup-proc resume-kont))
384     ;; TODO: deliver more helpful errors depending on what the user
385     ;;   returned
386     (_ (throw 'invalid-async-request
387               "Invalid request passed back via an (8sync) procedure."
388               async-request))))
389
390 (define-syntax-rule (8sync body ...)
391   "Run body asynchronously but ignore its result...
392 forge ahead in our current function!"
393   (8sync-abort-to-prompt
394    (make-async-request
395     (lambda (kont)
396       (list (make-run-request
397              ;; What's with returning #f to kont?
398              ;; Otherwise we sometimes get errors like
399              ;; "Zero values returned to single-valued continuation""
400              (wrap (kont #f)) #f)
401             (make-run-request (lambda () body ...) #f))))))
402
403 (define (delayed-time in-secs)
404   "Calculate a cons of '(sec . usec) of IN-SECS from current time"
405   (define cur-time (gettimeofday))
406   (define cur-secs (car cur-time))
407   (define cur-usecs (cdr cur-time))
408   (define (convert-non-integer)
409     (define next-time-in-usecs
410       (+ (* (+ in-secs cur-secs) ; add our seconds to current seconds
411             1000000)             ; and turn into usecs
412          cur-usecs))             ; then add in current usecs
413     ;; convert into sec / usec pair
414     (receive (secs usecs)
415         (floor/ next-time-in-usecs 1000000)
416       (cons secs (floor usecs))))
417   (define (convert-integer)
418     (cons (+ in-secs cur-secs) cur-usecs))
419   (if (integer? in-secs)
420       (convert-integer)
421       (convert-non-integer)))
422
423 ;; TODO: Rewrite when we move to this being just `sleep'.
424 (define (8sleep secs)
425   "Like sleep, but asynchronous."
426   (8sync-abort-to-prompt
427    (make-async-request
428     (lambda (kont)
429       (make-run-request (lambda () (kont #f)) (delayed-time secs))))))
430
431 ;; Voluntarily yield execution
432 (define (8yield)
433   "Voluntarily yield execution to the scheduler."
434   (8sync-abort-to-prompt
435    (make-async-request
436     (lambda (kont)
437       (make-run-request (lambda () (kont #f)) #f)))))
438
439 \f
440 ;;; Execution of agenda, and current agenda
441 ;;; =======================================
442
443 (define %current-agenda (make-parameter #f))
444
445 (define (update-agenda-from-select! agenda)
446   "Potentially (select) on ports specified in agenda, adding items to queue.
447
448 Also handles sleeping when all we have to do is wait on the schedule."
449   (define (hash-keys hash)
450     (hash-map->list (lambda (k v) k) hash))
451   (define (get-wait-time)
452     ;; TODO: we need to figure this out based on whether there's anything
453     ;;   in the queue, and if not, how long till the next scheduled item
454     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
455       (cond 
456        ((not (q-empty? (agenda-queue agenda)))
457         (cons 0 0))
458        (soonest-time    ; ie, the agenda is non-empty
459         (let* ((current-time (gettimeofday)))
460           (if (time<= soonest-time current-time)
461               ;; Well there's something due so let's select
462               ;; (this avoids a (possible?) race condition chance)
463               (cons 0 0)
464               (time-minus soonest-time current-time))))
465        (else
466         (cons #f #f)))))
467   (define (do-select)
468     ;; TODO: support usecond wait time too
469     (match (get-wait-time)
470       ((sec . usec)
471        (catch 'system-error
472          (lambda ()
473            (select (hash-keys (agenda-read-port-map agenda))
474                    (hash-keys (agenda-write-port-map agenda))
475                    '()
476                    sec usec))
477          (lambda (key . rest-args)
478            (match rest-args
479              ((_ _ _ (EINTR))
480               '(() () ()))
481              (_ (error "Unhandled error in select!" key rest-args))))))))
482   (define (get-procs-to-run)
483     (define (ports->procs ports port-map)
484       (lambda (initial-procs)
485         (fold
486          (lambda (port prev)
487            (define proc (hashq-ref port-map port))
488            ;; Now that we've selected on this port, it can be removed
489            (hashq-remove! port-map port)
490            (cons proc prev))
491          initial-procs
492          ports)))
493     (match (do-select)
494       ((read-ports write-ports except-ports) ; except-ports not used
495        ((compose (ports->procs
496                   read-ports
497                   (agenda-read-port-map agenda))
498                  (ports->procs
499                   write-ports
500                   (agenda-write-port-map agenda)))
501         '()))))
502   (define (update-agenda)
503     (let ((procs-to-run (get-procs-to-run))
504           (q (agenda-queue agenda)))
505       (for-each
506        (lambda (proc)
507          (enq! q proc))
508        procs-to-run))
509     agenda)
510   (define (ports-to-select?)
511     (define (has-items? selector)
512       ;; @@: O(n)
513       ;;    ... we could use hash-for-each and a continuation to jump
514       ;;    out with a #t at first indication of an item
515       (not (= (hash-count (const #t)
516                           (selector agenda))
517               0)))
518     (or (has-items? agenda-read-port-map)
519         (has-items? agenda-write-port-map)))
520
521   (if (or (ports-to-select?)
522           ;; select doubles as sleep...
523           (not (schedule-empty? (agenda-schedule agenda)))) 
524       (update-agenda)
525       agenda))
526
527 (define-record-type <read-request>
528   (make-read-request port proc)
529   read-request?
530   (port read-request-port)
531   (proc read-request-proc))
532
533 (define-record-type <write-request>
534   (make-write-request port proc)
535   write-request?
536   (port write-request-port)
537   (proc write-request-proc))
538
539 (define (agenda-handle-read-request! agenda read-request)
540   "Handle <read-request>, which is a request to add this port to the poll/select
541 on suspendable ports."
542   (hashq-set! (agenda-read-port-map agenda)
543               (read-request-port read-request)
544               (read-request-proc read-request)))
545
546 (define (agenda-handle-write-request! agenda write-request)
547   (hashq-set! (agenda-write-port-map agenda)
548               (write-request-port write-request)
549               (write-request-proc write-request)))
550
551 (define (stop-on-nothing-to-do agenda)
552   (and (q-empty? (agenda-queue agenda))
553        (schedule-empty? (agenda-schedule agenda))
554        (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
555        (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
556
557
558 (define* (start-agenda agenda
559                        #:key (stop-condition stop-on-nothing-to-do)
560                        ;; For live hacking madness, etc
561                        (post-run-hook #f))
562   ;; TODO: Document fields
563   "Start up the AGENDA"
564   (install-suspendable-ports!)
565   (while (not (stop-condition agenda))
566     (agenda-run-once! agenda)
567     (update-agenda-from-select! agenda)
568     ;; Update the agenda's current queue based on
569     ;; currently applicable time segments
570     (add-segments-contents-to-queue!
571      (schedule-extract-until! (agenda-schedule agenda) (gettimeofday))
572      (agenda-queue agenda))
573     (if post-run-hook
574         (post-run-hook agenda)))
575   'done)
576
577 (define (print-error-and-continue key . args)
578   "Frequently used as pre-unwind-handler for agenda"
579   (cond
580    ((eq? key '8sync-caught-error)
581     (match args
582       ((orig-key orig-args stacks)
583        (display "\n*** Caught async exception. ***\n")
584        (format (current-error-port)
585                "* Original key '~s and arguments: ~s *\n"
586                orig-key orig-args)
587        (display "* Caught stacks below (ending with original) *\n\n")
588        (for-each
589         (lambda (s)
590           (display-backtrace s (current-error-port))
591           (newline (current-error-port)))
592         stacks))))
593    (else
594     (format (current-error-port)
595             "\n*** Caught exception with key '~s and arguments: ~s ***\n"
596             key args)
597     (display-backtrace (make-stack #t 1 0)
598                        (current-error-port))
599     (newline (current-error-port)))))
600
601 (define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
602                                      body ...)
603   (if (or catch-handler pre-unwind-handler)
604       (catch
605         #t
606         (lambda ()
607           body ...)
608         (or catch-handler (lambda _ #f))
609         (or pre-unwind-handler (lambda _ #f)))
610       (begin body ...)))
611
612 (define (wait-for-readable port)
613   (8sync-abort-to-prompt
614    (make-async-request
615     (lambda (kont)
616       (make-read-request port (wrap (kont #f)))))))
617
618 (define (wait-for-writable port)
619   (8sync-abort-to-prompt
620    (make-async-request
621     (lambda (kont)
622       (make-write-request port (wrap (kont #f)))))))
623
624 (define (agenda-run-once! agenda)
625   "Run once through the agenda, and produce a new agenda
626 based on the results"
627   (define (call-proc proc)
628     (call-with-prompt
629      (agenda-prompt-tag agenda)
630      (lambda ()
631        (parameterize ((%current-agenda agenda)
632                       ;; @@: Couldn't we just parameterize this at the start of
633                       ;;   the agenda...?
634                       (current-read-waiter wait-for-readable)
635                       (current-write-waiter wait-for-writable))
636          (maybe-catch-all
637           ((agenda-catch-handler agenda)
638            (agenda-pre-unwind-handler agenda))
639           (proc))))
640      (lambda (kont async-request)
641        (setup-async-request kont async-request))))
642
643   (let ((queue (agenda-queue agenda))
644         (next-queue (make-q)))
645     (while (not (q-empty? queue))
646       (let* ((proc (q-pop! queue))
647              (proc-result (call-proc proc))
648              (enqueue
649               (lambda (run-request)
650                 (let ((request-time (run-request-when run-request)))
651                   (if request-time
652                       (schedule-add! (agenda-schedule agenda) request-time
653                                      (run-request-proc run-request))
654                       (enq! next-queue (run-request-proc run-request)))))))
655         (define (handle-individual result)
656           ;; @@: Could maybe optimize by switching to an explicit cond...
657           (match result
658             ((? run-request? new-proc)
659              (enqueue new-proc))
660             ((? read-request? read-request)
661              (agenda-handle-read-request! agenda read-request))
662             ((? write-request? write-request)
663              (agenda-handle-write-request! agenda write-request))
664             ;; do nothing
665             ;; Remember, we don't throw an error here because procedures can
666             ;; return a run request, eg with run-it, at the end of their
667             ;; evaluation to keep looping.
668             ;; @@: Though is this really a useful feature?
669             (_ #f)))
670         ;; @@: We might support delay-wrapped procedures here
671         (match proc-result
672           ((results ...)
673            (for-each handle-individual results))
674           (one-result (handle-individual one-result)))))
675     ;; TODO: Alternately, we could return the next-queue
676     ;;   along with changes to be added to the schedule here?
677     ;; Return new agenda, with next queue set
678     (set-agenda-queue! agenda next-queue)))