Added add-segments-contents-to-queue!
[8sync.git] / loopy.scm
1 (define-module (loopy agenda)
2   #:use-module (srfi srfi-9)
3   #:use-module (srfi srfi-9 gnu)
4   #:use-module (ice-9 q)
5   #:use-module (ice-9 match)
6   #:use-module (ice-9 receive)
7   #:export (make-agenda
8             agenda?
9             agenda-queue agenda-prompt-tag
10             agenda-port-pmapping agenda-schedule
11             
12             make-async-prompt-tag
13
14             make-time-segment
15             time-segment?
16             time-segment-time time-segment-queue
17
18             time-< time-= time-<=
19
20             make-schedule
21             schedule-add! schedule-empty?
22             schedule-segments
23
24             schedule-segments-split schedule-extract-until!
25             add-segments-contents-to-queue!
26
27             make-port-mapping
28             port-mapping-set! port-mapping-remove!
29             port-mapping-empty? port-mapping-non-empty?
30
31             %current-agenda
32             start-agenda agenda-run-once))
33
34 ;; @@: Using immutable agendas here, so wouldn't it make sense to
35 ;;   replace this queue stuff with using pfds based immutable queues?
36
37 \f
38 ;;; Agenda definition
39 ;;; =================
40
41 ;;; The agenda consists of:
42 ;;;  - a queue of immediate items to handle
43 ;;;  - sheduled future events to be added to a future queue
44 ;;;  - a tag by which running processes can escape for some asynchronous
45 ;;;    operation (from which they can be returned later)
46 ;;;  - a mapping of ports to various handler procedures
47 ;;;
48 ;;; The goal, eventually, is for this all to be immutable and functional.
49 ;;; However, we aren't there yet.  Some tricky things:
50 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
51 ;;;  - Need to use immutable queues (ijp's pfds library?)
52 ;;;  - Modeling reading from ports as something repeatable,
53 ;;;    and with reasonable separation from functional components?
54
55 (define-immutable-record-type <agenda>
56   (make-agenda-intern queue prompt-tag port-mapping schedule)
57   agenda?
58   (queue agenda-queue)
59   (prompt-tag agenda-prompt-tag)
60   (port-mapping agenda-port-mapping)
61   (schedule agenda-schedule))
62
63 (define (make-async-prompt-tag)
64   (make-prompt-tag "prompt"))
65
66 (define* (make-agenda #:key
67                       (queue (make-q))
68                       (prompt (make-prompt-tag))
69                       (port-mapping (make-port-mapping))
70                       (schedule (make-schedule)))
71   (make-agenda-intern queue prompt port-mapping schedule))
72
73
74 \f
75 ;;; Schedule
76 ;;; ========
77
78 ;;; This is where we handle timed events for the future
79
80 ;; This section totally borrows from SICP
81 ;; <3 <3 <3
82
83 ;; NOTE: time is a cons of (seconds . microseconds)
84
85 (define-record-type <time-segment>
86   (make-time-segment-intern time queue)
87   time-segment?
88   (time time-segment-time)
89   (queue time-segment-queue))
90
91 (define (time-segment-right-format time)
92   (match time
93     ;; time is already a cons of second and microsecnd
94     (((? integer? s) . (? integer? u)) time)
95     ;; time was just an integer (just the second)
96     ((? integer? _) (cons time 0))
97     (_ (throw 'invalid-time "Invalid time" time))))
98
99 (define* (make-time-segment time #:optional (queue (make-q)))
100   (make-time-segment-intern time queue))
101
102 (define (time-< time1 time2)
103   (cond ((< (car time1)
104             (car time2))
105          #t)
106         ((and (= (car time1)
107                  (car time2))
108               (< (cdr time1)
109                  (cdr time2)))
110          #t)
111         (else #f)))
112
113 (define (time-= time1 time2)
114   (and (= (car time1) (car time2))
115        (= (cdr time1) (cdr time2))))
116
117 (define (time-<= time1 time2)
118   (or (time-< time1 time2)
119       (time-= time1 time2)))
120
121 (define-record-type <schedule>
122   (make-schedule-intern segments)
123   schedule?
124   (segments schedule-segments set-schedule-segments!))
125
126 (define* (make-schedule #:optional segments)
127   (make-schedule-intern (or segments '())))
128
129 ;; TODO: This code is reasonably easy to read but it
130 ;;   mutates AND is worst case of O(n) in both space and time :(
131 ;;   but at least it'll be reasonably easy to refactor to
132 ;;   a more functional setup?
133 (define (schedule-add! time proc schedule)
134   (let ((time (time-segment-right-format time)))
135     (define (new-time-segment)
136       (let ((new-segment
137              (make-time-segment time)))
138         (enq! (time-segment-queue new-segment) proc)
139         new-segment))
140     (define (loop segments)
141       (define (segment-equals-time? segment)
142         (time-= time (time-segment-time segment)))
143
144       (define (segment-more-than-time? segment)
145         (time-< time (time-segment-time segment)))
146
147       ;; We could switch this out to be more mutate'y
148       ;; and avoid the O(n) of space... is that over-optimizing?
149       (match segments
150         ;; If we're at the end of the list, time to make a new
151         ;; segment...
152         ('() (cons (new-time-segment) '()))
153         ;; If the segment's time is exactly our time, good news
154         ;; everyone!  Let's append our stuff to its queue
155         (((? segment-equals-time? first) rest ...)
156          (enq! (time-segment-queue first) proc)
157          segments)
158         ;; If the first segment is more than our time,
159         ;; ours belongs before this one, so add it and
160         ;; start consing our way back
161         (((? segment-more-than-time? first) rest ...)
162          (cons (new-time-segment) segments))
163         ;; Otherwise, build up recursive result
164         ((first rest ... )
165          (cons first (loop rest)))))
166     (set-schedule-segments!
167      schedule
168      (loop (schedule-segments schedule)))))
169
170 (define (schedule-empty? schedule)
171   (eq? (schedule-segments schedule) '()))
172
173 (define (schedule-segments-split schedule time)
174   "Does a multiple value return of time segments before/at and after TIME"
175   (let ((time (time-segment-right-format time)))
176     (define (segment-is-now? segment)
177       (time-= (time-segment-time segment) time))
178     (define (segment-is-before-now? segment)
179       (time-< (time-segment-time segment) time))
180
181     (let loop ((segments-before '())
182                (segments-left (schedule-segments schedule)))
183       (match segments-left
184         ;; end of the line, return
185         ('()
186          (values (reverse segments-before) '()))
187
188         ;; It's right now, so time to stop, but include this one in before
189         ;; but otherwise return
190         (((? segment-is-now? first) rest ...)
191          (values (reverse (cons first segments-before)) rest))
192
193         ;; This is prior or at now, so add it and keep going
194         (((? segment-is-before-now? first) rest ...)
195          (loop (cons first segments-before) rest))
196
197         ;; Otherwise it's past now, just return what we have
198         (segments-after
199          (values segments-before segments-after))))))
200
201 (define (schedule-extract-until! schedule time)
202   "Extract all segments until TIME from SCHEDULE, and pop old segments off"
203   (receive (segments-before segments-after)
204       (schedule-segments-split schedule time)
205     (set-schedule-segments! schedule segments-after)
206     segments-before))
207
208 (define (add-segments-contents-to-queue! segments queue)
209   (for-each
210    (lambda (segment)
211      (let ((seg-queue (time-segment-queue segment)))
212        (while (not (q-empty? seg-queue))
213          (enq! queue (deq! seg-queue)))))
214    segments))
215
216
217 \f
218 ;;; Port handling
219 ;;; =============
220
221 (define (make-port-mapping)
222   (make-hash-table))
223
224 (define* (port-mapping-set! port-mapping port #:optional read write except)
225   "Sets port-mapping for reader / writer / exception handlers"
226   (if (not (or read write except))
227       (throw 'no-handlers-given "No handlers given for port" port))
228   (hashq-set! port-mapping port
229               `#(,read ,write ,except)))
230
231 (define (port-mapping-remove! port-mapping port)
232   (hashq-remove! port-mapping port))
233
234 ;; TODO: This is O(n), I'm pretty sure :\
235 ;; ... it might be worthwhile for us to have a
236 ;;   port-mapping record that keeps a count of how many
237 ;;   handlers (maybe via a promise?)
238 (define (port-mapping-empty? port-mapping)
239   "Is this port mapping empty?"
240   (eq? (hash-count (const #t) port-mapping) 0))
241
242 (define (port-mapping-non-empty? port-mapping)
243   "Whether this port-mapping contains any elements"
244   (not (port-mapping-empty? port-mapping)))
245
246
247 \f
248 ;;; Execution of agenda, and current agenda
249 ;;; =======================================
250
251 (define %current-agenda (make-parameter #f))
252
253 (define* (start-agenda agenda #:optional stop-condition)
254   (let loop ((agenda agenda))
255     (let ((new-agenda   
256            ;; @@: Hm, maybe here would be a great place to handle
257            ;;   select'ing on ports.
258            ;;   We could compose over agenda-run-once and agenda-read-ports
259            (parameterize ((%current-agenda agenda))
260              (agenda-run-once agenda))))
261       (if (and stop-condition (stop-condition agenda))
262           'done
263           (loop new-agenda)))))
264
265 (define (agenda-run-once agenda)
266   "Run once through the agenda, and produce a new agenda
267 based on the results"
268   (define (call-proc proc)
269     (call-with-prompt
270         (agenda-prompt-tag agenda)
271       (lambda ()
272         (proc))
273       ;; TODO
274       (lambda (k) k)))
275
276   (let ((queue (agenda-queue agenda))
277         (next-queue (make-q)))
278     (while (not (q-empty? queue))
279       (let* ((proc (q-pop! queue))
280              (proc-result (call-proc proc))
281              (enqueue
282               (lambda (new-proc)
283                 (enq! next-queue new-proc))))
284         ;; @@: We might support delay-wrapped procedures here
285         (match proc-result
286           ((? procedure? new-proc)
287            (enqueue new-proc))
288           (((? procedure? new-procs) ...)
289            (for-each
290             (lambda (new-proc)
291               (enqueue new-proc))
292             new-procs))
293           ;; do nothing
294           (_ #f))))
295     ;; TODO: Selecting on ports would happen here?
296     ;; Return new agenda, with next queue set
297     (set-field agenda (agenda-queue) next-queue)))