Update reference to hive to be dynamic state, simplify create-actor.
[8sync.git] / 8sync / actors.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright © 2016, 2017 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync actors)
20   #:use-module (oop goops)
21   #:use-module (srfi srfi-9)
22   #:use-module (ice-9 control)
23   #:use-module (ice-9 format)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 atomic)
26   #:use-module ((ice-9 ports internal)
27                 #:select (port-read-wait-fd port-write-wait-fd))
28   #:use-module (ice-9 pretty-print)
29   #:use-module (ice-9 receive)
30   #:use-module (ice-9 suspendable-ports)
31   #:use-module (fibers)
32   #:use-module (fibers channels)
33   #:use-module (fibers conditions)
34   #:use-module (fibers operations)
35   #:use-module (fibers internal)
36   #:use-module (8sync inbox)
37   #:use-module (8sync rmeta-slot)
38
39   #:export (;; utilities... ought to go in their own module
40             big-random-number
41             big-random-number-string
42
43             <actor>
44             actor-id
45             actor-message-handler
46
47             ;;; Commenting out the <address> type for now;
48             ;;; it may be back when we have better serializers
49             ;; <address>
50             make-address
51             address-actor-id address-hive-id
52
53             address->string
54             actor-id-actor
55             actor-id-hive
56             actor-id-string
57
58             actor-init! actor-cleanup!
59
60             actor-alive?
61
62             build-actions
63
64             define-actor
65
66             actor-spawn-fiber
67             with-actor-nonblocking-ports
68
69             ;; <hive>
70             ;; make-hive
71             ;; ;; There are more methods for the hive, but there's
72             ;; ;; no reason for the outside world to look at them maybe?
73             ;; hive-id
74             create-actor create-actor*
75             self-destruct
76
77             <message>
78             make-message message?
79             message-to message-action message-from
80             message-id message-body message-in-reply-to
81             message-wants-reply
82
83             <- <-wait
84
85             spawn-hive run-hive))
86
87 ;; For ids
88 (set! *random-state* (random-state-from-platform))
89
90 ;; Same size as a uuid4 I think...
91 (define random-number-size (expt 2 128))
92
93 (define (big-random-number)
94   (random random-number-size))
95
96 ;; Would be great to get this base64 encoded instead.
97 (define (big-random-number-string)
98   ;; @@: This is slow.  Using format here is wasteful.
99   (format #f "~x" (big-random-number)))
100
101 ;; @@: This is slow-ish.  A mere ~275k / second on my (old) machine.
102 ;;   The main cost seems to be in number->string.
103 (define (simple-message-id-generator)
104   ;; Prepending this cookie makes message ids unique per hive
105   (let ((prefix (format #f "~x:" (big-random-number)))
106         (counter 0))
107     (lambda ()
108       (set! counter (1+ counter))
109       (string-append prefix (number->string counter)))))
110
111
112 \f
113 ;;; Messages
114 ;;; ========
115
116 (define-record-type <message>
117   (make-message-intern id to from action
118                        body in-reply-to wants-reply)
119   message?
120   ;; @@: message-ids are removed.  They could be re-enabled
121   ;;   if we had thread-safe promises...
122   (id message-id)                    ; id of this message
123   (to message-to)                    ; actor id this is going to
124   (from message-from)                ; actor id of sender
125   (action message-action)            ; action (a symbol) to be handled
126   (body message-body)                ; argument list "body" of message
127   (in-reply-to message-in-reply-to)  ; message id this is in reply to, if any
128   (wants-reply message-wants-reply)) ; whether caller is waiting for reply
129
130
131 (define* (make-message id to from action body
132                        #:key in-reply-to wants-reply)
133   (make-message-intern id to from action body
134                        in-reply-to wants-reply))
135
136 (define (kwarg-list-to-alist args)
137   (let loop ((remaining args)
138              (result '()))
139     (match remaining
140       (((? keyword? key) val rest ...)
141        (loop rest
142              (cons (cons (keyword->symbol key) val) 
143                    result)))
144       (() result)
145       (_ (throw 'invalid-kwarg-list
146                 "Invalid keyword argument list"
147                 args)))))
148
149
150 ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
151 ;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
152
153 ;; This is the internal, generalized message sending method.
154 ;; Users shouldn't use it!  Use the <-foo forms instead.
155
156 (define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to)
157   ;; Okay, we need to deal with message ids.
158   ;; Could we get rid of them? :\
159   ;; It seems if we can use eq? and have messages be immutable then
160   ;; it should be possible to identify follow-up replies.
161   ;; If we need to track replies across hive boundaries we could
162   ;; register unique ids across the ambassador barrier.
163   (match to
164     (#(_ _ (? channel? channel) dead?)
165      (let ((message (make-message message-id to
166                                   (and from-actor (actor-id from-actor))
167                                   action args
168                                   #:wants-reply wants-reply
169                                   #:in-reply-to in-reply-to)))
170        (perform-operation
171         (choice-operation
172          (put-operation channel message)
173          (wait-operation dead?)))))
174     ;; TODO: put remote addresses here.
175     (#(actor-id hive-id #f #f)
176      ;; Here we'd make a call to our hive...
177      'TODO)
178     ;; A message sent to nobody goes nowhere.
179     ;; TODO: Should we display a warning here, probably?
180     (#f #f)))
181
182 (define (<- to action . args)
183   (define from-actor (*current-actor*))
184   (%<- #f from-actor to action args
185        (or (and from-actor
186                 ((actor-msg-id-generator from-actor)))
187            (big-random-number-string))
188        #f))
189
190 ;; TODO: this should abort to the prompt, then check for errors
191 ;;   when resuming.
192
193 (define (<-wait to action . args)
194   (define prompt (*actor-prompt*))
195   (when (not prompt)
196     (error "Tried to <-wait without being in an actor's context..."))
197
198   (let ((reply (abort-to-prompt prompt '<-wait to action args)))
199     (cond ((eq? action '*error*)
200            (throw 'hive-unresumable-coroutine
201                   "Won't resume coroutine; got an *error* as a reply"
202                   #:message reply))
203           (else (apply values (message-body reply))))))
204
205 \f
206 ;;; Main actor implementation
207 ;;; =========================
208
209 (define (actor-inheritable-message-handler actor message)
210   (define action (message-action message))
211   (define method
212     (class-rmeta-ref (class-of actor) 'actions action
213                      #:equals? eq? #:cache-set! hashq-set!
214                      #:cache-ref hashq-ref))
215   (unless method
216     (throw 'action-not-found
217            "No appropriate action handler found for actor"
218            #:action action
219            #:actor actor
220            #:message message))
221   (apply method actor message (message-body message)))
222
223 (define-syntax-rule (wrap-apply body)
224   "Wrap possibly multi-value function in a procedure, applies all arguments"
225   (lambda args
226     (apply body args)))
227
228 (define-syntax-rule (build-actions (symbol method) ...)
229   "Construct an alist of (symbol . method), where the method is wrapped
230 with wrap-apply to facilitate live hacking and allow the method definition
231 to come after class definition."
232   (build-rmeta-slot
233    (list (cons (quote symbol)
234                (wrap-apply method)) ...)))
235
236 (define-class <actor> ()
237   ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
238   ;;  - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
239   ;;  - dead? is a fibers condition variable which is set once this actor
240   ;;    kicks the bucket
241   (id #:init-keyword #:address
242       #:getter actor-id)
243
244   ;; Our queue to send/receive messages on
245   (inbox-deq #:init-thunk make-channel
246              #:accessor actor-inbox-deq)
247
248   (msg-id-generator #:init-thunk simple-message-id-generator
249                     #:getter actor-msg-id-generator)
250
251   ;; How we receive and process new messages
252   (message-handler #:init-value actor-inheritable-message-handler
253                    ;; @@: There's no reason not to use #:class instead of
254                    ;;   #:each-subclass anywhere in this file, except for
255                    ;;   Guile bug #25211 (#:class is broken in Guile 2.2)
256                    #:allocation #:each-subclass
257                    #:getter actor-message-handler)
258
259   ;; valid values are:
260   ;;  - #t as in, send the init message, but don't wait (default)
261   ;;  - 'wait, as in wait on the init message
262   ;;  - #f as in don't bother to init
263   (should-init #:init-value #t
264                #:getter actor-should-init
265                #:allocation #:each-subclass)
266
267   ;; This is the default, "simple" way to inherit and process messages.
268   (actions #:init-thunk (build-actions)
269            #:allocation #:each-subclass))
270
271 ;;; Actors may specify an "init" action that occurs before the actor
272 ;;; actually begins to run.
273 ;;; During actor-init!, an actor may send a message to itself or others
274 ;;; via <- but *may not* use <-wait.
275 (define-method (actor-init! (actor <actor>))
276   'no-op)
277
278 (define-method (actor-cleanup! (actor <actor>))
279   'no-op)
280
281 ;;; Addresses are vectors where the first part is the actor-id and
282 ;;; the second part is the hive-id.  This works well enough... they
283 ;;; look decent being pretty-printed.
284
285 (define (make-address actor-id hive-id channel dead?)
286   (vector actor-id hive-id channel dead?))
287
288 (define (address-actor-id address)
289   (vector-ref address 0))
290
291 (define (address-hive-id address)
292   (vector-ref address 1))
293
294 (define (address-channel address)
295   (vector-ref address 2))
296
297 (define (address-dead? address)
298   (vector-ref address 3))
299
300 (define (address->string address)
301   (string-append (address-actor-id address) "@"
302                  (address-hive-id address)))
303
304 (define (address-equal? address1 address2)
305   "Check whether or not the two addresses are equal.
306
307 This compares the actor-id and hive-id but ignores the channel and
308 dead? condition."
309   (match address1
310     (#(actor-id-1 hive-id-1 _ _)
311      (match address2
312        (#(actor-id-2 hive-id-2)
313         (and (equal? actor-id-1 actor-id-2)
314              (and (equal? hive-id-1 hive-id-2))))
315        (_ #f)))
316     (_ #f)))
317
318 (define (actor-id-actor actor)
319   "Get the actor id component of the actor-id"
320   (address-actor-id (actor-id actor)))
321
322 (define (actor-id-hive actor)
323   "Get the hive id component of the actor-id"
324   (address-hive-id (actor-id actor)))
325
326 (define (actor-id-string actor)
327   "Render the full actor id as a human-readable string"
328   (address->string (actor-id actor)))
329
330 (define (actor-inbox-enq actor)
331   (address-channel (actor-id actor)))
332
333 (define *current-actor*
334   (make-parameter #f))
335
336 (define *actor-prompt*
337   (make-parameter #f))
338
339 (define *resume-io-channel*
340   (make-parameter #f))
341
342 (define (actor-main-loop actor)
343   "Main loop of the actor.  Loops around, pulling messages off its queue
344 and handling them."
345   ;; @@: Maybe establish some sort of garbage collection routine for these...
346   (define waiting
347     (make-hash-table))
348   (define message-handler
349     (actor-message-handler actor))
350   (define dead?
351     (address-dead? (actor-id actor)))
352   (define prompt (make-prompt-tag (actor-id-actor actor)))
353   ;; Not always used, only if with-actor-nonblocking-ports is used
354   (define resume-io-channel
355     (make-channel))
356
357   (define (handle-message message)
358     (catch #t
359       (lambda ()
360         (call-with-values
361             (lambda ()
362               (message-handler actor message))
363           (lambda vals
364             ;; Return reply if necessary
365             (when (message-wants-reply message)
366               (when (message-wants-reply message)
367                 (%<- #f actor (message-from message) '*reply*
368                      vals ((actor-msg-id-generator actor))
369                      (message-id message)))))))
370       (const #t)
371       (let ((err (current-error-port)))
372         (lambda (key . args)
373           (false-if-exception
374            (let ((stack (make-stack #t 4)))
375              (format err "Uncaught exception when handling message ~a:\n"
376                      message)
377              (display-backtrace stack err)
378              (print-exception err (stack-ref stack 0)
379                               key args)
380              (newline err)
381              ;; If the other actor is waiting on a reply, let's let them
382              ;; know there was an error...
383              (when (message-wants-reply message)
384                (%<- #f actor (message-from message) '*error*
385                     (list key) ((actor-msg-id-generator actor))
386                     (message-id message)))))))))
387   
388   (define (resume-handler message)
389     (define in-reply-to (message-in-reply-to message))
390     (cond
391      ((hash-ref waiting in-reply-to) =>
392       (lambda (kont)
393         (hash-remove! waiting in-reply-to)
394         (kont message)))
395      (else
396       (format (current-error-port)
397               "Tried to resume nonexistant message: ~a\n"
398               (message-id message)))))
399
400   (define halt-or-handle-message
401     ;; It would be nice if we could give priorities to certain operations.
402     ;; halt should always win over getting a message...
403     (choice-operation
404      (wrap-operation (wait-operation dead?)
405                      (const #f))  ; halt and return
406      (wrap-operation (get-operation (actor-inbox-deq actor))
407                      (lambda (message)
408                        (call-with-prompt prompt
409                          (lambda ()
410                            (if (message-in-reply-to message)
411                                ;; resume a continuation which was waiting on a reply
412                                (resume-handler message)
413                                ;; start handling a new message
414                                (handle-message message)))
415                          ;; Here's where we abort to if we're doing <-wait
416                          ;; @@: maybe use match-lambda if we're going to end up
417                          ;;   handling multiple ~commands
418                          (match-lambda*
419                            ((kont '<-wait to action message-args)
420                             (define message-id
421                               ((actor-msg-id-generator actor)))
422                             (hash-set! waiting message-id kont)
423                             (%<- #t actor to action message-args message-id #f))
424                            ((kont 'run-me proc)
425                             (proc kont))))
426                        #t))   ; loop again
427      (wrap-operation (get-operation resume-io-channel)
428                      (lambda (thunk)
429                        (thunk
430                         #t)))))
431
432   ;; Mutate the parameter; this should be fine since each fiber
433   ;; runs in its own dynamic state with with-dynamic-state.
434   ;; See with-dynamic-state discussion in
435   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
436   (*current-actor* actor)
437   (*resume-io-channel* resume-io-channel)
438
439   ;; We temporarily set the *actor-prompt* to #f to make sure that
440   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
441   ;; the parent fiber's *actor-prompt* either.)
442   (*actor-prompt* #f)
443   (actor-init! actor)
444   (*actor-prompt* prompt)
445
446   (let loop ()
447     (and (perform-operation halt-or-handle-message)
448          (loop))))
449
450
451 ;; @@: So in order for this to work, we're going to have to add
452 ;; another channel to actors, which is resumable i/o.  We'll have to
453 ;; spawn a fiber that wakes up a thunk on the actor when its port is
454 ;; available.  Funky...
455
456 (define (%suspend-io-to-actor resume-method get-wait-fd-method)
457   (lambda (port)
458     (define prompt (*actor-prompt*))
459     (define resume-channel (*resume-io-channel*))
460     (define (run-at-prompt k)
461       (spawn-fiber
462        (lambda ()
463          (suspend-current-fiber
464           (lambda (fiber)
465             (resume-on-readable-fd (port-read-wait-fd port) fiber)))
466          ;; okay, we're awake again, tell the actor to resume this
467          ;; continuation
468          (put-message resume-channel k))
469        #:parallel? #f))
470     (when (not prompt)
471       (error "Attempt to abort to actor prompt outside of actor"))
472     (abort-to-prompt (*actor-prompt*)
473                      'run-me run-at-prompt)))
474
475 (define suspend-read-to-actor
476   (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd))
477
478 (define suspend-write-to-actor
479   (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd))
480
481 (define (with-actor-nonblocking-ports thunk)
482   "Runs THUNK in dynamic context in which attempting to read/write
483 from a port that would otherwise block an actor's correspondence with
484 other actors (note that reading from a nonblocking port should never
485 block other fibers) will instead permit reading other messages while
486 I/O is waiting to complete.
487
488 Note that currently "
489   (parameterize ((current-read-waiter suspend-read-to-actor)
490                  (current-write-waiter suspend-write-to-actor))
491     (thunk)))
492
493 (define (actor-spawn-fiber thunk . args)
494   "Spawn a fiber from an actor but unset actor-machinery-specific
495 dynamic context."
496   (apply spawn-fiber
497          (lambda ()
498            (*current-actor* #f)
499            (*resume-io-channel* #f)
500            (*actor-prompt* #f)
501            (thunk))
502          args))
503
504
505 \f
506 ;;; Actor utilities
507 ;;; ===============
508
509 (define-syntax-rule (define-actor class inherits
510                       (action ...)
511                       slots ...)
512   (define-class class inherits
513     (actions #:init-thunk (build-actions action ...)
514              #:allocation #:each-subclass)
515     slots ...))
516
517 \f
518 ;;; The Hive
519 ;;; ========
520 ;;;   Every actor has a hive, which keeps track of other actors, manages
521 ;;;   cleanup, and performs inter-hive communication.
522
523 ;; TODO: Make this a srfi-9 record type
524 (define-class <hive> ()
525   (id #:init-keyword #:id
526       #:getter hive-id)
527   (actor-registry #:init-thunk make-hash-table
528                   #:getter hive-actor-registry)
529   ;; TODO: Rename "ambassadors" to "relays"
530   ;; Ambassadors are used (or will be) for inter-hive communication.
531   ;; These are special actors that know how to route messages to other
532   ;; hives.
533   (ambassadors #:init-thunk make-weak-key-hash-table
534                #:getter hive-ambassadors)
535   (channel #:init-thunk make-channel
536            #:getter hive-channel)
537   (halt? #:init-thunk make-condition
538          #:getter hive-halt?))
539
540 (define* (make-hive #:key hive-id)
541   (make <hive> #:id (or hive-id
542                         (big-random-number-string))))
543
544 (define (gen-actor-id cookie)
545   (if cookie
546       (string-append cookie ":" (big-random-number-string))
547       (big-random-number-string)))
548
549 (define (hive-main-loop hive)
550   "The main loop of the hive.  This listens for messages on the hive-channel
551 for certain actions to perform.
552
553 `messages' here is not the same as a <message> object; these are a list of
554 values, the first value being a symbol"
555   (define channel (hive-channel hive))
556   (define halt? (hive-halt? hive))
557   (define registry (hive-actor-registry hive))
558
559   ;; not the same as a <message> ;P
560   (define handle-message
561     (match-lambda
562       (('register-actor actor-id address actor)
563        (hash-set! registry actor-id (vector address actor)))
564       ;; Remove the actor from hive
565       (('remove-actor actor-id)
566        (hash-remove! (hive-actor-registry hive) actor-id))
567       (('register-ambassador hive-id ambassador-actor-id)
568        'TODO)
569       (('unregister-ambassador hive-id ambassador-actor-id)
570        'TODO)
571       (('forward-message from-actor-id message)
572        'TODO)))
573
574   (define halt-or-handle
575     (choice-operation
576      (wrap-operation (get-operation channel)
577                      (lambda (msg)
578                        (handle-message msg)
579                        #t))
580      (wrap-operation (wait-operation halt?)
581                      (const #f))))
582
583   (let lp ()
584     (and (perform-operation halt-or-handle)
585          (lp))))
586
587 (define *hive-id* (make-parameter #f))
588 (define *hive-channel* (make-parameter #f))
589
590 ;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
591 (define* (spawn-hive proc #:key (hive (make-hive)))
592   "Spawn a hive and run PROC, passing it the fresh hive and establishing
593 a dynamic context surrounding the hive."
594   (spawn-fiber (lambda () (hive-main-loop hive)))
595   (parameterize ((*hive-id* (hive-id hive))
596                  (*hive-channel* (hive-channel hive)))
597     (proc hive)))
598
599 (define (run-hive proc . args)
600   "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
601 to spawn-hive... all remaining arguments passed to run-fibers."
602   (apply run-fibers
603          (lambda ()
604            (spawn-hive proc))
605          args))
606
607 (define (%create-actor actor-class init-args id-cookie send-init?)
608   (let* ((hive-channel (*hive-channel*))
609          (hive-id (*hive-id*))
610          (actor-id (gen-actor-id id-cookie))
611          (dead? (make-condition))
612          (inbox-enq (make-channel))
613          (address (make-address actor-id hive-id
614                                 inbox-enq dead?))
615          (actor (apply make actor-class
616                        #:address address
617                        init-args))
618          (should-init (actor-should-init actor)))
619
620     ;; start the main loop
621     (spawn-fiber (lambda ()
622                    ;; start the inbox loop
623                    (spawn-fiber
624                     (lambda ()
625                       (delivery-agent inbox-enq (actor-inbox-deq actor)
626                                       dead?))
627                     ;; this one is decidedly non-parallel, because we want
628                     ;; the delivery agent to be in the same thread as its actor
629                     #:parallel? #f)
630
631                    (actor-main-loop actor))
632                  #:parallel? #t)
633
634     (put-message hive-channel (list 'register-actor actor-id address actor))
635     
636     ;; return the address
637     address))
638
639 (define* (create-actor actor-class #:rest init-args)
640   "Create an instance of actor-class.  Return the new actor's id.
641
642 This is the method actors should call directly (unless they want
643 to supply an id-cookie, in which case they should use
644 create-actor*)."
645   (%create-actor actor-class init-args #f #t))
646
647
648 (define* (create-actor* actor-class id-cookie #:rest init-args)
649   "Create an instance of actor-class.  Return the new actor's id.
650
651 Like create-actor, but permits supplying an id-cookie."
652   (%create-actor actor-class init-args id-cookie #t))
653
654 (define* (self-destruct actor #:key (cleanup #t))
655   "Remove an actor from the hive.
656
657 Unless #:cleanup is set to #f, this will first have the actor handle
658 its '*cleanup* action handler."
659   (signal-condition! (address-dead? (actor-id actor)))
660   (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
661   ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
662   ;; a message with <-wait
663   (*actor-prompt* #f)
664   (actor-cleanup! actor))
665
666 ;; From a patch I sent to Fibers...
667 (define (condition-signalled? cvar)
668   "Return @code{#t} if @var{cvar} has already been signalled.
669
670 In general you will want to use @code{wait} or @code{wait-operation} to
671 wait on a condition.  However, sometimes it is useful to see whether or
672 not a condition has already been signalled without blocking."
673   (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar)))
674
675 (define (actor-alive? actor)
676   (condition-signalled? (address-dead? (actor-id actor))))