Slightly better style
[8sync.git] / loopy.scm
1 (define-module (loopy agenda)
2   #:use-module (srfi srfi-9)
3   #:use-module (srfi srfi-9 gnu)
4   #:use-module (ice-9 q)
5   #:use-module (ice-9 match)
6   #:use-module (ice-9 receive)
7   #:export (<agenda>
8             make-agenda agenda?
9             agenda-queue agenda-prompt-tag
10             agenda-port-pmapping agenda-schedule
11             
12             make-async-prompt-tag
13
14             <time-segment>
15             make-time-segment time-segment?
16             time-segment-time time-segment-queue
17
18             time-< time-= time-<=
19
20             <schedule>
21             make-schedule schedule?
22             schedule-add! schedule-empty?
23             schedule-segments
24
25             schedule-segments-split schedule-extract-until!
26             add-segments-contents-to-queue!
27
28             make-port-mapping
29             port-mapping-set! port-mapping-remove!
30             port-mapping-empty? port-mapping-non-empty?
31
32             <run-request>
33             make-run-request run-request?
34             run-request-proc run-request-when
35
36             run wrap run-wrap run-wrap-at
37
38             %current-agenda
39             start-agenda agenda-run-once))
40
41 ;; @@: Using immutable agendas here, so wouldn't it make sense to
42 ;;   replace this queue stuff with using pfds based immutable queues?
43
44 \f
45 ;;; Agenda definition
46 ;;; =================
47
48 ;;; The agenda consists of:
49 ;;;  - a queue of immediate items to handle
50 ;;;  - sheduled future events to be added to a future queue
51 ;;;  - a tag by which running processes can escape for some asynchronous
52 ;;;    operation (from which they can be returned later)
53 ;;;  - a mapping of ports to various handler procedures
54 ;;;
55 ;;; The goal, eventually, is for this all to be immutable and functional.
56 ;;; However, we aren't there yet.  Some tricky things:
57 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
58 ;;;  - Need to use immutable queues (ijp's pfds library?)
59 ;;;  - Modeling reading from ports as something repeatable,
60 ;;;    and with reasonable separation from functional components?
61
62 (define-immutable-record-type <agenda>
63   (make-agenda-intern queue prompt-tag port-mapping schedule time)
64   agenda?
65   (queue agenda-queue)
66   (prompt-tag agenda-prompt-tag)
67   (port-mapping agenda-port-mapping)
68   (schedule agenda-schedule)
69   (time agenda-time))
70
71 (define (make-async-prompt-tag)
72   (make-prompt-tag "prompt"))
73
74 (define* (make-agenda #:key
75                       (queue (make-q))
76                       (prompt (make-prompt-tag))
77                       (port-mapping (make-port-mapping))
78                       (schedule (make-schedule))
79                       (time (gettimeofday)))
80   (make-agenda-intern queue prompt port-mapping schedule time))
81
82
83 \f
84 ;;; Schedule
85 ;;; ========
86
87 ;;; This is where we handle timed events for the future
88
89 ;; This section totally borrows from SICP
90 ;; <3 <3 <3
91
92 ;; NOTE: time is a cons of (seconds . microseconds)
93
94 (define-record-type <time-segment>
95   (make-time-segment-intern time queue)
96   time-segment?
97   (time time-segment-time)
98   (queue time-segment-queue))
99
100 (define (time-segment-right-format time)
101   (match time
102     ;; time is already a cons of second and microsecnd
103     (((? integer? s) . (? integer? u)) time)
104     ;; time was just an integer (just the second)
105     ((? integer? _) (cons time 0))
106     (_ (throw 'invalid-time "Invalid time" time))))
107
108 (define* (make-time-segment time #:optional (queue (make-q)))
109   (make-time-segment-intern time queue))
110
111 (define (time-< time1 time2)
112   (cond ((< (car time1)
113             (car time2))
114          #t)
115         ((and (= (car time1)
116                  (car time2))
117               (< (cdr time1)
118                  (cdr time2)))
119          #t)
120         (else #f)))
121
122 (define (time-= time1 time2)
123   (and (= (car time1) (car time2))
124        (= (cdr time1) (cdr time2))))
125
126 (define (time-<= time1 time2)
127   (or (time-< time1 time2)
128       (time-= time1 time2)))
129
130 (define-record-type <schedule>
131   (make-schedule-intern segments)
132   schedule?
133   (segments schedule-segments set-schedule-segments!))
134
135 (define* (make-schedule #:optional segments)
136   (make-schedule-intern (or segments '())))
137
138 ;; TODO: This code is reasonably easy to read but it
139 ;;   mutates AND is worst case of O(n) in both space and time :(
140 ;;   but at least it'll be reasonably easy to refactor to
141 ;;   a more functional setup?
142 (define (schedule-add! time proc schedule)
143   (let ((time (time-segment-right-format time)))
144     (define (new-time-segment)
145       (let ((new-segment
146              (make-time-segment time)))
147         (enq! (time-segment-queue new-segment) proc)
148         new-segment))
149     (define (loop segments)
150       (define (segment-equals-time? segment)
151         (time-= time (time-segment-time segment)))
152
153       (define (segment-more-than-time? segment)
154         (time-< time (time-segment-time segment)))
155
156       ;; We could switch this out to be more mutate'y
157       ;; and avoid the O(n) of space... is that over-optimizing?
158       (match segments
159         ;; If we're at the end of the list, time to make a new
160         ;; segment...
161         ('() (cons (new-time-segment) '()))
162         ;; If the segment's time is exactly our time, good news
163         ;; everyone!  Let's append our stuff to its queue
164         (((? segment-equals-time? first) rest ...)
165          (enq! (time-segment-queue first) proc)
166          segments)
167         ;; If the first segment is more than our time,
168         ;; ours belongs before this one, so add it and
169         ;; start consing our way back
170         (((? segment-more-than-time? first) rest ...)
171          (cons (new-time-segment) segments))
172         ;; Otherwise, build up recursive result
173         ((first rest ... )
174          (cons first (loop rest)))))
175     (set-schedule-segments!
176      schedule
177      (loop (schedule-segments schedule)))))
178
179 (define (schedule-empty? schedule)
180   (eq? (schedule-segments schedule) '()))
181
182 (define (schedule-segments-split schedule time)
183   "Does a multiple value return of time segments before/at and after TIME"
184   (let ((time (time-segment-right-format time)))
185     (define (segment-is-now? segment)
186       (time-= (time-segment-time segment) time))
187     (define (segment-is-before-now? segment)
188       (time-< (time-segment-time segment) time))
189
190     (let loop ((segments-before '())
191                (segments-left (schedule-segments schedule)))
192       (match segments-left
193         ;; end of the line, return
194         ('()
195          (values (reverse segments-before) '()))
196
197         ;; It's right now, so time to stop, but include this one in before
198         ;; but otherwise return
199         (((? segment-is-now? first) rest ...)
200          (values (reverse (cons first segments-before)) rest))
201
202         ;; This is prior or at now, so add it and keep going
203         (((? segment-is-before-now? first) rest ...)
204          (loop (cons first segments-before) rest))
205
206         ;; Otherwise it's past now, just return what we have
207         (segments-after
208          (values segments-before segments-after))))))
209
210 (define (schedule-extract-until! schedule time)
211   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
212   (receive (segments-before segments-after)
213       (schedule-segments-split schedule time)
214     (set-schedule-segments! schedule segments-after)
215     segments-before))
216
217 (define (add-segments-contents-to-queue! segments queue)
218   (for-each
219    (lambda (segment)
220      (let ((seg-queue (time-segment-queue segment)))
221        (while (not (q-empty? seg-queue))
222          (enq! queue (deq! seg-queue)))))
223    segments))
224
225
226 \f
227 ;;; Port handling
228 ;;; =============
229
230 (define (make-port-mapping)
231   (make-hash-table))
232
233 (define* (port-mapping-set! port-mapping port #:optional read write except)
234   "Sets port-mapping for reader / writer / exception handlers"
235   (if (not (or read write except))
236       (throw 'no-handlers-given "No handlers given for port" port))
237   (hashq-set! port-mapping port
238               `#(,read ,write ,except)))
239
240 (define (port-mapping-remove! port-mapping port)
241   (hashq-remove! port-mapping port))
242
243 ;; TODO: This is O(n), I'm pretty sure :\
244 ;; ... it might be worthwhile for us to have a
245 ;;   port-mapping record that keeps a count of how many
246 ;;   handlers (maybe via a promise?)
247 (define (port-mapping-empty? port-mapping)
248   "Is this port mapping empty?"
249   (eq? (hash-count (const #t) port-mapping) 0))
250
251 (define (port-mapping-non-empty? port-mapping)
252   "Whether this port-mapping contains any elements"
253   (not (port-mapping-empty? port-mapping)))
254
255
256 \f
257 ;;; Request to run stuff
258 ;;; ====================
259
260 (define-record-type <run-request>
261   (make-run-request proc when)
262   run-request?
263   (proc run-request-proc)
264   (when run-request-when))
265
266 (define* (run proc #:optional when)
267   (make-run-request proc when))
268
269 (define-syntax-rule (wrap body ...)
270   (lambda ()
271     body ...))
272
273 (define-syntax-rule (run-wrap body ...)
274   (run (wrap body ...)))
275
276 (define-syntax-rule (run-wrap-at body ... when)
277   (run (wrap body ...) when))
278
279 \f
280 ;;; Execution of agenda, and current agenda
281 ;;; =======================================
282
283 (define %current-agenda (make-parameter #f))
284
285 (define* (start-agenda agenda #:optional stop-condition)
286   (let loop ((agenda agenda))
287     (let ((new-agenda   
288            ;; @@: Hm, maybe here would be a great place to handle
289            ;;   select'ing on ports.
290            ;;   We could compose over agenda-run-once and agenda-read-ports
291            (parameterize ((%current-agenda agenda))
292              (agenda-run-once agenda))))
293       (if (and stop-condition (stop-condition agenda))
294           'done
295           (let ((updated-agenda
296                  ;; Adjust the agenda's time just in time
297                  ;; We do this here rather than in agenda-run-once to make
298                  ;; agenda-run-once's behavior fairly predictable
299                  (set-field new-agenda (agenda-time) (gettimeofday))))
300             (loop updated-agenda))))))
301
302 (define (agenda-run-once agenda)
303   "Run once through the agenda, and produce a new agenda
304 based on the results"
305   (define (call-proc proc)
306     (call-with-prompt
307         (agenda-prompt-tag agenda)
308       (lambda ()
309         (proc))
310       ;; TODO
311       (lambda (k) k)))
312
313   (let ((queue (agenda-queue agenda))
314         (next-queue (make-q)))
315     (while (not (q-empty? queue))
316       (let* ((proc (q-pop! queue))
317              (proc-result (call-proc proc))
318              (enqueue
319               (lambda (run-request)
320                 (cond
321                  ((run-request-when run-request)
322                   (error "TODO"))
323                  (else
324                   (enq! next-queue (run-request-proc run-request)))))))
325         ;; @@: We might support delay-wrapped procedures here
326         (match proc-result
327           ;; TODO: replace procedure with something that indicates
328           ;;   intent to run.  Use a (run foo) procedure
329           ((? run-request? new-proc)
330            (enqueue new-proc))
331           (((? run-request? new-procs) ...)
332            (for-each
333             (lambda (new-proc)
334               (enqueue new-proc))
335             new-procs))
336           ;; do nothing
337           (_ #f))))
338     ;; TODO: Selecting on ports would happen here?
339     ;; Return new agenda, with next queue set
340     (set-field agenda (agenda-queue) next-queue)))