Add schedule stuff
[8sync.git] / loopy.scm
1 (define-module (loopy agenda)
2   #:use-module (srfi srfi-9)
3   #:use-module (srfi srfi-9 gnu)
4   #:use-module (ice-9 q)
5   #:use-module (ice-9 match)
6   #:export (make-agenda
7             agenda?
8             agenda-queue agenda-prompt-tag
9             agenda-port-pmapping agenda-schedule
10             
11             make-async-prompt-tag
12
13             make-time-segment
14             time-segment?
15             time-segment-time time-segment-queue
16
17             time-< time-= time-<=
18
19             make-schedule
20             schedule-add! schedule-empty?
21             schedule-segments
22
23             make-port-mapping
24             port-mapping-set! port-mapping-remove!
25             port-mapping-empty? port-mapping-non-empty?
26
27             %current-agenda
28             start-agenda agenda-run-once))
29
30 ;; @@: Using immutable agendas here, so wouldn't it make sense to
31 ;;   replace this queue stuff with using pfds based immutable queues?
32
33 \f
34 ;;; Agenda definition
35 ;;; =================
36
37 ;;; The agenda consists of:
38 ;;;  - a queue of immediate items to handle
39 ;;;  - sheduled future events to be added to a future queue
40 ;;;  - a tag by which running processes can escape for some asynchronous
41 ;;;    operation (from which they can be returned later)
42 ;;;  - a mapping of ports to various handler procedures
43 ;;;
44 ;;; The goal, eventually, is for this all to be immutable and functional.
45 ;;; However, we aren't there yet.  Some tricky things:
46 ;;;  - The schedule needs to be immutable, yet reasonably efficient.
47 ;;;  - Need to use immutable queues (ijp's pfds library?)
48 ;;;  - Modeling reading from ports as something repeatable,
49 ;;;    and with reasonable separation from functional components?
50
51 (define-immutable-record-type <agenda>
52   (make-agenda-intern queue prompt-tag port-mapping schedule)
53   agenda?
54   (queue agenda-queue)
55   (prompt-tag agenda-prompt-tag)
56   (port-mapping agenda-port-mapping)
57   (schedule agenda-schedule))
58
59 (define (make-async-prompt-tag)
60   (make-prompt-tag "prompt"))
61
62 (define* (make-agenda #:key
63                       (queue (make-q))
64                       (prompt (make-prompt-tag))
65                       (port-mapping (make-port-mapping))
66                       (schedule (make-schedule)))
67   (make-agenda-intern queue prompt port-mapping schedule))
68
69
70 \f
71 ;;; Schedule
72 ;;; ========
73
74 ;;; This is where we handle timed events for the future
75
76 ;; This section totally borrows from SICP
77 ;; <3 <3 <3
78
79 ;; NOTE: time is a cons of (seconds . microseconds)
80
81 (define-record-type <time-segment>
82   (make-time-segment-intern time queue)
83   time-segment?
84   (time time-segment-time)
85   (queue time-segment-queue))
86
87 (define (time-segment-right-format time)
88   (match time
89     ;; time is already a cons of second and microsecnd
90     (((? integer? s) . (? integer? u)) time)
91     ;; time was just an integer (just the second)
92     ((? integer? _) (cons time 0))
93     (_ (throw 'invalid-time "Invalid time" time))))
94
95 (define* (make-time-segment time #:optional (queue (make-q)))
96   (make-time-segment-intern time queue))
97
98 (define (time-< time1 time2)
99   (cond ((< (car time1)
100             (car time2))
101          #t)
102         ((and (= (car time1)
103                  (car time2))
104               (< (cdr time1)
105                  (cdr time2)))
106          #t)
107         (else #f)))
108
109 (define (time-= time1 time2)
110   (and (= (car time1) (car time2))
111        (= (cdr time1) (cdr time2))))
112
113 (define (time-<= time1 time2)
114   (or (time-< time1 time2)
115       (time-= time1 time2)))
116
117 (define-record-type <schedule>
118   (make-schedule-intern segments)
119   schedule?
120   (segments schedule-segments set-schedule-segments!))
121
122 (define* (make-schedule #:optional segments)
123   (make-schedule-intern (or segments '())))
124
125 ;; TODO: This code is reasonably easy to read but it
126 ;;   mutates AND is worst case of O(n) in both space and time :(
127 ;;   but at least it'll be reasonably easy to refactor to
128 ;;   a more functional setup?
129 (define (schedule-add! time proc schedule)
130   (let ((time (time-segment-right-format time)))
131     (define (new-time-segment)
132       (let ((new-segment
133              (make-time-segment time)))
134         (enq! (time-segment-queue new-segment) proc)
135         new-segment))
136     (define (loop segments)
137       (define (segment-equals-time? segment)
138         (time-= time (time-segment-time segment)))
139
140       (define (segment-more-than-time? segment)
141         (time-< time (time-segment-time segment)))
142
143       ;; We could switch this out to be more mutate'y
144       ;; and avoid the O(n) of space... is that over-optimizing?
145       (match segments
146         ;; If we're at the end of the list, time to make a new
147         ;; segment...
148         ('() (cons (make-time-segment time) '()))
149         ;; If the segment's time is exactly our time, good news
150         ;; everyone!  Let's append our stuff to its queue
151         (((? segment-equals-time? first) rest ...)
152          (enq! (time-segment-queue first) proc)
153          segments)
154         ;; If the first segment is more than our time,
155         ;; ours belongs before this one, so add it and
156         ;; start consing our way back
157         (((? segment-more-than-time? first) rest ...)
158          (cons (new-time-segment) segments))
159         ;; Otherwise, build up recursive result
160         ((first rest ... )
161          (cons first (loop rest)))))
162     (set-schedule-segments!
163      schedule
164      (loop (schedule-segments schedule)))))
165
166 (define (schedule-empty? schedule)
167   (eq? (schedule-segments schedule) '()))
168
169
170 \f
171 ;;; Port handling
172 ;;; =============
173
174 (define (make-port-mapping)
175   (make-hash-table))
176
177 (define* (port-mapping-set! port-mapping port #:optional read write except)
178   "Sets port-mapping for reader / writer / exception handlers"
179   (if (not (or read write except))
180       (throw 'no-handlers-given "No handlers given for port" port))
181   (hashq-set! port-mapping port
182               `#(,read ,write ,except)))
183
184 (define (port-mapping-remove! port-mapping port)
185   (hashq-remove! port-mapping port))
186
187 ;; TODO: This is O(n), I'm pretty sure :\
188 ;; ... it might be worthwhile for us to have a
189 ;;   port-mapping record that keeps a count of how many
190 ;;   handlers (maybe via a promise?)
191 (define (port-mapping-empty? port-mapping)
192   "Is this port mapping empty?"
193   (eq? (hash-count (const #t) port-mapping) 0))
194
195 (define (port-mapping-non-empty? port-mapping)
196   "Whether this port-mapping contains any elements"
197   (not (port-mapping-empty? port-mapping)))
198
199
200 \f
201 ;;; Execution of agenda, and current agenda
202 ;;; =======================================
203
204 (define %current-agenda (make-parameter #f))
205
206 (define* (start-agenda agenda #:optional stop-condition)
207   (let loop ((agenda agenda))
208     (let ((new-agenda   
209            ;; @@: Hm, maybe here would be a great place to handle
210            ;;   select'ing on ports.
211            ;;   We could compose over agenda-run-once and agenda-read-ports
212            (parameterize ((%current-agenda agenda))
213              (agenda-run-once agenda))))
214       (if (and stop-condition (stop-condition agenda))
215           'done
216           (loop new-agenda)))))
217
218 (define (agenda-run-once agenda)
219   "Run once through the agenda, and produce a new agenda
220 based on the results"
221   (define (call-proc proc)
222     (call-with-prompt
223         (agenda-prompt-tag agenda)
224       (lambda ()
225         (proc))
226       ;; TODO
227       (lambda (k) k)))
228
229   (let ((queue (agenda-queue agenda))
230         (next-queue (make-q)))
231     (while (not (q-empty? queue))
232       (let* ((proc (q-pop! queue))
233              (proc-result (call-proc proc))
234              (enqueue
235               (lambda (new-proc)
236                 (enq! next-queue new-proc))))
237         ;; @@: We might support delay-wrapped procedures here
238         (match proc-result
239           ((? procedure? new-proc)
240            (enqueue new-proc))
241           (((? procedure? new-procs) ...)
242            (for-each
243             (lambda (new-proc)
244               (enqueue new-proc))
245             new-procs))
246           ;; do nothing
247           (_ #f))))
248     ;; TODO: Selecting on ports would happen here?
249     ;; Return new agenda, with next queue set
250     (set-field agenda (agenda-queue) next-queue)))