actors: Reflect removal of choice of whether to cleanup in self-destruct
[8sync.git] / 8sync / actors.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright © 2016, 2017 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync actors)
20   #:use-module (oop goops)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-9 gnu)
23   #:use-module (ice-9 control)
24   #:use-module (ice-9 format)
25   #:use-module (ice-9 match)
26   #:use-module (ice-9 atomic)
27   #:use-module ((ice-9 ports internal)
28                 #:select (port-read-wait-fd port-write-wait-fd))
29   #:use-module (ice-9 pretty-print)
30   #:use-module (ice-9 receive)
31   #:use-module (ice-9 suspendable-ports)
32   #:use-module (fibers)
33   #:use-module (fibers channels)
34   #:use-module (fibers conditions)
35   #:use-module (fibers operations)
36   #:use-module (8sync inbox)
37   #:use-module (8sync rmeta-slot)
38
39   #:export (;; utilities... ought to go in their own module
40             big-random-number
41             big-random-number-string
42
43             <actor>
44             actor-id
45             actor-message-handler
46
47             *current-actor*
48
49             ;;; Commenting out the <address> type for now;
50             ;;; it may be back when we have better serializers
51             ;; <address>
52             make-address
53             address-actor-id address-hive-id
54
55             address->string
56             actor-id-actor
57             actor-id-hive
58             actor-id-string
59
60             actor-init! actor-cleanup!
61
62             build-actions
63
64             define-actor
65
66             actor-spawn-fiber
67             with-actor-nonblocking-ports
68
69             ;; <hive>
70             ;; make-hive
71             ;; ;; There are more methods for the hive, but there's
72             ;; ;; no reason for the outside world to look at them maybe?
73             ;; hive-id
74             create-actor create-actor*
75             self-destruct
76
77             <message>
78             make-message message?
79             message-to message-action message-from
80             message-id message-body message-in-reply-to
81             message-wants-reply
82
83             <- <-wait
84
85             spawn-hive run-hive
86
87             ;; Maybe the wrong place for this, or for it to be exported.
88             ;; But it's used in websockets' server implementation at least...
89             live-wrap
90
91             *debug-actor-ids*))
92
93 ;; For ids
94 (set! *random-state* (random-state-from-platform))
95
96 ;; Same size as a uuid4 I think...
97 (define random-number-size (expt 2 128))
98
99 (define (big-random-number)
100   (random random-number-size))
101
102 ;; Would be great to get this base64 encoded instead.
103 (define (big-random-number-string)
104   ;; @@: This is slow.  Using format here is wasteful.
105   (format #f "~x" (big-random-number)))
106
107 ;; @@: This is slow-ish.  A mere ~275k / second on my (old) machine.
108 ;;   The main cost seems to be in number->string.
109 (define (simple-message-id-generator)
110   ;; Prepending this cookie makes message ids unique per hive
111   (let ((prefix (format #f "~x:" (big-random-number)))
112         (counter 0))
113     (lambda ()
114       (set! counter (1+ counter))
115       (string-append prefix (number->string counter)))))
116
117
118 \f
119 ;;; Messages
120 ;;; ========
121
122 (define-record-type <message>
123   (make-message-intern id to from action
124                        body in-reply-to wants-reply)
125   message?
126   ;; @@: message-ids are removed.  They could be re-enabled
127   ;;   if we had thread-safe promises...
128   (id message-id)                    ; id of this message
129   (to message-to)                    ; actor id this is going to
130   (from message-from)                ; actor id of sender
131   (action message-action)            ; action (a symbol) to be handled
132   (body message-body)                ; argument list "body" of message
133   (in-reply-to message-in-reply-to)  ; message id this is in reply to, if any
134   (wants-reply message-wants-reply)) ; whether caller is waiting for reply
135
136
137 (define* (make-message id to from action body
138                        #:key in-reply-to wants-reply)
139   (make-message-intern id to from action body
140                        in-reply-to wants-reply))
141
142 (define (kwarg-list-to-alist args)
143   (let loop ((remaining args)
144              (result '()))
145     (match remaining
146       (((? keyword? key) val rest ...)
147        (loop rest
148              (cons (cons (keyword->symbol key) val) 
149                    result)))
150       (() result)
151       (_ (throw 'invalid-kwarg-list
152                 "Invalid keyword argument list"
153                 args)))))
154
155
156 ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
157 ;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
158
159 ;; This is the internal, generalized message sending method.
160 ;; Users shouldn't use it!  Use the <-foo forms instead.
161
162 (define (%<- wants-reply from-actor to action args message-id in-reply-to)
163   ;; Okay, we need to deal with message ids.
164   ;; Could we get rid of them? :\
165   ;; It seems if we can use eq? and have messages be immutable then
166   ;; it should be possible to identify follow-up replies.
167   ;; If we need to track replies across hive boundaries we could
168   ;; register unique ids across the ambassador barrier.
169   (match to
170     (($ <address> _ _ (? channel? channel) dead?)
171      (let ((message (make-message message-id to
172                                   (and from-actor (actor-id from-actor))
173                                   action args
174                                   #:wants-reply wants-reply
175                                   #:in-reply-to in-reply-to)))
176        (perform-operation
177         (choice-operation
178          (put-operation channel message)
179          (wait-operation dead?)))))
180     ;; TODO: put remote addresses here.
181     (($ <address> actor-id hive-id #f #f)
182      ;; Here we'd make a call to our hive...
183      'TODO)
184     ;; A message sent to nobody goes nowhere.
185     ;; TODO: Should we display a warning here, probably?
186     (#f #f)
187     ;; We shouldn't technically be passing in actors but rather their
188     ;; addresses, but often actors want to message themselves and
189     ;; this makes that slightly easier.
190     ((? (lambda (x) (is-a? x <actor>)) actor)
191      (%<- wants-reply from-actor (actor-id actor) action
192           args message-id in-reply-to))))
193
194 (define (<- to action . args)
195   (define from-actor (*current-actor*))
196   (%<- #f from-actor to action args
197        (or (and from-actor
198                 ((actor-msg-id-generator from-actor)))
199            (big-random-number-string))
200        #f))
201
202 (define (<-wait to action . args)
203   (define prompt (*actor-prompt*))
204   (when (not prompt)
205     (error "Tried to <-wait without being in an actor's context..."))
206
207   (let ((reply (abort-to-prompt prompt '<-wait to action args)))
208     (cond ((eq? (message-action reply) '*error*)
209            (throw 'hive-unresumable-coroutine
210                   "Won't resume coroutine; got an *error* as a reply"
211                   #:message reply))
212           (else (apply values (message-body reply))))))
213
214 \f
215 ;;; Main actor implementation
216 ;;; =========================
217
218 (define (actor-inheritable-message-handler actor message)
219   (define action (message-action message))
220   (define method
221     (class-rmeta-ref (class-of actor) 'actions action
222                      #:equals? eq? #:cache-set! hashq-set!
223                      #:cache-ref hashq-ref))
224   (unless method
225     (throw 'action-not-found
226            "No appropriate action handler found for actor"
227            #:action action
228            #:actor actor
229            #:message message))
230   (apply method actor message (message-body message)))
231
232 (define-syntax-rule (live-wrap body)
233   "Wrap possibly multi-value function in a procedure, applies all arguments"
234   (lambda args
235     (apply body args)))
236
237 (define-syntax-rule (build-actions (symbol method) ...)
238   "Construct an alist of (symbol . method), where the method is wrapped
239 with `live-wrap' to facilitate live hacking and allow the method definition
240 to come after class definition."
241   (build-rmeta-slot
242    (list (cons (quote symbol)
243                (live-wrap method)) ...)))
244
245 (define-class <actor> ()
246   ;; An <address> object
247   (id #:init-keyword #:address
248       #:getter actor-id)
249
250   ;; Our queue to send/receive messages on
251   (inbox-deq #:init-thunk make-channel
252              #:accessor actor-inbox-deq)
253
254   (msg-id-generator #:init-thunk simple-message-id-generator
255                     #:getter actor-msg-id-generator)
256
257   ;; How we receive and process new messages
258   (message-handler #:init-value actor-inheritable-message-handler
259                    ;; @@: There's no reason not to use #:class instead of
260                    ;;   #:each-subclass anywhere in this file, except for
261                    ;;   Guile bug #25211 (#:class is broken in Guile 2.2)
262                    #:allocation #:each-subclass
263                    #:getter actor-message-handler)
264
265   ;; valid values are:
266   ;;  - #t as in, send the init message, but don't wait (default)
267   ;;  - 'wait, as in wait on the init message
268   ;;  - #f as in don't bother to init
269   (should-init #:init-value #t
270                #:getter actor-should-init
271                #:allocation #:each-subclass)
272
273   ;; This is the default, "simple" way to inherit and process messages.
274   (actions #:init-thunk (build-actions)
275            #:allocation #:each-subclass))
276
277 ;;; Actors may specify an "init" action that occurs before the actor
278 ;;; actually begins to run.
279 ;;; During actor-init!, an actor may send a message to itself or others
280 ;;; via <- but *may not* use <-wait.
281 (define-method (actor-init! (actor <actor>))
282   'no-op)
283
284 (define-method (actor-cleanup! (actor <actor>))
285   'no-op)
286
287 ;;; Every actor has an address, which is how its identified.
288 ;;; We also pack in some routing information.
289 (define-record-type <address>
290   (make-address actor-id hive-id channel dead?)
291   address?
292   ;; Unique-to-this-actor-on-this-hive part
293   (actor-id address-actor-id)
294   ;; Unique identifier for the hive we're connected to.
295   ;; If we don't have a "direct" link to the other actor through
296   ;; a channel, we'll have to look up if our hive has a way to route
297   ;; to the other hive.
298   (hive-id address-hive-id)
299   ;; The receiving channel (as opposed to actor-inbox-deq)
300   (channel address-channel)
301   ;; A fibers condition variable which is set once this actor kicks
302   ;; the bucket
303   (dead? address-dead?))
304
305 (set-record-type-printer!
306  <address>
307  (lambda (address port)
308    (format port "<address ~a ~a>"
309            (address->string address)
310            (if (address-channel address)
311                (string-append
312                 ":local"
313                 (if (atomic-box-ref
314                      ((@@ (fibers conditions) condition-signalled?)
315                       (address-dead? address)))
316                     " :dead" ""))
317                ":remote"))))
318
319 (define (address->string address)
320   (string-append (address-actor-id address) "@"
321                  (address-hive-id address)))
322
323 (define (address-equal? address1 address2)
324   "Check whether or not the two addresses are equal.
325
326 This compares the actor-id and hive-id but ignores the channel and
327 dead? condition."
328   (and (equal? (address-actor-id address1)
329                (address-actor-id address2))
330        (equal? (address-hive-id address1)
331                (address-hive-id address2))))
332
333 (define (actor-id-actor actor)
334   "Get the actor id component of the actor-id"
335   (address-actor-id (actor-id actor)))
336
337 (define (actor-id-hive actor)
338   "Get the hive id component of the actor-id"
339   (address-hive-id (actor-id actor)))
340
341 (define (actor-id-string actor)
342   "Render the full actor id as a human-readable string"
343   (address->string (actor-id actor)))
344
345 (define (actor-inbox-enq actor)
346   (address-channel (actor-id actor)))
347
348 (define *current-actor*
349   (make-parameter #f))
350
351 (define *actor-prompt*
352   (make-parameter #f))
353
354 (define *resume-io-channel*
355   (make-parameter #f))
356
357 (define (actor-main-loop actor)
358   "Main loop of the actor.  Loops around, pulling messages off its queue
359 and handling them."
360   ;; @@: Maybe establish some sort of garbage collection routine for these...
361   (define waiting
362     (make-hash-table))
363   (define message-handler
364     (actor-message-handler actor))
365   (define dead?
366     (address-dead? (actor-id actor)))
367   (define prompt (make-prompt-tag (actor-id-actor actor)))
368   ;; Not always used, only if with-actor-nonblocking-ports is used
369   (define resume-io-channel
370     (make-channel))
371
372   (define (handle-message message)
373     (catch #t
374       (lambda ()
375         (call-with-values
376             (lambda ()
377               (message-handler actor message))
378           (lambda vals
379             ;; Return reply if necessary
380             (when (message-wants-reply message)
381               (%<- #f actor (message-from message) '*reply*
382                    vals ((actor-msg-id-generator actor))
383                    (message-id message))))))
384       (const #t)
385       (let ((err (current-error-port)))
386         (lambda (key . args)
387           (false-if-exception
388            (let ((stack (make-stack #t 4)))
389              (format err "Uncaught exception when handling message ~a:\n"
390                      message)
391              (display-backtrace stack err)
392              (print-exception err (stack-ref stack 0)
393                               key args)
394              (newline err)
395              ;; If the other actor is waiting on a reply, let's let them
396              ;; know there was an error...
397              (when (message-wants-reply message)
398                (%<- #f actor (message-from message) '*error*
399                     (list key) ((actor-msg-id-generator actor))
400                     (message-id message)))))))))
401   
402   (define (resume-handler message)
403     (define in-reply-to (message-in-reply-to message))
404     (cond
405      ((hash-ref waiting in-reply-to) =>
406       (lambda (kont)
407         (hash-remove! waiting in-reply-to)
408         (kont message)))
409      (else
410       (format (current-error-port)
411               "Tried to resume nonexistant message: ~a\n"
412               (message-id message)))))
413
414   (define (call-with-actor-prompt thunk)
415     (call-with-prompt prompt
416       thunk
417       ;; Here's where we abort to if we're doing <-wait
418       ;; @@: maybe use match-lambda if we're going to end up
419       ;;   handling multiple ~commands
420       (match-lambda*
421         ((kont '<-wait to action message-args)
422          (define message-id
423            ((actor-msg-id-generator actor)))
424          (hash-set! waiting message-id kont)
425          (%<- #t actor to action message-args message-id #f))
426         ((kont 'run-me proc)
427          (proc kont)))))
428
429   (define halt-or-handle-message
430     ;; It would be nice if we could give priorities to certain operations.
431     ;; halt should always win over getting a message...
432     (choice-operation
433      (wrap-operation (wait-operation dead?)
434                      (const #f))  ; halt and return
435      (wrap-operation (get-operation (actor-inbox-deq actor))
436                      (lambda (message)
437                        (call-with-actor-prompt
438                         (lambda ()
439                           (if (message-in-reply-to message)
440                               ;; resume a continuation which was waiting on a reply
441                               (resume-handler message)
442                               ;; start handling a new message
443                               (handle-message message))))
444                        #t))   ; loop again
445      (wrap-operation (get-operation resume-io-channel)
446                      (lambda (thunk)
447                        (call-with-actor-prompt
448                         (lambda ()
449                           (thunk)))
450                        #t))))
451
452   ;; Mutate the parameter; this should be fine since each fiber
453   ;; runs in its own dynamic state with with-dynamic-state.
454   ;; See with-dynamic-state discussion in
455   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
456   (*current-actor* actor)
457   (*resume-io-channel* resume-io-channel)
458
459   ;; We temporarily set the *actor-prompt* to #f to make sure that
460   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
461   ;; the parent fiber's *actor-prompt* either.)
462   (*actor-prompt* #f)
463   (actor-init! actor)
464   (*actor-prompt* prompt)
465
466   (let loop ()
467     (and (perform-operation halt-or-handle-message)
468          (loop))))
469
470
471 ;; @@: So in order for this to work, we're going to have to add
472 ;; another channel to actors, which is resumable i/o.  We'll have to
473 ;; spawn a fiber that wakes up a thunk on the actor when its port is
474 ;; available.  Funky...
475
476 (define (%suspend-io-to-actor wait-for-read/write)
477   (lambda (port)
478     (define prompt (*actor-prompt*))
479     (define resume-channel (*resume-io-channel*))
480     (define (run-at-prompt k)
481       (spawn-fiber
482        (lambda ()
483          (wait-for-read/write port)
484          ;; okay, we're awake again, tell the actor to resume this
485          ;; continuation
486          (put-message resume-channel k))
487        #:parallel? #f))
488     (when (not prompt)
489       (error "Attempt to abort to actor prompt outside of actor"))
490     (abort-to-prompt (*actor-prompt*)
491                      'run-me run-at-prompt)))
492
493 (define suspend-read-to-actor
494   (%suspend-io-to-actor (@@ (fibers) wait-for-readable)))
495
496 (define suspend-write-to-actor
497   (%suspend-io-to-actor (@@ (fibers) wait-for-writable)))
498
499 (define (with-actor-nonblocking-ports thunk)
500   "Runs THUNK in dynamic context in which attempting to read/write
501 from a port that would otherwise block an actor's correspondence with
502 other actors (note that reading from a nonblocking port should never
503 block other fibers) will instead permit reading other messages while
504 I/O is waiting to complete.
505
506 Note that currently "
507   (parameterize ((current-read-waiter suspend-read-to-actor)
508                  (current-write-waiter suspend-write-to-actor))
509     (thunk)))
510
511 (define (actor-spawn-fiber thunk . args)
512   "Spawn a fiber from an actor but unset actor-machinery-specific
513 dynamic context."
514   (apply spawn-fiber
515          (lambda ()
516            (*current-actor* #f)
517            (*resume-io-channel* #f)
518            (*actor-prompt* #f)
519            (thunk))
520          args))
521
522
523 \f
524 ;;; Actor utilities
525 ;;; ===============
526
527 (define-syntax-rule (define-actor class inherits
528                       (action ...)
529                       slots ...)
530   (define-class class inherits
531     (actions #:init-thunk (build-actions action ...)
532              #:allocation #:each-subclass)
533     slots ...))
534
535 \f
536 ;;; The Hive
537 ;;; ========
538 ;;;   Every actor has a hive, which keeps track of other actors, manages
539 ;;;   cleanup, and performs inter-hive communication.
540
541 ;; TODO: Make this a srfi-9 record type
542 (define-class <hive> ()
543   (id #:init-keyword #:id
544       #:getter hive-id)
545   (actor-registry #:init-thunk make-hash-table
546                   #:getter hive-actor-registry)
547   ;; TODO: Rename "ambassadors" to "relays"
548   ;; Ambassadors are used (or will be) for inter-hive communication.
549   ;; These are special actors that know how to route messages to other
550   ;; hives.
551   (ambassadors #:init-thunk make-weak-key-hash-table
552                #:getter hive-ambassadors)
553   (channel #:init-thunk make-channel
554            #:getter hive-channel)
555   (halt? #:init-thunk make-condition
556          #:getter hive-halt?))
557
558 (define* (make-hive #:key hive-id)
559   (make <hive> #:id (or hive-id
560                         (big-random-number-string))))
561
562 (define (gen-actor-id cookie)
563   (if cookie
564       (string-append cookie ":" (big-random-number-string))
565       (big-random-number-string)))
566
567 (define (hive-main-loop hive)
568   "The main loop of the hive.  This listens for messages on the hive-channel
569 for certain actions to perform.
570
571 `messages' here is not the same as a <message> object; these are a list of
572 values, the first value being a symbol"
573   (define channel (hive-channel hive))
574   (define halt? (hive-halt? hive))
575   (define registry (hive-actor-registry hive))
576
577   ;; not the same as a <message> ;P
578   (define handle-message
579     (match-lambda
580       (('register-actor actor-id address actor)
581        (hash-set! registry actor-id (vector address actor)))
582       ;; Remove the actor from hive
583       (('remove-actor actor-id)
584        (hash-remove! (hive-actor-registry hive) actor-id))
585       (('register-ambassador hive-id ambassador-actor-id)
586        'TODO)
587       (('unregister-ambassador hive-id ambassador-actor-id)
588        'TODO)
589       (('forward-message from-actor-id message)
590        'TODO)))
591
592   (define halt-or-handle
593     (choice-operation
594      (wrap-operation (get-operation channel)
595                      (lambda (msg)
596                        (handle-message msg)
597                        #t))
598      (wrap-operation (wait-operation halt?)
599                      (const #f))))
600
601   (let lp ()
602     (and (perform-operation halt-or-handle)
603          (lp))))
604
605 (define *hive-id* (make-parameter #f))
606 (define *hive-channel* (make-parameter #f))
607
608 ;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
609 (define* (spawn-hive proc #:key (hive (make-hive)))
610   "Spawn a hive and run PROC, passing it the fresh hive and establishing
611 a dynamic context surrounding the hive."
612   (spawn-fiber (lambda () (hive-main-loop hive)))
613   (parameterize ((*hive-id* (hive-id hive))
614                  (*hive-channel* (hive-channel hive)))
615     (proc hive)))
616
617 (define (run-hive proc . args)
618   "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
619 to spawn-hive... all remaining arguments passed to run-fibers."
620   (apply run-fibers
621          (lambda ()
622            (spawn-hive proc))
623          args))
624
625 (define (%create-actor actor-class init-args id-cookie send-init?)
626   (let* ((hive-channel (*hive-channel*))
627          (hive-id (*hive-id*))
628          (actor-id (gen-actor-id id-cookie))
629          (dead? (make-condition))
630          (inbox-enq (make-channel))
631          (address (make-address actor-id hive-id
632                                 inbox-enq dead?))
633          (actor (apply make actor-class
634                        #:address address
635                        init-args))
636          (should-init (actor-should-init actor)))
637
638     ;; start the main loop
639     (spawn-fiber (lambda ()
640                    ;; start the inbox loop
641                    (spawn-fiber
642                     (lambda ()
643                       (delivery-agent inbox-enq (actor-inbox-deq actor)
644                                       dead?))
645                     ;; this one is decidedly non-parallel, because we want
646                     ;; the delivery agent to be in the same thread as its actor
647                     #:parallel? #f)
648
649                    (actor-main-loop actor))
650                  #:parallel? #t)
651
652     (put-message hive-channel (list 'register-actor actor-id address actor))
653     
654     ;; return the address
655     address))
656
657 ;;; Whether or not to attach the class' name as a cookie by default in
658 ;;; create-actor
659 (define *debug-actor-ids*
660   (make-parameter #t))
661
662 (define* (create-actor actor-class #:rest init-args)
663   "Create an instance of actor-class.  Return the new actor's id.
664
665 This is the method actors should call directly (unless they want
666 to supply an id-cookie, in which case they should use
667 create-actor*)."
668   (%create-actor actor-class init-args
669                  (if (*debug-actor-ids*)
670                      (symbol->string (class-name actor-class))
671                      #f)
672                  #t))
673
674 (define* (create-actor* actor-class id-cookie #:rest init-args)
675   "Create an instance of actor-class.  Return the new actor's id.
676
677 Like create-actor, but permits supplying an id-cookie."
678   (%create-actor actor-class init-args id-cookie #t))
679
680 (define (self-destruct actor)
681   "Remove an actor from the hive.
682
683 The actor will also call its `actor-cleanup!' method."
684   (signal-condition! (address-dead? (actor-id actor)))
685   (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
686   ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
687   ;; a message with <-wait
688   (*actor-prompt* #f)
689   (actor-cleanup! actor))
690