fb170bc45a1af34c86d557240da60cb7d22925af
[8sync.git] / 8sync / systems / actors.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright (C) 2016 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 ;; XUDD inspired actor system
20
21 (define-module (8sync systems actors)
22   #:use-module (oop goops)
23   #:use-module (srfi srfi-9)
24   #:use-module (srfi srfi-9 gnu)
25   #:use-module (ice-9 format)
26   #:use-module (ice-9 match)
27   #:use-module (ice-9 pretty-print)
28   #:use-module (8sync agenda)
29   #:use-module (8sync repl)
30   #:export (;; utilities... ought to go in their own module
31             big-random-number
32             big-random-number-string
33             simple-message-id-generator
34             require-slot
35
36             <actor>
37             actor-id
38             actor-message-handler
39
40             ;;; Commenting out the <address> type for now;
41             ;;; it may be back when we have better serializers
42             ;; <address>
43             make-address address?
44             address-actor-id address-hive-id
45
46             address->string
47             actor-id-actor
48             actor-id-hive
49             actor-id-string
50
51             make-action-dispatch
52             define-simple-actor
53
54             <hive>
55             make-hive
56             ;; There are more methods for the hive, but there's
57             ;; no reason for the outside world to look at them maybe?
58             hive-id
59             hive-create-actor hive-create-actor*
60
61             create-actor create-actor*
62             self-destruct
63
64             <message>
65             make-message message?
66             message-to message-action message-from
67             message-id message-body message-in-reply-to
68             message-wants-reply
69             message-ref
70
71             send-message send-message-wait
72             reply-message reply-message-wait
73
74             <- <-wait <-reply <-reply-wait
75
76             ez-run-hive
77             bootstrap-message
78
79             serialize-message write-message
80             serialize-message-pretty pprint-message
81             read-message read-message-from-string))
82
83 ;; For ids
84 (define %random-state
85   (make-parameter (random-state-from-platform)))
86
87 ;; Same size as a uuid4 I think...
88 (define random-number-size (expt 2 128))
89
90 (define (big-random-number)
91   (random random-number-size (%random-state)))
92
93 ;; Would be great to get this base64 encoded instead.
94 (define (big-random-number-string)
95   ;; @@: This is slow.  Using format here is wasteful.
96   (format #f "~x" (big-random-number)))
97
98 ;; @@: This is slow.  A mere ~275k / second on my (old) machine.
99 ;;   The main cost seems to be in number->string.
100 (define (simple-message-id-generator)
101   ;; Prepending this cookie makes message ids unique per hive
102   (let ((prefix (format #f "~x:" (big-random-number)))
103         (counter 0))
104     (lambda ()
105       (set! counter (1+ counter))
106       (string-append prefix (number->string counter)))))
107
108 (define (require-slot slot-name)
109   "Generate something for #:init-thunk to complain about unfilled slot"
110   (lambda ()
111     (throw 'required-slot
112            (format #f "Slot ~s not filled" slot-name)
113            slot-name)))
114
115
116 \f
117 ;;; Messages
118 ;;; ========
119
120
121 (define-record-type <message>
122   (make-message-intern id to from action
123                        body in-reply-to wants-reply
124                        replied
125                        ;; @@: Not used yet.
126                        ;; Will we ever find a real use case?
127                        deferred-reply)
128   message?
129   (id message-id)
130   (to message-to)
131   (from message-from)
132   (action message-action)
133   (body message-body)
134   (in-reply-to message-in-reply-to)
135   (wants-reply message-wants-reply)
136
137   ;; See XUDD source for these.  Not use yet, maybe eventually will be?
138   ;; XUDD uses them for autoreply.
139   ;; Requiring mutation on message objects is clearly not great,
140   ;; but it may be worth it...?  Investigate!
141   (replied message-replied set-message-replied!)
142   (deferred-reply message-deferred-reply set-message-deferred-reply!))
143
144
145 (define* (make-message id to from action body
146                        #:key in-reply-to wants-reply
147                        replied deferred-reply)
148   (make-message-intern id to from action body
149                        in-reply-to wants-reply replied
150                        deferred-reply))
151
152 ;; Note: the body of messages is currently an alist, but it's created
153 ;;   from a keyword based property list (see the following two functions).
154 ;;   But, that's an extra conversion step, and maybe totally unnecessary:
155 ;;   we already have message-ref, and this could just pull a keyword
156 ;;   from a property list.
157 ;;   The main ways this might be useful are error checking,
158 ;;   serialization across the wire (but even that might require some
159 ;;   change), and using existing tooling (though adding new tooling
160 ;;   would be negligible in implementation effort.)
161
162 ;; This cons cell is immutable and unique (for eq? tests)
163 (define %nothing-provided (cons 'nothing 'provided))
164
165 (define* (message-ref message key #:optional (dflt %nothing-provided))
166   "Extract KEY from body of MESSAGE.
167
168 Optionally set default with [DFLT]
169 If key not found and DFLT not provided, throw an error."
170   (let ((result (assoc key (message-body message))))
171     (if result (cdr result)
172         (if (eq? dflt %nothing-provided)
173             (throw 'message-missing-key
174                    "Message body does not contain key and no default provided"
175                    #:key key
176                    #:message message)
177             dflt))))
178
179
180 (define (message-needs-reply message)
181   "See if this message needs a reply still"
182   (and (message-wants-reply message)
183        (not (or (message-replied message)
184                 (message-deferred-reply message)))))
185
186
187 (define (kwarg-list-to-alist args)
188   (let loop ((remaining args)
189              (result '()))
190     (match remaining
191       (((? keyword? key) val rest ...)
192        (loop rest
193              (cons (cons (keyword->symbol key) val) 
194                    result)))
195       (() result)
196       (_ (throw 'invalid-kwarg-list
197                 "Invalid keyword argument list"
198                 args)))))
199
200
201 (define (send-message from-actor to-id action . message-body-args)
202   "Send a message from an actor to another actor"
203   (let* ((hive (actor-hive from-actor))
204          (message (make-message (hive-gen-message-id hive) to-id
205                                 (actor-id from-actor) action
206                                 (kwarg-list-to-alist message-body-args))))
207     (8sync-nowait (hive-process-message hive message))))
208
209 (define (send-message-wait from-actor to-id action . message-body-args)
210   "Send a message from an actor to another, but wait until we get a response"
211   (let* ((hive (actor-hive from-actor))
212          (abort-to (hive-prompt (actor-hive from-actor)))
213          (message (make-message (hive-gen-message-id hive) to-id
214                                 (actor-id from-actor) action
215                                 (kwarg-list-to-alist message-body-args)
216                                 #:wants-reply #t)))
217     (abort-to-prompt abort-to from-actor message)))
218
219 ;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
220 ;;   We might have `send-message-wait-brazen' to allow callers to
221 ;;   not have an exception thrown and instead just have a message with
222 ;;   the appropriate '*error* message returned.
223
224 (define (reply-message from-actor original-message
225                        . message-body-args)
226   "Reply to a message"
227   (set-message-replied! original-message #t)
228   (let* ((hive (actor-hive from-actor))
229          (new-message (make-message (hive-gen-message-id hive)
230                                     (message-from original-message)
231                                     (actor-id from-actor) '*reply*
232                                     (kwarg-list-to-alist message-body-args)
233                                     #:in-reply-to (message-id original-message))))
234     (8sync-nowait (hive-process-message hive new-message))))
235
236 (define (reply-message-wait from-actor original-message
237                             . message-body-args)
238   "Reply to a messsage, but wait until we get a response"
239   (set-message-replied! original-message #t)
240   (let* ((hive (actor-hive from-actor))
241          (abort-to (hive-prompt (actor-hive from-actor)))
242          (new-message (make-message (hive-gen-message-id hive)
243                                     (message-from original-message)
244                                     (actor-id from-actor) '*reply*
245                                     (kwarg-list-to-alist message-body-args)
246                                     #:wants-reply #t
247                                     #:in-reply-to (message-id original-message))))
248     (abort-to-prompt abort-to from-actor new-message)))
249
250
251 ;;; Aliases!
252 ;;; See: http://mumble.net/~jar/articles/oo-moon-weinreb.html
253 ;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
254
255 (define <- send-message)
256 (define <-wait send-message-wait)
257 (define <-reply reply-message)
258 (define <-reply-wait reply-message-wait)
259
260
261 \f
262 ;;; Main actor implementation
263 ;;; =========================
264
265 (define-class <actor> ()
266   ;; An address object
267   (id #:init-thunk (require-slot "id")
268       #:init-keyword #:id
269       #:getter actor-id)
270   ;; The hive we're connected to.
271   ;; We need this to be able to send messages.
272   (hive #:init-thunk (require-slot "hive")
273         #:init-keyword #:hive
274         #:accessor actor-hive)
275   ;; How we receive and process new messages
276   (message-handler #:init-thunk (require-slot "message-handler")
277                    #:allocation #:each-subclass))
278
279 (define-method (actor-message-handler (actor <actor>))
280   (slot-ref actor 'message-handler))
281
282 ;;; So these are the nicer representations of addresses.
283 ;;; However, they don't serialize so easily with scheme read/write, so we're
284 ;;; using the simpler cons cell version below for now.
285
286 ;; (define-record-type <address>
287 ;;   (make-address actor-id hive-id)  ; @@: Do we want the trailing -id?
288 ;;   address?
289 ;;   (actor-id address-actor-id)
290 ;;   (hive-id address-hive-id))
291 ;;
292 ;; (set-record-type-printer!
293 ;;  <address>
294 ;;  (lambda (record port)
295 ;;    (format port "<address: ~s@~s>"
296 ;;            (address-actor-id record) (address-hive-id record))))
297 ;;
298
299 (define (make-address actor-id hive-id)
300   (cons actor-id hive-id))
301
302 (define (address-actor-id address)
303   (car address))
304
305 (define (address-hive-id address)
306   (cdr address))
307
308 (define (address->string address)
309   (string-append (address-actor-id address) "@"
310                  (address-hive-id address)))
311
312 (define-method (actor-id-actor (actor <actor>))
313   "Get the actor id component of the actor-id"
314   (address-actor-id (actor-id actor)))
315
316 (define-method (actor-id-hive (actor <actor>))
317   "Get the hive id component of the actor-id"
318   (address-hive-id (actor-id actor)))
319
320 (define-method (actor-id-string (actor <actor>))
321   "Render the full actor id as a human-readable string"
322   (address->string (actor-id actor)))
323
324
325 \f
326 ;;; Actor utilities
327 ;;; ===============
328
329 (define (simple-dispatcher action-map)
330   (lambda (actor message)
331     (let* ((action (message-action message))
332            (method (assoc-ref action-map action)))
333       (if (not method)
334           (throw 'action-not-found
335                  "No appropriate action handler found for actor"
336                  #:action action
337                  #:actor actor
338                  #:message message
339                  #:available-actions (map car action-map)))
340       (method actor message))))
341
342 (define-syntax %expand-action-item
343   (syntax-rules ()
344     ((_ ((action-name action-args ...) body ...))
345      (cons (quote action-name)
346            (lambda (action-args ...)
347              body ...)))
348     ((_ (action-name handler))
349      (cons (quote action-name) handler))))
350
351 (define-syntax make-action-dispatch
352   (syntax-rules ()
353     "Expand a list of action names and actions into an alist
354
355 You can use this like the following:
356   (make-action-dispatch
357    (cookies
358     (lambda (actor message)
359       (display \"I love cookies!\n\")))
360    (party
361     (lambda (actor message)
362       (display \"Life of the party!\"))))
363
364 Alternately, if you'd like to skip the lambda, you could use the slightly
365 more compact following syntax:
366   (make-action-dispatch
367    ((cookies actor message)
368      (display \"I love cookies!\n\"))
369    ((party actor message)
370      (display \"Life of the party!\")))"
371     ((make-action-dispatch action-item ...)
372      (simple-dispatcher
373       (list (%expand-action-item action-item) ...)))))
374
375 (define-syntax-rule (define-simple-actor class actions ...)
376   (define-class class (<actor>)
377     (message-handler
378      #:init-value (make-action-dispatch actions ...)
379      #:allocation #:each-subclass)))
380
381 \f
382 ;;; The Hive
383 ;;; ========
384 ;;;   Every actor has a hive.  The hive is a kind of "meta-actor"
385 ;;;   which routes all the rest of the actors in a system.
386
387 (define-generic hive-handle-failed-forward)
388
389 (define-class <hive> (<actor>)
390   ;; This gets set to itself immediately after being created
391   (hive #:init-value #f)
392   (actor-registry #:init-thunk make-hash-table
393                   #:getter hive-actor-registry)
394   (msg-id-generator #:init-thunk simple-message-id-generator
395                     #:getter hive-msg-id-generator)
396   ;; Ambassadors are used (or will be) for inter-hive communication.
397   ;; These are special actors that know how to route messages to other hives.
398   (ambassadors #:init-thunk make-weak-key-hash-table
399                #:getter hive-ambassadors)
400   ;; Waiting coroutines
401   ;; This is a map from cons cell of message-id
402   ;;   to a cons cell of (actor-id . coroutine)
403   ;; @@: Should we have a <waiting-coroutine> record type?
404   ;; @@: Should there be any way to clear out "old" coroutines?
405   (waiting-coroutines #:init-thunk make-hash-table
406                       #:getter hive-waiting-coroutines)
407   ;; Message prompt
408   ;; When actors send messages to each other they abort to this prompt
409   ;; to send the message, then carry on their way
410   (prompt #:init-thunk make-prompt-tag
411           #:getter hive-prompt)
412   (message-handler
413    #:init-value
414    (make-action-dispatch
415     ;; This is in the case of an ambassador failing to forward a message...
416     ;; it reports it back to the hive
417     (*failed-forward* hive-handle-failed-forward))))
418
419 (define-method (hive-handle-failed-forward (hive <hive>) message)
420   "Handle an ambassador failing to forward a message"
421   'TODO)
422
423 (define* (make-hive #:key hive-id)
424   (let ((hive (make <hive>
425                 #:id (make-address
426                       "hive" (or hive-id
427                                  (big-random-number-string))))))
428     ;; Set the hive's actor reference to itself
429     (set! (actor-hive hive) hive)
430     hive))
431
432 (define-method (hive-id (hive <hive>))
433   (actor-id-hive hive))
434
435 (define-method (hive-gen-actor-id (hive <hive>) cookie)
436   (make-address (if cookie
437                     (string-append cookie "-" (big-random-number-string))
438                     (big-random-number-string))
439                 (hive-id hive)))
440
441 (define-method (hive-gen-message-id (hive <hive>))
442   "Generate a message id using HIVE's message id generator"
443   ((hive-msg-id-generator hive)))
444
445 (define-method (hive-resolve-local-actor (hive <hive>) actor-address)
446   (hash-ref (hive-actor-registry hive) actor-address))
447
448 (define-method (hive-resolve-ambassador (hive <hive>) ambassador-address)
449   (hash-ref (hive-ambassadors hive) ambassador-address))
450
451 (define-method (make-forward-request (hive <hive>) (ambassador <actor>) message)
452   (make-message (hive-gen-message-id hive) (actor-id ambassador)
453                 ;; If we make the hive not an actor, we could either switch this
454                 ;; to #f or to the original actor...?
455                 ;; Maybe some more thinking should be done on what should
456                 ;; happen in case of failure to forward?  Handling ambassador failures
457                 ;; seems like the primary motivation for the hive remaining an actor.
458                 (actor-id hive)
459                 '*forward*
460                 `((original . ,message))))
461
462 (define-method (hive-process-message (hive <hive>) message)
463   "Handle one message, or forward it via an ambassador"
464   (define (maybe-autoreply actor)
465     ;; Possibly autoreply
466     (if (message-needs-reply message)
467         ;; @@: Should we give *autoreply* as the action instead of *reply*?
468         (reply-message actor message
469                        #:*auto-reply* #t)))
470
471   (define (resolve-actor-to)
472     "Get the actor the message was aimed at"
473     (let ((actor (hive-resolve-local-actor hive (message-to message))))
474       (if (not actor)
475           (throw 'actor-not-found
476                  (format #f "Message ~a from ~a directed to nonexistant actor ~a"
477                          (message-id message)
478                          (address->string (message-from message))
479                          (address->string (message-to message)))
480                  message))
481       actor))
482
483   (define (call-catching-coroutine thunk)
484     (call-with-prompt (hive-prompt hive)
485       thunk
486       (lambda (kont actor message)
487         (let ((hive (actor-hive actor)))
488           ;; Register the coroutine
489           (hash-set! (hive-waiting-coroutines hive)
490                      (message-id message)
491                      (cons (actor-id actor) kont))
492           ;; Send off the message
493           (8sync (hive-process-message hive message))))))
494
495   (define (process-local-message)
496     (let ((actor (resolve-actor-to)))
497       (call-catching-coroutine
498        (lambda ()
499          (define message-handler (actor-message-handler actor))
500          ;; @@: Should a more general error handling happen here?
501          (let ((result
502                 (message-handler actor message)))
503            (maybe-autoreply actor)
504            ;; Returning result allows actors to possibly make a run-request
505            ;; at the end of handling a message.
506            ;; ... We do want that, right?
507            result)))))
508
509   (define (resume-waiting-coroutine)
510     (call-catching-coroutine
511      (lambda ()
512        (match (hash-remove! (hive-waiting-coroutines hive)
513                             (message-in-reply-to message))
514          ((_ . (resume-actor-id . kont))
515           (if (not (equal? (message-to message)
516                            resume-actor-id))
517               (throw 'resuming-to-wrong-actor
518                      "Attempted to resume a coroutine to the wrong actor!"
519                      #:expected-actor-id (message-to message)
520                      #:got-actor-id resume-actor-id
521                      #:message message))
522           (let (;; @@: How should we resolve resuming coroutines to actors who are
523                 ;;   now gone?
524                 (actor (resolve-actor-to))
525                 (result (kont message)))
526             (maybe-autoreply actor)
527             result))
528          (#f (throw 'no-waiting-coroutine
529                     "message in-reply-to tries to resume nonexistent coroutine"
530                     message))))))
531
532   (define (process-remote-message)
533     ;; Find the ambassador
534     (let* ((remote-hive-id (hive-id (message-to message)))
535            (ambassador (hive-resolve-ambassador remote-hive-id))
536            (message-handler (actor-message-handler ambassador))
537            (forward-request (make-forward-request hive ambassador message)))
538       (message-handler ambassador forward-request)))
539
540   (let ((to (message-to message)))
541     ;; This seems to be an easy mistake to make, so check that addressing
542     ;; is correct here
543     (if (not to)
544         (throw 'missing-addressee
545                "`to' field is missing on message"
546                #:message message))
547     (if (hive-actor-local? hive to)
548         (if (message-in-reply-to message)
549             (resume-waiting-coroutine)
550             (process-local-message))
551         (process-remote-message))))
552
553 (define-method (hive-actor-local? (hive <hive>) address)
554   (hash-ref (hive-actor-registry hive) address))
555
556 (define-method (hive-register-actor! (hive <hive>) (actor <actor>))
557   (hash-set! (hive-actor-registry hive) (actor-id actor) actor))
558
559 (define-method (%hive-create-actor (hive <hive>) actor-class
560                                    init id-cookie)
561   "Actual method called by hive-create-actor.
562
563 Since this is a define-method it can't accept fancy define* arguments,
564 so this gets called from the nicer hive-create-actor interface.  See
565 that method for documentation."
566   (let* ((actor-id (hive-gen-actor-id hive id-cookie))
567          (actor (apply make actor-class
568                        #:hive hive
569                        #:id actor-id
570                        init)))
571     (hive-register-actor! hive actor)
572     ;; return the actor id
573     actor-id))
574
575 (define* (hive-create-actor hive actor-class #:rest init)
576   (%hive-create-actor hive actor-class
577                       init #f))
578
579 (define* (hive-create-actor* hive actor-class id-cookie #:rest init)
580   (%hive-create-actor hive actor-class
581                       init id-cookie))
582
583
584 \f
585 ;;; Various API methods for actors to interact with the system
586 ;;; ==========================================================
587
588 ;; TODO: move send-message and friends here...?
589
590 (define* (create-actor from-actor actor-class #:rest init)
591   "Create an instance of actor-class.  Return the new actor's id.
592
593 This is the method actors should call directly (unless they want
594 to supply an id-cookie, in which case they should use
595 create-actor*)."
596   (8sync (%hive-create-actor (actor-hive from-actor) actor-class
597                              init #f)))
598
599
600 (define* (create-actor* from-actor actor-class id-cookie #:rest init)
601   "Create an instance of actor-class.  Return the new actor's id.
602
603 Like create-actor, but permits supplying an id-cookie."
604   (8sync (%hive-create-actor (actor-hive from-actor) actor-class
605                              init id-cookie)))
606
607
608 (define (self-destruct actor)
609   "Remove an actor from the hive."
610   (hash-remove! (hive-actor-registry (actor-hive actor))
611                 (actor-id actor)))
612
613
614 \f
615 ;;; 8sync bootstrap utilities
616 ;;; =========================
617
618 (define* (ez-run-hive hive initial-tasks #:key repl-server)
619   "Start up an agenda and run HIVE in it with INITIAL-TASKS.
620
621 Should we start up a cooperative REPL for live hacking?  REPL-SERVER
622 wants to know!  You can pass it #t or #f, or if you want to specify a port,
623 an integer."
624   (let* ((queue (list->q initial-tasks))
625          (agenda (make-agenda #:pre-unwind-handler print-error-and-continue
626                               #:queue queue)))
627     (cond
628      ;; If repl-server is an integer, we'll use that as the port
629      ((integer? repl-server)
630       (spawn-and-queue-repl-server! agenda repl-server))
631      (repl-server
632       (spawn-and-queue-repl-server! agenda)))
633     (start-agenda agenda)))
634
635 (define (bootstrap-message hive to-id action . message-body-args)
636   (wrap
637    (apply send-message hive to-id action message-body-args)))
638
639
640 \f
641 ;;; Basic readers / writers
642 ;;; =======================
643
644 (define (serialize-message message)
645   "Serialize a message for read/write"
646   (list
647    (message-id message)
648    (message-to message)
649    (message-from message)
650    (message-action message)
651    (message-body message)
652    (message-in-reply-to message)
653    (message-wants-reply message)
654    (message-replied message)
655    (message-deferred-reply message)))
656
657 (define* (write-message message #:optional (port (current-output-port)))
658   "Write out a message to a port for easy reading later.
659
660 Note that if a sub-value can't be easily written to something
661 Guile's `read' procedure knows how to read, this doesn't do anything
662 to improve that.  You'll need a better serializer for that.."
663   (write (serialize-message message) port))
664
665 (define (serialize-message-pretty message)
666   "Serialize a message in a way that's easy for humans to read."
667   `(*message*
668     (id ,(message-id message))
669     (to ,(message-to message))
670     (from ,(message-from message))
671     (action ,(message-action message))
672     (body ,(message-body message))
673     (in-reply-to ,(message-in-reply-to message))
674     (wants-reply ,(message-wants-reply message))
675     (replied ,(message-replied message))
676     (deferred-reply ,(message-deferred-reply message))))
677
678 (define (pprint-message message)
679   "Pretty print a message."
680   (pretty-print (serialize-message-pretty message)))
681
682 (define* (read-message #:optional (port (current-input-port)))
683   "Read a message serialized via serialize-message from PORT"
684   (match (read port)
685     ((id to from action body in-reply-to wants-reply replied deferred-reply)
686      (make-message-intern
687       id to from action body
688       in-reply-to wants-reply replied deferred-reply))
689     (anything-else
690      (throw 'message-read-bad-structure
691             "Could not read message from structure"
692             anything-else))))
693
694 (define (read-message-from-string message-str)
695   "Read message from MESSAGE-STR"
696   (with-input-from-string message-str
697     (lambda ()
698       (read-message (current-input-port)))))