1 (define-module (loopy agenda)
2 #:use-module (srfi srfi-1)
3 #:use-module (srfi srfi-9)
4 #:use-module (srfi srfi-9 gnu)
6 #:use-module (ice-9 match)
7 #:use-module (ice-9 receive)
10 agenda-queue agenda-prompt-tag
11 agenda-read-port-map agenda-write-port-map agenda-except-port-map
17 make-time-segment time-segment?
18 time-segment-time time-segment-queue
20 time-< time-= time-<= time-+
23 make-time-delta tdelta time-delta?
24 time-delta-sec time-delta-usec
27 make-schedule schedule?
28 schedule-add! schedule-empty?
32 schedule-segments-split schedule-extract-until!
33 add-segments-contents-to-queue!
36 make-run-request run-request?
37 run-request-proc run-request-when
39 run wrap run-wrap run-wrap-at
42 start-agenda agenda-run-once))
44 ;; @@: Using immutable agendas here, so wouldn't it make sense to
45 ;; replace this queue stuff with using pfds based immutable queues?
51 ;;; The agenda consists of:
52 ;;; - a queue of immediate items to handle
53 ;;; - sheduled future events to be added to a future queue
54 ;;; - a tag by which running processes can escape for some asynchronous
55 ;;; operation (from which they can be returned later)
56 ;;; - a mapping of ports to various handler procedures
58 ;;; The goal, eventually, is for this all to be immutable and functional.
59 ;;; However, we aren't there yet. Some tricky things:
60 ;;; - The schedule needs to be immutable, yet reasonably efficient.
61 ;;; - Need to use immutable queues (ijp's pfds library?)
62 ;;; - Modeling reading from ports as something repeatable,
63 ;;; and with reasonable separation from functional components?
65 (define-immutable-record-type <agenda>
66 (make-agenda-intern queue prompt-tag
67 read-port-map write-port-map except-port-map
71 (prompt-tag agenda-prompt-tag)
72 (read-port-map agenda-read-port-map)
73 (write-port-map agenda-write-port-map)
74 (except-port-map agenda-except-port-map)
75 (schedule agenda-schedule)
78 (define (make-async-prompt-tag)
79 (make-prompt-tag "prompt"))
81 (define* (make-agenda #:key
83 (prompt (make-prompt-tag))
84 (read-port-map (make-hash-table))
85 (write-port-map (make-hash-table))
86 (except-port-map (make-hash-table))
87 (schedule (make-schedule))
88 (time (gettimeofday)))
89 (make-agenda-intern queue prompt
90 read-port-map write-port-map except-port-map
98 ;;; This is where we handle timed events for the future
100 ;; This section totally borrows from the ideas in SICP
103 ;; NOTE: time is a cons of (seconds . microseconds)
105 (define-record-type <time-segment>
106 (make-time-segment-intern time queue)
108 (time time-segment-time)
109 (queue time-segment-queue))
111 (define (time-segment-right-format time)
113 ;; time is already a cons of second and microsecnd
114 (((? integer? s) . (? integer? u)) time)
115 ;; time was just an integer (just the second)
116 ((? integer? _) (cons time 0))
117 (_ (throw 'invalid-time "Invalid time" time))))
119 (define* (make-time-segment time #:optional (queue (make-q)))
120 (make-time-segment-intern time queue))
122 (define (time-< time1 time2)
123 (cond ((< (car time1)
133 (define (time-= time1 time2)
134 (and (= (car time1) (car time2))
135 (= (cdr time1) (cdr time2))))
137 (define (time-<= time1 time2)
138 (or (time-< time1 time2)
139 (time-= time1 time2)))
142 (define-record-type <time-delta>
143 (make-time-delta-intern sec usec)
146 (usec time-delta-usec))
148 (define* (make-time-delta sec #:optional usec)
149 (make-time-delta-intern sec (or usec 0)))
151 (define tdelta make-time-delta)
153 (define (time-+ time time-delta)
154 (cons (+ (car time) (time-delta-sec time-delta))
155 (+ (cdr time) (time-delta-usec time-delta))))
158 (define-record-type <schedule>
159 (make-schedule-intern segments)
161 (segments schedule-segments set-schedule-segments!))
163 (define* (make-schedule #:optional segments)
164 (make-schedule-intern (or segments '())))
166 (define (schedule-soonest-time schedule)
167 "Return a cons of (sec . usec) for next time segement, or #f if none"
168 (let ((segments (schedule-segments schedule)))
169 (if (eq? segments '())
171 (time-segment-time (car segments)))))
173 ;; TODO: This code is reasonably easy to read but it
174 ;; mutates AND is worst case of O(n) in both space and time :(
175 ;; but at least it'll be reasonably easy to refactor to
176 ;; a more functional setup?
177 (define (schedule-add! schedule time proc)
178 (let ((time (time-segment-right-format time)))
179 (define (new-time-segment)
181 (make-time-segment time)))
182 (enq! (time-segment-queue new-segment) proc)
184 (define (loop segments)
185 (define (segment-equals-time? segment)
186 (time-= time (time-segment-time segment)))
188 (define (segment-more-than-time? segment)
189 (time-< time (time-segment-time segment)))
191 ;; We could switch this out to be more mutate'y
192 ;; and avoid the O(n) of space... is that over-optimizing?
194 ;; If we're at the end of the list, time to make a new
196 ('() (cons (new-time-segment) '()))
197 ;; If the segment's time is exactly our time, good news
198 ;; everyone! Let's append our stuff to its queue
199 (((? segment-equals-time? first) rest ...)
200 (enq! (time-segment-queue first) proc)
202 ;; If the first segment is more than our time,
203 ;; ours belongs before this one, so add it and
204 ;; start consing our way back
205 (((? segment-more-than-time? first) rest ...)
206 (cons (new-time-segment) segments))
207 ;; Otherwise, build up recursive result
209 (cons first (loop rest)))))
210 (set-schedule-segments!
212 (loop (schedule-segments schedule)))))
214 (define (schedule-empty? schedule)
215 (eq? (schedule-segments schedule) '()))
217 (define (schedule-segments-split schedule time)
218 "Does a multiple value return of time segments before/at and after TIME"
219 (let ((time (time-segment-right-format time)))
220 (define (segment-is-now? segment)
221 (time-= (time-segment-time segment) time))
222 (define (segment-is-before-now? segment)
223 (time-< (time-segment-time segment) time))
225 (let loop ((segments-before '())
226 (segments-left (schedule-segments schedule)))
228 ;; end of the line, return
230 (values (reverse segments-before) '()))
232 ;; It's right now, so time to stop, but include this one in before
233 ;; but otherwise return
234 (((? segment-is-now? first) rest ...)
235 (values (reverse (cons first segments-before)) rest))
237 ;; This is prior or at now, so add it and keep going
238 (((? segment-is-before-now? first) rest ...)
239 (loop (cons first segments-before) rest))
241 ;; Otherwise it's past now, just return what we have
243 (values segments-before segments-after))))))
245 (define (schedule-extract-until! schedule time)
246 "Extract all segments until TIME from SCHEDULE, and pop old segments off"
247 (receive (segments-before segments-after)
248 (schedule-segments-split schedule time)
249 (set-schedule-segments! schedule segments-after)
252 (define (add-segments-contents-to-queue! segments queue)
255 (let ((seg-queue (time-segment-queue segment)))
256 (while (not (q-empty? seg-queue))
257 (enq! queue (deq! seg-queue)))))
265 (define (make-port-mapping)
268 (define* (port-mapping-set! port-mapping port #:optional read write except)
269 "Sets port-mapping for reader / writer / exception handlers"
270 (if (not (or read write except))
271 (throw 'no-handlers-given "No handlers given for port" port))
272 (hashq-set! port-mapping port
273 `#(,read ,write ,except)))
275 (define (port-mapping-remove! port-mapping port)
276 (hashq-remove! port-mapping port))
278 ;; TODO: This is O(n), I'm pretty sure :\
279 ;; ... it might be worthwhile for us to have a
280 ;; port-mapping record that keeps a count of how many
281 ;; handlers (maybe via a promise?)
282 (define (port-mapping-empty? port-mapping)
283 "Is this port mapping empty?"
284 (eq? (hash-count (const #t) port-mapping) 0))
286 (define (port-mapping-non-empty? port-mapping)
287 "Whether this port-mapping contains any elements"
288 (not (port-mapping-empty? port-mapping)))
292 ;;; Request to run stuff
293 ;;; ====================
295 (define-record-type <run-request>
296 (make-run-request proc when)
298 (proc run-request-proc)
299 (when run-request-when))
301 (define* (run proc #:optional when)
302 (make-run-request proc when))
304 (define-syntax-rule (wrap body ...)
308 (define-syntax-rule (run-wrap body ...)
309 (run (wrap body ...)))
311 (define-syntax-rule (run-wrap-at body ... when)
312 (run (wrap body ...) when))
315 ;;; Execution of agenda, and current agenda
316 ;;; =======================================
318 (define %current-agenda (make-parameter #f))
320 (define (update-agenda-from-select! agenda)
321 (define (hash-keys hash)
322 (hash-map->list (lambda (k v) k) hash))
323 (define (get-wait-time)
324 ;; TODO: we need to figure this out based on whether there's anything
325 ;; in the queue, and if not, how long till the next scheduled item
326 (let ((soonest-time (schedule-soonest-time (agenda-schedule agenda))))
328 ((not (q-empty? (agenda-queue agenda)))
330 (soonest-time ; ie, the agenda is non-empty
331 (let* ((current-time (agenda-time agenda)))
332 (if (time-<= soonest-time current-time)
333 ;; Well there's something due so let's select
334 ;; (this avoids a (possible?) race condition chance)
337 (- (car soonest-time) (car current-time))
338 (- (cdr soonest-time) (cdr current-time))))))
342 ;; TODO: support usecond wait time too
345 (select (hash-keys (agenda-read-port-map agenda))
346 (hash-keys (agenda-write-port-map agenda))
347 (hash-keys (agenda-except-port-map agenda))
349 (define (get-procs-to-run)
350 (define (ports->procs ports port-map)
351 (lambda (initial-procs)
355 ((hash-ref port-map port) port))
360 ((read-ports write-ports except-ports)
361 ;; @@: Come on, we can do better than append ;P
362 ((compose (ports->procs
364 (agenda-read-port-map agenda))
367 (agenda-write-port-map agenda))
370 (agenda-except-port-map agenda)))
372 (define (update-agenda)
373 (let ((procs-to-run (get-procs-to-run))
374 (q (agenda-queue agenda)))
380 (define (ports-to-select?)
381 (define (has-items? selector)
383 ;; ... we could use hash-for-each and a continuation to jump
384 ;; out with a #t at first indication of an item
385 (not (= (hash-count (const #t)
388 (or (has-items? agenda-read-port-map)
389 (has-items? agenda-write-port-map)
390 (has-items? agenda-except-port-map)))
392 (if (ports-to-select?)
397 (define* (start-agenda agenda
399 (get-time gettimeofday)
400 (handle-ports update-agenda-from-select!))
401 (let loop ((agenda agenda))
403 ;; @@: Hm, maybe here would be a great place to handle
404 ;; select'ing on ports.
405 ;; We could compose over agenda-run-once and agenda-read-ports
406 (parameterize ((%current-agenda agenda))
407 (agenda-run-once agenda))))
408 (if (and stop-condition (stop-condition agenda))
410 (let* ((new-time (get-time))
413 ;; Adjust the agenda's time just in time
414 ;; We do this here rather than in agenda-run-once to make
415 ;; agenda-run-once's behavior fairly predictable
416 (set-field agenda (agenda-time) new-time))))
417 ;; Update the agenda's current queue based on
418 ;; currently applicable time segments
419 (add-segments-contents-to-queue!
420 (schedule-extract-until! (agenda-schedule agenda) new-time)
421 (agenda-queue agenda))
424 (define (agenda-run-once agenda)
425 "Run once through the agenda, and produce a new agenda
426 based on the results"
427 (define (call-proc proc)
429 (agenda-prompt-tag agenda)
435 (let ((queue (agenda-queue agenda))
436 (next-queue (make-q)))
437 (while (not (q-empty? queue))
438 (let* ((proc (q-pop! queue))
439 (proc-result (call-proc proc))
441 (lambda (run-request)
442 (define (schedule-at! time proc)
443 (schedule-add! (agenda-schedule agenda) time proc))
444 (let ((request-time (run-request-when run-request)))
446 ((? time-delta? time-delta)
447 (let ((time (time-+ (agenda-time agenda)
449 (schedule-at! time (run-request-proc run-request))))
451 (let ((time (cons sec 0)))
452 (schedule-at! time (run-request-proc run-request))))
453 (((? integer? sec) . (? integer? usec))
454 (schedule-at! request-time (run-request-proc run-request)))
456 (enq! next-queue (run-request-proc run-request))))))))
457 ;; @@: We might support delay-wrapped procedures here
459 ;; TODO: replace procedure with something that indicates
460 ;; intent to run. Use a (run foo) procedure
461 ((? run-request? new-proc)
463 (((? run-request? new-procs) ...)
470 ;; TODO: Alternately, we could return the next-queue
471 ;; along with changes to be added to the schedule here?
472 ;; Return new agenda, with next queue set
473 (set-field agenda (agenda-queue) next-queue)))