(hash-keys hash) not (hash-keys selector)
[8sync.git] / loopy.scm
1 (define-module (loopy agenda)
2   #:use-module (srfi srfi-1)
3   #:use-module (srfi srfi-9)
4   #:use-module (srfi srfi-9 gnu)
5   #:use-module (ice-9 q)
6   #:use-module (ice-9 match)
7   #:use-module (ice-9 receive)
8   #:export (<agenda>
9             make-agenda agenda?
10             agenda-queue agenda-prompt-tag
11             agenda-read-port-map agenda-write-port-map agenda-except-port-map
12             agenda-schedule
13             
14             make-async-prompt-tag
15
16             <time-segment>
17             make-time-segment time-segment?
18             time-segment-time time-segment-queue
19
20             time-< time-= time-<= time-+
21
22             <time-delta>
23             make-time-delta tdelta time-delta?
24             time-delta-sec time-delta-usec
25
26             <schedule>
27             make-schedule schedule?
28             schedule-add! schedule-empty?
29             schedule-segments
30             schedule-soonest-time
31
32             schedule-segments-split schedule-extract-until!
33             add-segments-contents-to-queue!
34
35             <run-request>
36             make-run-request run-request?
37             run-request-proc run-request-when
38
39             run wrap run-wrap run-wrap-at
40
41             %current-agenda
42             start-agenda agenda-run-once))
43
44 ;; @@: Using immutable agendas here, so wouldn't it make sense to
45 ;;   replace this queue stuff with using pfds based immutable queues?
46
47 \f
48 ;;; Agenda definition
49 ;;; =================
50
51 ;;; The agenda consists of:
52 ;;;  - a queue of immediate items to handle
53 ;;;  - sheduled future events to be added to a future queue
54 ;;;  - a tag by which running processes can escape for some asynchronous
55 ;;;    operation (from which they can be returned later)
56 ;;;  - a mapping of ports to various handler procedures
57 ;;;
58 ;;; The goal, eventually, is for this all to be immutable and functional.
59 ;;; However, we aren't there yet.  Some tricky things:
60 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
61 ;;;  - Need to use immutable queues (ijp's pfds library?)
62 ;;;  - Modeling reading from ports as something repeatable,
63 ;;;    and with reasonable separation from functional components?
64
65 (define-immutable-record-type <agenda>
66   (make-agenda-intern queue prompt-tag
67                       read-port-map write-port-map except-port-map
68                       schedule time)
69   agenda?
70   (queue agenda-queue)
71   (prompt-tag agenda-prompt-tag)
72   (read-port-map agenda-read-port-map)
73   (write-port-map agenda-write-port-map)
74   (except-port-map agenda-except-port-map)
75   (schedule agenda-schedule)
76   (time agenda-time))
77
78 (define (make-async-prompt-tag)
79   (make-prompt-tag "prompt"))
80
81 (define* (make-agenda #:key
82                       (queue (make-q))
83                       (prompt (make-prompt-tag))
84                       (read-port-map (make-hash-table))
85                       (write-port-map (make-hash-table))
86                       (except-port-map (make-hash-table))
87                       (schedule (make-schedule))
88                       (time (gettimeofday)))
89   (make-agenda-intern queue prompt
90                       read-port-map write-port-map except-port-map
91                       schedule time))
92
93
94 \f
95 ;;; Schedule
96 ;;; ========
97
98 ;;; This is where we handle timed events for the future
99
100 ;; This section totally borrows from the ideas in SICP
101 ;; <3 <3 <3
102
103 ;; NOTE: time is a cons of (seconds . microseconds)
104
105 (define-record-type <time-segment>
106   (make-time-segment-intern time queue)
107   time-segment?
108   (time time-segment-time)
109   (queue time-segment-queue))
110
111 (define (time-segment-right-format time)
112   (match time
113     ;; time is already a cons of second and microsecnd
114     (((? integer? s) . (? integer? u)) time)
115     ;; time was just an integer (just the second)
116     ((? integer? _) (cons time 0))
117     (_ (throw 'invalid-time "Invalid time" time))))
118
119 (define* (make-time-segment time #:optional (queue (make-q)))
120   (make-time-segment-intern time queue))
121
122 (define (time-< time1 time2)
123   (cond ((< (car time1)
124             (car time2))
125          #t)
126         ((and (= (car time1)
127                  (car time2))
128               (< (cdr time1)
129                  (cdr time2)))
130          #t)
131         (else #f)))
132
133 (define (time-= time1 time2)
134   (and (= (car time1) (car time2))
135        (= (cdr time1) (cdr time2))))
136
137 (define (time-<= time1 time2)
138   (or (time-< time1 time2)
139       (time-= time1 time2)))
140
141
142 (define-record-type <time-delta>
143   (make-time-delta-intern sec usec)
144   time-delta?
145   (sec time-delta-sec)
146   (usec time-delta-usec))
147
148 (define* (make-time-delta sec #:optional usec)
149   (make-time-delta-intern sec (or usec 0)))
150
151 (define tdelta make-time-delta)
152
153 (define (time-+ time time-delta)
154   (cons (+ (car time) (time-delta-sec time-delta))
155         (+ (cdr time) (time-delta-usec time-delta))))
156
157
158 (define-record-type <schedule>
159   (make-schedule-intern segments)
160   schedule?
161   (segments schedule-segments set-schedule-segments!))
162
163 (define* (make-schedule #:optional segments)
164   (make-schedule-intern (or segments '())))
165
166 (define (schedule-soonest-time schedule)
167   "Return a cons of (sec . usec) for next time segement, or #f if none"
168   (let ((segments (schedule-segments schedule)))
169     (if (eq? segments '())
170         #f
171         (time-segment-time (car segments)))))
172
173 ;; TODO: This code is reasonably easy to read but it
174 ;;   mutates AND is worst case of O(n) in both space and time :(
175 ;;   but at least it'll be reasonably easy to refactor to
176 ;;   a more functional setup?
177 (define (schedule-add! schedule time proc)
178   (let ((time (time-segment-right-format time)))
179     (define (new-time-segment)
180       (let ((new-segment
181              (make-time-segment time)))
182         (enq! (time-segment-queue new-segment) proc)
183         new-segment))
184     (define (loop segments)
185       (define (segment-equals-time? segment)
186         (time-= time (time-segment-time segment)))
187
188       (define (segment-more-than-time? segment)
189         (time-< time (time-segment-time segment)))
190
191       ;; We could switch this out to be more mutate'y
192       ;; and avoid the O(n) of space... is that over-optimizing?
193       (match segments
194         ;; If we're at the end of the list, time to make a new
195         ;; segment...
196         ('() (cons (new-time-segment) '()))
197         ;; If the segment's time is exactly our time, good news
198         ;; everyone!  Let's append our stuff to its queue
199         (((? segment-equals-time? first) rest ...)
200          (enq! (time-segment-queue first) proc)
201          segments)
202         ;; If the first segment is more than our time,
203         ;; ours belongs before this one, so add it and
204         ;; start consing our way back
205         (((? segment-more-than-time? first) rest ...)
206          (cons (new-time-segment) segments))
207         ;; Otherwise, build up recursive result
208         ((first rest ... )
209          (cons first (loop rest)))))
210     (set-schedule-segments!
211      schedule
212      (loop (schedule-segments schedule)))))
213
214 (define (schedule-empty? schedule)
215   (eq? (schedule-segments schedule) '()))
216
217 (define (schedule-segments-split schedule time)
218   "Does a multiple value return of time segments before/at and after TIME"
219   (let ((time (time-segment-right-format time)))
220     (define (segment-is-now? segment)
221       (time-= (time-segment-time segment) time))
222     (define (segment-is-before-now? segment)
223       (time-< (time-segment-time segment) time))
224
225     (let loop ((segments-before '())
226                (segments-left (schedule-segments schedule)))
227       (match segments-left
228         ;; end of the line, return
229         ('()
230          (values (reverse segments-before) '()))
231
232         ;; It's right now, so time to stop, but include this one in before
233         ;; but otherwise return
234         (((? segment-is-now? first) rest ...)
235          (values (reverse (cons first segments-before)) rest))
236
237         ;; This is prior or at now, so add it and keep going
238         (((? segment-is-before-now? first) rest ...)
239          (loop (cons first segments-before) rest))
240
241         ;; Otherwise it's past now, just return what we have
242         (segments-after
243          (values segments-before segments-after))))))
244
245 (define (schedule-extract-until! schedule time)
246   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
247   (receive (segments-before segments-after)
248       (schedule-segments-split schedule time)
249     (set-schedule-segments! schedule segments-after)
250     segments-before))
251
252 (define (add-segments-contents-to-queue! segments queue)
253   (for-each
254    (lambda (segment)
255      (let ((seg-queue (time-segment-queue segment)))
256        (while (not (q-empty? seg-queue))
257          (enq! queue (deq! seg-queue)))))
258    segments))
259
260
261 \f
262 ;;; Port handling
263 ;;; =============
264
265 (define (make-port-mapping)
266   (make-hash-table))
267
268 (define* (port-mapping-set! port-mapping port #:optional read write except)
269   "Sets port-mapping for reader / writer / exception handlers"
270   (if (not (or read write except))
271       (throw 'no-handlers-given "No handlers given for port" port))
272   (hashq-set! port-mapping port
273               `#(,read ,write ,except)))
274
275 (define (port-mapping-remove! port-mapping port)
276   (hashq-remove! port-mapping port))
277
278 ;; TODO: This is O(n), I'm pretty sure :\
279 ;; ... it might be worthwhile for us to have a
280 ;;   port-mapping record that keeps a count of how many
281 ;;   handlers (maybe via a promise?)
282 (define (port-mapping-empty? port-mapping)
283   "Is this port mapping empty?"
284   (eq? (hash-count (const #t) port-mapping) 0))
285
286 (define (port-mapping-non-empty? port-mapping)
287   "Whether this port-mapping contains any elements"
288   (not (port-mapping-empty? port-mapping)))
289
290
291 \f
292 ;;; Request to run stuff
293 ;;; ====================
294
295 (define-record-type <run-request>
296   (make-run-request proc when)
297   run-request?
298   (proc run-request-proc)
299   (when run-request-when))
300
301 (define* (run proc #:optional when)
302   (make-run-request proc when))
303
304 (define-syntax-rule (wrap body ...)
305   (lambda ()
306     body ...))
307
308 (define-syntax-rule (run-wrap body ...)
309   (run (wrap body ...)))
310
311 (define-syntax-rule (run-wrap-at body ... when)
312   (run (wrap body ...) when))
313
314 \f
315 ;;; Execution of agenda, and current agenda
316 ;;; =======================================
317
318 (define %current-agenda (make-parameter #f))
319
320 (define (update-agenda-from-select! agenda)
321   (define (hash-keys hash)
322     (hash-map->list (lambda (k v) k) hash))
323   (define (get-wait-time)
324     ;; TODO: we need to figure this out based on whether there's anything
325     ;;   in the queue, and if not, how long till the next scheduled item
326     (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
327       (cond 
328        ((not (q-empty? (agenda-queue agenda)))
329         (values 0 0))
330        (soonest-time    ; ie, the agenda is non-empty
331         (let* ((current-time (agenda-time agenda)))
332           (if (time-<= soonest-time current-time)
333               ;; Well there's something due so let's select
334               ;; (this avoids a (possible?) race condition chance)
335               (values 0 0)
336               (values
337                (- (car soonest-time) (car current-time))
338                (- (cdr soonest-time) (cdr current-time))))))
339        (else
340         (values #f #f)))))
341   (define (do-select)
342     ;; TODO: support usecond wait time too
343     (receive (sec usec)
344         (get-wait-time)
345       (select (hash-keys (agenda-read-port-map agenda))
346               (hash-keys (agenda-write-port-map agenda))
347               (hash-keys (agenda-except-port-map agenda))
348               sec usec)))
349   (define (get-procs-to-run)
350     (define (ports->procs ports port-map)
351       (lambda (initial-procs)
352         (fold
353          (lambda (port prev)
354            (cons (lambda ()
355                    ((hash-ref port-map port) port))
356                  prev))
357          initial-procs
358          ports)))
359     (match (do-select)
360       ((read-ports write-ports except-ports)
361        ;; @@: Come on, we can do better than append ;P
362        ((compose (ports->procs
363                   read-ports
364                   (agenda-read-port-map agenda))
365                  (ports->procs
366                   write-ports
367                   (agenda-write-port-map agenda))
368                  (ports->procs
369                   except-ports
370                   (agenda-except-port-map agenda)))
371         '()))))
372   (define (update-agenda)
373     (let ((procs-to-run (get-procs-to-run))
374           (q (agenda-queue agenda)))
375       (for-each
376        (lambda (proc)
377          (enq! q proc))
378        procs-to-run))
379     agenda)
380   (define (ports-to-select?)
381     (define (has-items? selector)
382       ;; @@: O(n)
383       ;;    ... we could use hash-for-each and a continuation to jump
384       ;;    out with a #t at first indication of an item
385       (not (= (hash-count (const #t)
386                           (selector agenda))
387               0)))
388     (or (has-items? agenda-read-port-map)
389         (has-items? agenda-write-port-map)
390         (has-items? agenda-except-port-map)))
391
392   (if (ports-to-select?)
393       (update-agenda)
394       agenda))
395
396
397 (define* (start-agenda agenda
398                        #:key stop-condition
399                        (get-time gettimeofday)
400                        (handle-ports update-agenda-from-select!))
401   (let loop ((agenda agenda))
402     (let ((agenda   
403            ;; @@: Hm, maybe here would be a great place to handle
404            ;;   select'ing on ports.
405            ;;   We could compose over agenda-run-once and agenda-read-ports
406            (parameterize ((%current-agenda agenda))
407              (agenda-run-once agenda))))
408       (if (and stop-condition (stop-condition agenda))
409           'done
410           (let* ((new-time (get-time))
411                  (agenda
412                   (handle-ports
413                    ;; Adjust the agenda's time just in time
414                    ;; We do this here rather than in agenda-run-once to make
415                    ;; agenda-run-once's behavior fairly predictable
416                    (set-field agenda (agenda-time) new-time))))
417             ;; Update the agenda's current queue based on
418             ;; currently applicable time segments
419             (add-segments-contents-to-queue!
420              (schedule-extract-until! (agenda-schedule agenda) new-time)
421              (agenda-queue agenda))
422             (loop agenda))))))
423
424 (define (agenda-run-once agenda)
425   "Run once through the agenda, and produce a new agenda
426 based on the results"
427   (define (call-proc proc)
428     (call-with-prompt
429         (agenda-prompt-tag agenda)
430       (lambda ()
431         (proc))
432       ;; TODO
433       (lambda (k) k)))
434
435   (let ((queue (agenda-queue agenda))
436         (next-queue (make-q)))
437     (while (not (q-empty? queue))
438       (let* ((proc (q-pop! queue))
439              (proc-result (call-proc proc))
440              (enqueue
441               (lambda (run-request)
442                 (define (schedule-at! time proc)
443                   (schedule-add! (agenda-schedule agenda) time proc))
444                 (let ((request-time (run-request-when run-request)))
445                   (match request-time
446                     ((? time-delta? time-delta)
447                      (let ((time (time-+ (agenda-time agenda)
448                                          time-delta)))
449                        (schedule-at! time (run-request-proc run-request))))
450                     ((? integer? sec)
451                      (let ((time (cons sec 0)))
452                        (schedule-at! time (run-request-proc run-request))))
453                     (((? integer? sec) . (? integer? usec))
454                      (schedule-at! request-time (run-request-proc run-request)))
455                     (#f
456                      (enq! next-queue (run-request-proc run-request))))))))
457         ;; @@: We might support delay-wrapped procedures here
458         (match proc-result
459           ;; TODO: replace procedure with something that indicates
460           ;;   intent to run.  Use a (run foo) procedure
461           ((? run-request? new-proc)
462            (enqueue new-proc))
463           (((? run-request? new-procs) ...)
464            (for-each
465             (lambda (new-proc)
466               (enqueue new-proc))
467             new-procs))
468           ;; do nothing
469           (_ #f))))
470     ;; TODO: Alternately, we could return the next-queue
471     ;;   along with changes to be added to the schedule here?
472     ;; Return new agenda, with next queue set
473     (set-field agenda (agenda-queue) next-queue)))