run, wrap, and run-wrap stuff
[8sync.git] / loopy.scm
1 (define-module (loopy agenda)
2   #:use-module (srfi srfi-9)
3   #:use-module (srfi srfi-9 gnu)
4   #:use-module (ice-9 q)
5   #:use-module (ice-9 match)
6   #:use-module (ice-9 receive)
7   #:export (<agenda>
8             make-agenda agenda?
9             agenda-queue agenda-prompt-tag
10             agenda-port-pmapping agenda-schedule
11             
12             make-async-prompt-tag
13
14             <time-segment>
15             make-time-segment time-segment?
16             time-segment-time time-segment-queue
17
18             time-< time-= time-<=
19
20             <schedule>
21             make-schedule schedule?
22             schedule-add! schedule-empty?
23             schedule-segments
24
25             schedule-segments-split schedule-extract-until!
26             add-segments-contents-to-queue!
27
28             make-port-mapping
29             port-mapping-set! port-mapping-remove!
30             port-mapping-empty? port-mapping-non-empty?
31
32             <run-request>
33             make-run-request run-request?
34             run-request-proc run-request-when
35
36             run wrap run-wrap run-wrap-at
37
38             %current-agenda
39             start-agenda agenda-run-once))
40
41 ;; @@: Using immutable agendas here, so wouldn't it make sense to
42 ;;   replace this queue stuff with using pfds based immutable queues?
43
44 \f
45 ;;; Agenda definition
46 ;;; =================
47
48 ;;; The agenda consists of:
49 ;;;  - a queue of immediate items to handle
50 ;;;  - sheduled future events to be added to a future queue
51 ;;;  - a tag by which running processes can escape for some asynchronous
52 ;;;    operation (from which they can be returned later)
53 ;;;  - a mapping of ports to various handler procedures
54 ;;;
55 ;;; The goal, eventually, is for this all to be immutable and functional.
56 ;;; However, we aren't there yet.  Some tricky things:
57 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
58 ;;;  - Need to use immutable queues (ijp's pfds library?)
59 ;;;  - Modeling reading from ports as something repeatable,
60 ;;;    and with reasonable separation from functional components?
61
62 (define-immutable-record-type <agenda>
63   (make-agenda-intern queue prompt-tag port-mapping schedule)
64   agenda?
65   (queue agenda-queue)
66   (prompt-tag agenda-prompt-tag)
67   (port-mapping agenda-port-mapping)
68   (schedule agenda-schedule))
69
70 (define (make-async-prompt-tag)
71   (make-prompt-tag "prompt"))
72
73 (define* (make-agenda #:key
74                       (queue (make-q))
75                       (prompt (make-prompt-tag))
76                       (port-mapping (make-port-mapping))
77                       (schedule (make-schedule)))
78   (make-agenda-intern queue prompt port-mapping schedule))
79
80
81 \f
82 ;;; Schedule
83 ;;; ========
84
85 ;;; This is where we handle timed events for the future
86
87 ;; This section totally borrows from SICP
88 ;; <3 <3 <3
89
90 ;; NOTE: time is a cons of (seconds . microseconds)
91
92 (define-record-type <time-segment>
93   (make-time-segment-intern time queue)
94   time-segment?
95   (time time-segment-time)
96   (queue time-segment-queue))
97
98 (define (time-segment-right-format time)
99   (match time
100     ;; time is already a cons of second and microsecnd
101     (((? integer? s) . (? integer? u)) time)
102     ;; time was just an integer (just the second)
103     ((? integer? _) (cons time 0))
104     (_ (throw 'invalid-time "Invalid time" time))))
105
106 (define* (make-time-segment time #:optional (queue (make-q)))
107   (make-time-segment-intern time queue))
108
109 (define (time-< time1 time2)
110   (cond ((< (car time1)
111             (car time2))
112          #t)
113         ((and (= (car time1)
114                  (car time2))
115               (< (cdr time1)
116                  (cdr time2)))
117          #t)
118         (else #f)))
119
120 (define (time-= time1 time2)
121   (and (= (car time1) (car time2))
122        (= (cdr time1) (cdr time2))))
123
124 (define (time-<= time1 time2)
125   (or (time-< time1 time2)
126       (time-= time1 time2)))
127
128 (define-record-type <schedule>
129   (make-schedule-intern segments)
130   schedule?
131   (segments schedule-segments set-schedule-segments!))
132
133 (define* (make-schedule #:optional segments)
134   (make-schedule-intern (or segments '())))
135
136 ;; TODO: This code is reasonably easy to read but it
137 ;;   mutates AND is worst case of O(n) in both space and time :(
138 ;;   but at least it'll be reasonably easy to refactor to
139 ;;   a more functional setup?
140 (define (schedule-add! time proc schedule)
141   (let ((time (time-segment-right-format time)))
142     (define (new-time-segment)
143       (let ((new-segment
144              (make-time-segment time)))
145         (enq! (time-segment-queue new-segment) proc)
146         new-segment))
147     (define (loop segments)
148       (define (segment-equals-time? segment)
149         (time-= time (time-segment-time segment)))
150
151       (define (segment-more-than-time? segment)
152         (time-< time (time-segment-time segment)))
153
154       ;; We could switch this out to be more mutate'y
155       ;; and avoid the O(n) of space... is that over-optimizing?
156       (match segments
157         ;; If we're at the end of the list, time to make a new
158         ;; segment...
159         ('() (cons (new-time-segment) '()))
160         ;; If the segment's time is exactly our time, good news
161         ;; everyone!  Let's append our stuff to its queue
162         (((? segment-equals-time? first) rest ...)
163          (enq! (time-segment-queue first) proc)
164          segments)
165         ;; If the first segment is more than our time,
166         ;; ours belongs before this one, so add it and
167         ;; start consing our way back
168         (((? segment-more-than-time? first) rest ...)
169          (cons (new-time-segment) segments))
170         ;; Otherwise, build up recursive result
171         ((first rest ... )
172          (cons first (loop rest)))))
173     (set-schedule-segments!
174      schedule
175      (loop (schedule-segments schedule)))))
176
177 (define (schedule-empty? schedule)
178   (eq? (schedule-segments schedule) '()))
179
180 (define (schedule-segments-split schedule time)
181   "Does a multiple value return of time segments before/at and after TIME"
182   (let ((time (time-segment-right-format time)))
183     (define (segment-is-now? segment)
184       (time-= (time-segment-time segment) time))
185     (define (segment-is-before-now? segment)
186       (time-< (time-segment-time segment) time))
187
188     (let loop ((segments-before '())
189                (segments-left (schedule-segments schedule)))
190       (match segments-left
191         ;; end of the line, return
192         ('()
193          (values (reverse segments-before) '()))
194
195         ;; It's right now, so time to stop, but include this one in before
196         ;; but otherwise return
197         (((? segment-is-now? first) rest ...)
198          (values (reverse (cons first segments-before)) rest))
199
200         ;; This is prior or at now, so add it and keep going
201         (((? segment-is-before-now? first) rest ...)
202          (loop (cons first segments-before) rest))
203
204         ;; Otherwise it's past now, just return what we have
205         (segments-after
206          (values segments-before segments-after))))))
207
208 (define (schedule-extract-until! schedule time)
209   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
210   (receive (segments-before segments-after)
211       (schedule-segments-split schedule time)
212     (set-schedule-segments! schedule segments-after)
213     segments-before))
214
215 (define (add-segments-contents-to-queue! segments queue)
216   (for-each
217    (lambda (segment)
218      (let ((seg-queue (time-segment-queue segment)))
219        (while (not (q-empty? seg-queue))
220          (enq! queue (deq! seg-queue)))))
221    segments))
222
223
224 \f
225 ;;; Port handling
226 ;;; =============
227
228 (define (make-port-mapping)
229   (make-hash-table))
230
231 (define* (port-mapping-set! port-mapping port #:optional read write except)
232   "Sets port-mapping for reader / writer / exception handlers"
233   (if (not (or read write except))
234       (throw 'no-handlers-given "No handlers given for port" port))
235   (hashq-set! port-mapping port
236               `#(,read ,write ,except)))
237
238 (define (port-mapping-remove! port-mapping port)
239   (hashq-remove! port-mapping port))
240
241 ;; TODO: This is O(n), I'm pretty sure :\
242 ;; ... it might be worthwhile for us to have a
243 ;;   port-mapping record that keeps a count of how many
244 ;;   handlers (maybe via a promise?)
245 (define (port-mapping-empty? port-mapping)
246   "Is this port mapping empty?"
247   (eq? (hash-count (const #t) port-mapping) 0))
248
249 (define (port-mapping-non-empty? port-mapping)
250   "Whether this port-mapping contains any elements"
251   (not (port-mapping-empty? port-mapping)))
252
253
254 \f
255 ;;; Request to run stuff
256 ;;; ====================
257
258 (define-record-type <run-request>
259   (make-run-request proc when)
260   run-request?
261   (proc run-request-proc)
262   (when run-request-when))
263
264 (define* (run proc #:optional when)
265   (make-run-request proc when))
266
267 (define-syntax-rule (wrap body ...)
268   (lambda ()
269     body ...))
270
271 (define-syntax-rule (run-wrap body ...)
272   (run (wrap body ...)))
273
274 (define-syntax-rule (run-wrap-at body ... when)
275   (run (wrap body ...) when))
276
277 \f
278 ;;; Execution of agenda, and current agenda
279 ;;; =======================================
280
281 (define %current-agenda (make-parameter #f))
282
283 (define* (start-agenda agenda #:optional stop-condition)
284   (let loop ((agenda agenda))
285     (let ((new-agenda   
286            ;; @@: Hm, maybe here would be a great place to handle
287            ;;   select'ing on ports.
288            ;;   We could compose over agenda-run-once and agenda-read-ports
289            (parameterize ((%current-agenda agenda))
290              (agenda-run-once agenda))))
291       (if (and stop-condition (stop-condition agenda))
292           'done
293           (loop new-agenda)))))
294
295 (define (agenda-run-once agenda)
296   "Run once through the agenda, and produce a new agenda
297 based on the results"
298   (define (call-proc proc)
299     (call-with-prompt
300         (agenda-prompt-tag agenda)
301       (lambda ()
302         (proc))
303       ;; TODO
304       (lambda (k) k)))
305
306   (let ((queue (agenda-queue agenda))
307         (next-queue (make-q)))
308     (while (not (q-empty? queue))
309       (let* ((proc (q-pop! queue))
310              (proc-result (call-proc proc))
311              (enqueue
312               (lambda (new-proc)
313                 (enq! next-queue new-proc))))
314         ;; @@: We might support delay-wrapped procedures here
315         (match proc-result
316           ;; TODO: replace procedure with something that indicates
317           ;;   intent to run.  Use a (run foo) procedure
318           ((? procedure? new-proc)
319            (enqueue new-proc))
320           (((? procedure? new-procs) ...)
321            (for-each
322             (lambda (new-proc)
323               (enqueue new-proc))
324             new-procs))
325           ;; do nothing
326           (_ #f))))
327     ;; TODO: Selecting on ports would happen here?
328     ;; Return new agenda, with next queue set
329     (set-field agenda (agenda-queue) next-queue)))