6d17e0597d9bab86bdaf86b6f85bd34fbd7179a5
[8sync.git] / 8sync / actors.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright © 2016, 2017 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync actors)
20   #:use-module (oop goops)
21   #:use-module (srfi srfi-9)
22   #:use-module (ice-9 control)
23   #:use-module (ice-9 format)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 atomic)
26   #:use-module ((ice-9 ports internal)
27                 #:select (port-read-wait-fd port-write-wait-fd))
28   #:use-module (ice-9 pretty-print)
29   #:use-module (ice-9 receive)
30   #:use-module (ice-9 suspendable-ports)
31   #:use-module (fibers)
32   #:use-module (fibers channels)
33   #:use-module (fibers conditions)
34   #:use-module (fibers operations)
35   #:use-module (8sync inbox)
36   #:use-module (8sync rmeta-slot)
37
38   #:export (;; utilities... ought to go in their own module
39             big-random-number
40             big-random-number-string
41
42             <actor>
43             actor-id
44             actor-message-handler
45
46             *current-actor*
47
48             ;;; Commenting out the <address> type for now;
49             ;;; it may be back when we have better serializers
50             ;; <address>
51             make-address
52             address-actor-id address-hive-id
53
54             address->string
55             actor-id-actor
56             actor-id-hive
57             actor-id-string
58
59             actor-init! actor-cleanup!
60
61             build-actions
62
63             define-actor
64
65             actor-spawn-fiber
66             with-actor-nonblocking-ports
67
68             ;; <hive>
69             ;; make-hive
70             ;; ;; There are more methods for the hive, but there's
71             ;; ;; no reason for the outside world to look at them maybe?
72             ;; hive-id
73             create-actor create-actor*
74             self-destruct
75
76             <message>
77             make-message message?
78             message-to message-action message-from
79             message-id message-body message-in-reply-to
80             message-wants-reply
81
82             <- <-wait
83
84             spawn-hive run-hive
85
86             ;; Maybe the wrong place for this, or for it to be exported.
87             ;; But it's used in websockets' server implementation at least...
88             live-wrap))
89
90 ;; For ids
91 (set! *random-state* (random-state-from-platform))
92
93 ;; Same size as a uuid4 I think...
94 (define random-number-size (expt 2 128))
95
96 (define (big-random-number)
97   (random random-number-size))
98
99 ;; Would be great to get this base64 encoded instead.
100 (define (big-random-number-string)
101   ;; @@: This is slow.  Using format here is wasteful.
102   (format #f "~x" (big-random-number)))
103
104 ;; @@: This is slow-ish.  A mere ~275k / second on my (old) machine.
105 ;;   The main cost seems to be in number->string.
106 (define (simple-message-id-generator)
107   ;; Prepending this cookie makes message ids unique per hive
108   (let ((prefix (format #f "~x:" (big-random-number)))
109         (counter 0))
110     (lambda ()
111       (set! counter (1+ counter))
112       (string-append prefix (number->string counter)))))
113
114
115 \f
116 ;;; Messages
117 ;;; ========
118
119 (define-record-type <message>
120   (make-message-intern id to from action
121                        body in-reply-to wants-reply)
122   message?
123   ;; @@: message-ids are removed.  They could be re-enabled
124   ;;   if we had thread-safe promises...
125   (id message-id)                    ; id of this message
126   (to message-to)                    ; actor id this is going to
127   (from message-from)                ; actor id of sender
128   (action message-action)            ; action (a symbol) to be handled
129   (body message-body)                ; argument list "body" of message
130   (in-reply-to message-in-reply-to)  ; message id this is in reply to, if any
131   (wants-reply message-wants-reply)) ; whether caller is waiting for reply
132
133
134 (define* (make-message id to from action body
135                        #:key in-reply-to wants-reply)
136   (make-message-intern id to from action body
137                        in-reply-to wants-reply))
138
139 (define (kwarg-list-to-alist args)
140   (let loop ((remaining args)
141              (result '()))
142     (match remaining
143       (((? keyword? key) val rest ...)
144        (loop rest
145              (cons (cons (keyword->symbol key) val) 
146                    result)))
147       (() result)
148       (_ (throw 'invalid-kwarg-list
149                 "Invalid keyword argument list"
150                 args)))))
151
152
153 ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
154 ;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
155
156 ;; This is the internal, generalized message sending method.
157 ;; Users shouldn't use it!  Use the <-foo forms instead.
158
159 (define (%<- wants-reply from-actor to action args message-id in-reply-to)
160   ;; Okay, we need to deal with message ids.
161   ;; Could we get rid of them? :\
162   ;; It seems if we can use eq? and have messages be immutable then
163   ;; it should be possible to identify follow-up replies.
164   ;; If we need to track replies across hive boundaries we could
165   ;; register unique ids across the ambassador barrier.
166   (match to
167     (#(_ _ (? channel? channel) dead?)
168      (let ((message (make-message message-id to
169                                   (and from-actor (actor-id from-actor))
170                                   action args
171                                   #:wants-reply wants-reply
172                                   #:in-reply-to in-reply-to)))
173        (perform-operation
174         (choice-operation
175          (put-operation channel message)
176          (wait-operation dead?)))))
177     ;; TODO: put remote addresses here.
178     (#(actor-id hive-id #f #f)
179      ;; Here we'd make a call to our hive...
180      'TODO)
181     ;; A message sent to nobody goes nowhere.
182     ;; TODO: Should we display a warning here, probably?
183     (#f #f)
184     ;; We shouldn't technically be passing in actors but rather their
185     ;; addresses, but often actors want to message themselves and
186     ;; this makes that slightly easier.
187     ((? (lambda (x) (is-a? x <actor>)) actor)
188      (%<- wants-reply from-actor (actor-id actor) action
189           args message-id in-reply-to))))
190
191 (define (<- to action . args)
192   (define from-actor (*current-actor*))
193   (%<- #f from-actor to action args
194        (or (and from-actor
195                 ((actor-msg-id-generator from-actor)))
196            (big-random-number-string))
197        #f))
198
199 ;; TODO: this should abort to the prompt, then check for errors
200 ;;   when resuming.
201
202 (define (<-wait to action . args)
203   (define prompt (*actor-prompt*))
204   (when (not prompt)
205     (error "Tried to <-wait without being in an actor's context..."))
206
207   (let ((reply (abort-to-prompt prompt '<-wait to action args)))
208     (cond ((eq? action '*error*)
209            (throw 'hive-unresumable-coroutine
210                   "Won't resume coroutine; got an *error* as a reply"
211                   #:message reply))
212           (else (apply values (message-body reply))))))
213
214 \f
215 ;;; Main actor implementation
216 ;;; =========================
217
218 (define (actor-inheritable-message-handler actor message)
219   (define action (message-action message))
220   (define method
221     (class-rmeta-ref (class-of actor) 'actions action
222                      #:equals? eq? #:cache-set! hashq-set!
223                      #:cache-ref hashq-ref))
224   (unless method
225     (throw 'action-not-found
226            "No appropriate action handler found for actor"
227            #:action action
228            #:actor actor
229            #:message message))
230   (apply method actor message (message-body message)))
231
232 (define-syntax-rule (live-wrap body)
233   "Wrap possibly multi-value function in a procedure, applies all arguments"
234   (lambda args
235     (apply body args)))
236
237 (define-syntax-rule (build-actions (symbol method) ...)
238   "Construct an alist of (symbol . method), where the method is wrapped
239 with `live-wrap' to facilitate live hacking and allow the method definition
240 to come after class definition."
241   (build-rmeta-slot
242    (list (cons (quote symbol)
243                (live-wrap method)) ...)))
244
245 (define-class <actor> ()
246   ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
247   ;;  - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
248   ;;  - dead? is a fibers condition variable which is set once this actor
249   ;;    kicks the bucket
250   (id #:init-keyword #:address
251       #:getter actor-id)
252
253   ;; Our queue to send/receive messages on
254   (inbox-deq #:init-thunk make-channel
255              #:accessor actor-inbox-deq)
256
257   (msg-id-generator #:init-thunk simple-message-id-generator
258                     #:getter actor-msg-id-generator)
259
260   ;; How we receive and process new messages
261   (message-handler #:init-value actor-inheritable-message-handler
262                    ;; @@: There's no reason not to use #:class instead of
263                    ;;   #:each-subclass anywhere in this file, except for
264                    ;;   Guile bug #25211 (#:class is broken in Guile 2.2)
265                    #:allocation #:each-subclass
266                    #:getter actor-message-handler)
267
268   ;; valid values are:
269   ;;  - #t as in, send the init message, but don't wait (default)
270   ;;  - 'wait, as in wait on the init message
271   ;;  - #f as in don't bother to init
272   (should-init #:init-value #t
273                #:getter actor-should-init
274                #:allocation #:each-subclass)
275
276   ;; This is the default, "simple" way to inherit and process messages.
277   (actions #:init-thunk (build-actions)
278            #:allocation #:each-subclass))
279
280 ;;; Actors may specify an "init" action that occurs before the actor
281 ;;; actually begins to run.
282 ;;; During actor-init!, an actor may send a message to itself or others
283 ;;; via <- but *may not* use <-wait.
284 (define-method (actor-init! (actor <actor>))
285   'no-op)
286
287 (define-method (actor-cleanup! (actor <actor>))
288   'no-op)
289
290 ;;; Addresses are vectors where the first part is the actor-id and
291 ;;; the second part is the hive-id.  This works well enough... they
292 ;;; look decent being pretty-printed.
293
294 (define (make-address actor-id hive-id channel dead?)
295   (vector actor-id hive-id channel dead?))
296
297 (define (address-actor-id address)
298   (vector-ref address 0))
299
300 (define (address-hive-id address)
301   (vector-ref address 1))
302
303 (define (address-channel address)
304   (vector-ref address 2))
305
306 (define (address-dead? address)
307   (vector-ref address 3))
308
309 (define (address->string address)
310   (string-append (address-actor-id address) "@"
311                  (address-hive-id address)))
312
313 (define (address-equal? address1 address2)
314   "Check whether or not the two addresses are equal.
315
316 This compares the actor-id and hive-id but ignores the channel and
317 dead? condition."
318   (match address1
319     (#(actor-id-1 hive-id-1 _ _)
320      (match address2
321        (#(actor-id-2 hive-id-2)
322         (and (equal? actor-id-1 actor-id-2)
323              (and (equal? hive-id-1 hive-id-2))))
324        (_ #f)))
325     (_ #f)))
326
327 (define (actor-id-actor actor)
328   "Get the actor id component of the actor-id"
329   (address-actor-id (actor-id actor)))
330
331 (define (actor-id-hive actor)
332   "Get the hive id component of the actor-id"
333   (address-hive-id (actor-id actor)))
334
335 (define (actor-id-string actor)
336   "Render the full actor id as a human-readable string"
337   (address->string (actor-id actor)))
338
339 (define (actor-inbox-enq actor)
340   (address-channel (actor-id actor)))
341
342 (define *current-actor*
343   (make-parameter #f))
344
345 (define *actor-prompt*
346   (make-parameter #f))
347
348 (define *resume-io-channel*
349   (make-parameter #f))
350
351 (define (actor-main-loop actor)
352   "Main loop of the actor.  Loops around, pulling messages off its queue
353 and handling them."
354   ;; @@: Maybe establish some sort of garbage collection routine for these...
355   (define waiting
356     (make-hash-table))
357   (define message-handler
358     (actor-message-handler actor))
359   (define dead?
360     (address-dead? (actor-id actor)))
361   (define prompt (make-prompt-tag (actor-id-actor actor)))
362   ;; Not always used, only if with-actor-nonblocking-ports is used
363   (define resume-io-channel
364     (make-channel))
365
366   (define (handle-message message)
367     (catch #t
368       (lambda ()
369         (call-with-values
370             (lambda ()
371               (message-handler actor message))
372           (lambda vals
373             ;; Return reply if necessary
374             (when (message-wants-reply message)
375               (when (message-wants-reply message)
376                 (%<- #f actor (message-from message) '*reply*
377                      vals ((actor-msg-id-generator actor))
378                      (message-id message)))))))
379       (const #t)
380       (let ((err (current-error-port)))
381         (lambda (key . args)
382           (false-if-exception
383            (let ((stack (make-stack #t 4)))
384              (format err "Uncaught exception when handling message ~a:\n"
385                      message)
386              (display-backtrace stack err)
387              (print-exception err (stack-ref stack 0)
388                               key args)
389              (newline err)
390              ;; If the other actor is waiting on a reply, let's let them
391              ;; know there was an error...
392              (when (message-wants-reply message)
393                (%<- #f actor (message-from message) '*error*
394                     (list key) ((actor-msg-id-generator actor))
395                     (message-id message)))))))))
396   
397   (define (resume-handler message)
398     (define in-reply-to (message-in-reply-to message))
399     (cond
400      ((hash-ref waiting in-reply-to) =>
401       (lambda (kont)
402         (hash-remove! waiting in-reply-to)
403         (kont message)))
404      (else
405       (format (current-error-port)
406               "Tried to resume nonexistant message: ~a\n"
407               (message-id message)))))
408
409   (define (call-with-actor-prompt thunk)
410     (call-with-prompt prompt
411       thunk
412       ;; Here's where we abort to if we're doing <-wait
413       ;; @@: maybe use match-lambda if we're going to end up
414       ;;   handling multiple ~commands
415       (match-lambda*
416         ((kont '<-wait to action message-args)
417          (define message-id
418            ((actor-msg-id-generator actor)))
419          (hash-set! waiting message-id kont)
420          (%<- #t actor to action message-args message-id #f))
421         ((kont 'run-me proc)
422          (proc kont)))))
423
424   (define halt-or-handle-message
425     ;; It would be nice if we could give priorities to certain operations.
426     ;; halt should always win over getting a message...
427     (choice-operation
428      (wrap-operation (wait-operation dead?)
429                      (const #f))  ; halt and return
430      (wrap-operation (get-operation (actor-inbox-deq actor))
431                      (lambda (message)
432                        (call-with-actor-prompt
433                         (lambda ()
434                           (if (message-in-reply-to message)
435                               ;; resume a continuation which was waiting on a reply
436                               (resume-handler message)
437                               ;; start handling a new message
438                               (handle-message message))))
439                        #t))   ; loop again
440      (wrap-operation (get-operation resume-io-channel)
441                      (lambda (thunk)
442                        (call-with-actor-prompt
443                         (lambda ()
444                           (thunk)))
445                        #t))))
446
447   ;; Mutate the parameter; this should be fine since each fiber
448   ;; runs in its own dynamic state with with-dynamic-state.
449   ;; See with-dynamic-state discussion in
450   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
451   (*current-actor* actor)
452   (*resume-io-channel* resume-io-channel)
453
454   ;; We temporarily set the *actor-prompt* to #f to make sure that
455   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
456   ;; the parent fiber's *actor-prompt* either.)
457   (*actor-prompt* #f)
458   (actor-init! actor)
459   (*actor-prompt* prompt)
460
461   (let loop ()
462     (and (perform-operation halt-or-handle-message)
463          (loop))))
464
465
466 ;; @@: So in order for this to work, we're going to have to add
467 ;; another channel to actors, which is resumable i/o.  We'll have to
468 ;; spawn a fiber that wakes up a thunk on the actor when its port is
469 ;; available.  Funky...
470
471 (define (%suspend-io-to-actor wait-for-read/write)
472   (lambda (port)
473     (define prompt (*actor-prompt*))
474     (define resume-channel (*resume-io-channel*))
475     (define (run-at-prompt k)
476       (spawn-fiber
477        (lambda ()
478          (wait-for-read/write port)
479          ;; okay, we're awake again, tell the actor to resume this
480          ;; continuation
481          (put-message resume-channel k))
482        #:parallel? #f))
483     (when (not prompt)
484       (error "Attempt to abort to actor prompt outside of actor"))
485     (abort-to-prompt (*actor-prompt*)
486                      'run-me run-at-prompt)))
487
488 (define suspend-read-to-actor
489   (%suspend-io-to-actor (@@ (fibers) wait-for-readable)))
490
491 (define suspend-write-to-actor
492   (%suspend-io-to-actor (@@ (fibers) wait-for-writable)))
493
494 (define (with-actor-nonblocking-ports thunk)
495   "Runs THUNK in dynamic context in which attempting to read/write
496 from a port that would otherwise block an actor's correspondence with
497 other actors (note that reading from a nonblocking port should never
498 block other fibers) will instead permit reading other messages while
499 I/O is waiting to complete.
500
501 Note that currently "
502   (parameterize ((current-read-waiter suspend-read-to-actor)
503                  (current-write-waiter suspend-write-to-actor))
504     (thunk)))
505
506 (define (actor-spawn-fiber thunk . args)
507   "Spawn a fiber from an actor but unset actor-machinery-specific
508 dynamic context."
509   (apply spawn-fiber
510          (lambda ()
511            (*current-actor* #f)
512            (*resume-io-channel* #f)
513            (*actor-prompt* #f)
514            (thunk))
515          args))
516
517
518 \f
519 ;;; Actor utilities
520 ;;; ===============
521
522 (define-syntax-rule (define-actor class inherits
523                       (action ...)
524                       slots ...)
525   (define-class class inherits
526     (actions #:init-thunk (build-actions action ...)
527              #:allocation #:each-subclass)
528     slots ...))
529
530 \f
531 ;;; The Hive
532 ;;; ========
533 ;;;   Every actor has a hive, which keeps track of other actors, manages
534 ;;;   cleanup, and performs inter-hive communication.
535
536 ;; TODO: Make this a srfi-9 record type
537 (define-class <hive> ()
538   (id #:init-keyword #:id
539       #:getter hive-id)
540   (actor-registry #:init-thunk make-hash-table
541                   #:getter hive-actor-registry)
542   ;; TODO: Rename "ambassadors" to "relays"
543   ;; Ambassadors are used (or will be) for inter-hive communication.
544   ;; These are special actors that know how to route messages to other
545   ;; hives.
546   (ambassadors #:init-thunk make-weak-key-hash-table
547                #:getter hive-ambassadors)
548   (channel #:init-thunk make-channel
549            #:getter hive-channel)
550   (halt? #:init-thunk make-condition
551          #:getter hive-halt?))
552
553 (define* (make-hive #:key hive-id)
554   (make <hive> #:id (or hive-id
555                         (big-random-number-string))))
556
557 (define (gen-actor-id cookie)
558   (if cookie
559       (string-append cookie ":" (big-random-number-string))
560       (big-random-number-string)))
561
562 (define (hive-main-loop hive)
563   "The main loop of the hive.  This listens for messages on the hive-channel
564 for certain actions to perform.
565
566 `messages' here is not the same as a <message> object; these are a list of
567 values, the first value being a symbol"
568   (define channel (hive-channel hive))
569   (define halt? (hive-halt? hive))
570   (define registry (hive-actor-registry hive))
571
572   ;; not the same as a <message> ;P
573   (define handle-message
574     (match-lambda
575       (('register-actor actor-id address actor)
576        (hash-set! registry actor-id (vector address actor)))
577       ;; Remove the actor from hive
578       (('remove-actor actor-id)
579        (hash-remove! (hive-actor-registry hive) actor-id))
580       (('register-ambassador hive-id ambassador-actor-id)
581        'TODO)
582       (('unregister-ambassador hive-id ambassador-actor-id)
583        'TODO)
584       (('forward-message from-actor-id message)
585        'TODO)))
586
587   (define halt-or-handle
588     (choice-operation
589      (wrap-operation (get-operation channel)
590                      (lambda (msg)
591                        (handle-message msg)
592                        #t))
593      (wrap-operation (wait-operation halt?)
594                      (const #f))))
595
596   (let lp ()
597     (and (perform-operation halt-or-handle)
598          (lp))))
599
600 (define *hive-id* (make-parameter #f))
601 (define *hive-channel* (make-parameter #f))
602
603 ;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
604 (define* (spawn-hive proc #:key (hive (make-hive)))
605   "Spawn a hive and run PROC, passing it the fresh hive and establishing
606 a dynamic context surrounding the hive."
607   (spawn-fiber (lambda () (hive-main-loop hive)))
608   (parameterize ((*hive-id* (hive-id hive))
609                  (*hive-channel* (hive-channel hive)))
610     (proc hive)))
611
612 (define (run-hive proc . args)
613   "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
614 to spawn-hive... all remaining arguments passed to run-fibers."
615   (apply run-fibers
616          (lambda ()
617            (spawn-hive proc))
618          args))
619
620 (define (%create-actor actor-class init-args id-cookie send-init?)
621   (let* ((hive-channel (*hive-channel*))
622          (hive-id (*hive-id*))
623          (actor-id (gen-actor-id id-cookie))
624          (dead? (make-condition))
625          (inbox-enq (make-channel))
626          (address (make-address actor-id hive-id
627                                 inbox-enq dead?))
628          (actor (apply make actor-class
629                        #:address address
630                        init-args))
631          (should-init (actor-should-init actor)))
632
633     ;; start the main loop
634     (spawn-fiber (lambda ()
635                    ;; start the inbox loop
636                    (spawn-fiber
637                     (lambda ()
638                       (delivery-agent inbox-enq (actor-inbox-deq actor)
639                                       dead?))
640                     ;; this one is decidedly non-parallel, because we want
641                     ;; the delivery agent to be in the same thread as its actor
642                     #:parallel? #f)
643
644                    (actor-main-loop actor))
645                  #:parallel? #t)
646
647     (put-message hive-channel (list 'register-actor actor-id address actor))
648     
649     ;; return the address
650     address))
651
652 (define* (create-actor actor-class #:rest init-args)
653   "Create an instance of actor-class.  Return the new actor's id.
654
655 This is the method actors should call directly (unless they want
656 to supply an id-cookie, in which case they should use
657 create-actor*)."
658   (%create-actor actor-class init-args #f #t))
659
660
661 (define* (create-actor* actor-class id-cookie #:rest init-args)
662   "Create an instance of actor-class.  Return the new actor's id.
663
664 Like create-actor, but permits supplying an id-cookie."
665   (%create-actor actor-class init-args id-cookie #t))
666
667 (define* (self-destruct actor #:key (cleanup #t))
668   "Remove an actor from the hive.
669
670 Unless #:cleanup is set to #f, this will first have the actor handle
671 its '*cleanup* action handler."
672   (signal-condition! (address-dead? (actor-id actor)))
673   (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
674   ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
675   ;; a message with <-wait
676   (*actor-prompt* #f)
677   (actor-cleanup! actor))
678