1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright © 2016, 2017 Christopher Allan Webber <cwebber@dustycloud.org>
4 ;;; This file is part of 8sync.
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ;;; GNU Lesser General Public License for more details.
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync. If not, see <http://www.gnu.org/licenses/>.
19 (define-module (8sync actors)
20 #:use-module (oop goops)
21 #:use-module (srfi srfi-9)
22 #:use-module (srfi srfi-9 gnu)
23 #:use-module (ice-9 control)
24 #:use-module (ice-9 format)
25 #:use-module (ice-9 match)
26 #:use-module (ice-9 atomic)
27 #:use-module ((ice-9 ports internal)
28 #:select (port-read-wait-fd port-write-wait-fd))
29 #:use-module (ice-9 pretty-print)
30 #:use-module (ice-9 receive)
31 #:use-module (ice-9 suspendable-ports)
33 #:use-module (fibers channels)
34 #:use-module (fibers conditions)
35 #:use-module (fibers operations)
36 #:use-module (8sync inbox)
37 #:use-module (8sync rmeta-slot)
39 #:export (;; utilities... ought to go in their own module
41 big-random-number-string
49 ;;; Commenting out the <address> type for now;
50 ;;; it may be back when we have better serializers
53 address-actor-id address-hive-id
60 actor-init! actor-cleanup!
67 with-actor-nonblocking-ports
71 ;; ;; There are more methods for the hive, but there's
72 ;; ;; no reason for the outside world to look at them maybe?
74 create-actor create-actor*
79 message-to message-action message-from
80 message-id message-body message-in-reply-to
87 ;; Maybe the wrong place for this, or for it to be exported.
88 ;; But it's used in websockets' server implementation at least...
;; Re-seed Guile's default random state from the platform so that the
;; random identifiers generated in this module are not the same on
;; every run.
(set! *random-state* (random-state-from-platform))

;; Exclusive upper bound handed to `random' by `big-random-number':
;; 2^128, i.e. 128 bits -- the same width as a UUID.
(define random-number-size (ash 1 128))
(define (big-random-number)
  "Return a random non-negative integer strictly below
`random-number-size'."
  (let ((upper-bound random-number-size))
    (random upper-bound)))
;; Would be great to get this base64 encoded instead.
(define (big-random-number-string)
  "Return a fresh `big-random-number' rendered as a lowercase
hexadecimal string, with no prefix and no padding."
  ;; `number->string' with radix 16 produces the same digits as
  ;; (format #f "~x" ...) but skips parsing a format directive on
  ;; every call.
  (number->string (big-random-number) 16))
107 ;; @@: This is slow-ish. A mere ~275k / second on my (old) machine.
108 ;; The main cost seems to be in number->string.
109 (define (simple-message-id-generator)
110 ;; Prepending this cookie makes message ids unique per hive
111 (let ((prefix (format #f "~x:" (big-random-number)))
114 (set! counter (1+ counter))
115 (string-append prefix (number->string counter)))))
122 (define-record-type <message>
123 (make-message-intern id to from action
124 body in-reply-to wants-reply)
126 ;; @@: message-ids are removed. They could be re-enabled
127 ;; if we had thread-safe promises...
128 (id message-id) ; id of this message
129 (to message-to) ; actor id this is going to
130 (from message-from) ; actor id of sender
131 (action message-action) ; action (a symbol) to be handled
132 (body message-body) ; argument list "body" of message
133 (in-reply-to message-in-reply-to) ; message id this is in reply to, if any
134 (wants-reply message-wants-reply)) ; whether caller is waiting for reply
(define* (make-message id to from action body
                       #:key in-reply-to wants-reply)
  "Construct a <message> record.

ID, TO, FROM, ACTION and BODY are positional.  The keyword arguments
#:in-reply-to and #:wants-reply are optional and are #f when omitted."
  ;; The raw record constructor is purely positional; this wrapper only
  ;; turns its last two fields into optional keywords.
  (let ((reply-target in-reply-to)
        (reply-wanted? wants-reply))
    (make-message-intern id to from action body
                         reply-target reply-wanted?)))
142 (define (kwarg-list-to-alist args)
143 (let loop ((remaining args)
146 (((? keyword? key) val rest ...)
148 (cons (cons (keyword->symbol key) val)
151 (_ (throw 'invalid-kwarg-list
152 "Invalid keyword argument list"
156 ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
157 ;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html )
159 ;; This is the internal, generalized message sending method.
160 ;; Users shouldn't use it! Use the <-foo forms instead.
162 (define (%<- wants-reply from-actor to action args message-id in-reply-to)
163 ;; Okay, we need to deal with message ids.
164 ;; Could we get rid of them? :\
165 ;; It seems if we can use eq? and have messages be immutable then
166 ;; it should be possible to identify follow-up replies.
167 ;; If we need to track replies across hive boundaries we could
168 ;; register unique ids across the ambassador barrier.
170 (($ <address> _ _ (? channel? channel) dead?)
171 (let ((message (make-message message-id to
172 (and from-actor (actor-id from-actor))
174 #:wants-reply wants-reply
175 #:in-reply-to in-reply-to)))
178 (put-operation channel message)
179 (wait-operation dead?)))))
180 ;; TODO: put remote addresses here.
181 (($ <address> actor-id hive-id #f #f)
182 ;; Here we'd make a call to our hive...
184 ;; A message sent to nobody goes nowhere.
185 ;; TODO: Should we display a warning here, probably?
187 ;; We shouldn't technically be passing in actors but rather their
188 ;; addresses, but often actors want to message themselves and
189 ;; this makes that slightly easier.
190 ((? (lambda (x) (is-a? x <actor>)) actor)
191 (%<- wants-reply from-actor (actor-id actor) action
192 args message-id in-reply-to))))
194 (define (<- to action . args)
195 (define from-actor (*current-actor*))
196 (%<- #f from-actor to action args
198 ((actor-msg-id-generator from-actor)))
199 (big-random-number-string))
202 (define (<-wait to action . args)
203 (define prompt (*actor-prompt*))
205 (error "Tried to <-wait without being in an actor's context..."))
207 (let ((reply (abort-to-prompt prompt '<-wait to action args)))
208 (cond ((eq? (message-action reply) '*error*)
209 (throw 'hive-unresumable-coroutine
210 "Won't resume coroutine; got an *error* as a reply"
212 (else (apply values (message-body reply))))))
215 ;;; Main actor implementation
216 ;;; =========================
218 (define (actor-inheritable-message-handler actor message)
219 (define action (message-action message))
221 (class-rmeta-ref (class-of actor) 'actions action
222 #:equals? eq? #:cache-set! hashq-set!
223 #:cache-ref hashq-ref))
225 (throw 'action-not-found
226 "No appropriate action handler found for actor"
230 (apply method actor message (message-body message)))
232 (define-syntax-rule (live-wrap body)
233 "Wrap possibly multi-value function in a procedure, applies all arguments"
237 (define-syntax-rule (build-actions (symbol method) ...)
238 "Construct an alist of (symbol . method), where the method is wrapped
239 with `live-wrap' to facilitate live hacking and allow the method definition
240 to come after class definition."
242 (list (cons (quote symbol)
243 (live-wrap method)) ...)))
245 (define-class <actor> ()
246 ;; An <address> object
247 (id #:init-keyword #:address
250 ;; Our queue to send/receive messages on
251 (inbox-deq #:init-thunk make-channel
252 #:accessor actor-inbox-deq)
254 (msg-id-generator #:init-thunk simple-message-id-generator
255 #:getter actor-msg-id-generator)
257 ;; How we receive and process new messages
258 (message-handler #:init-value actor-inheritable-message-handler
259 ;; @@: There's no reason not to use #:class instead of
260 ;; #:each-subclass anywhere in this file, except for
261 ;; Guile bug #25211 (#:class is broken in Guile 2.2)
262 #:allocation #:each-subclass
263 #:getter actor-message-handler)
266 ;; - #t as in, send the init message, but don't wait (default)
267 ;; - 'wait, as in wait on the init message
268 ;; - #f as in don't bother to init
269 (should-init #:init-value #t
270 #:getter actor-should-init
271 #:allocation #:each-subclass)
273 ;; This is the default, "simple" way to inherit and process messages.
274 (actions #:init-thunk (build-actions)
275 #:allocation #:each-subclass))
277 ;;; Actors may specify an "init" action that occurs before the actor
278 ;;; actually begins to run.
279 ;;; During actor-init!, an actor may send a message to itself or others
280 ;;; via <- but *may not* use <-wait.
281 (define-method (actor-init! (actor <actor>))
284 (define-method (actor-cleanup! (actor <actor>))
;;; Every actor has an address, which is how it's identified.
288 ;;; We also pack in some routing information.
289 (define-record-type <address>
290 (make-address actor-id hive-id channel dead?)
292 ;; Unique-to-this-actor-on-this-hive part
293 (actor-id address-actor-id)
294 ;; Unique identifier for the hive we're connected to.
295 ;; If we don't have a "direct" link to the other actor through
296 ;; a channel, we'll have to look up if our hive has a way to route
297 ;; to the other hive.
298 (hive-id address-hive-id)
299 ;; The receiving channel (as opposed to actor-inbox-deq)
300 (channel address-channel)
301 ;; A fibers condition variable which is set once this actor kicks
303 (dead? address-dead?))
305 (set-record-type-printer!
307 (lambda (address port)
308 (format port "<address ~a ~a>"
309 (address->string address)
310 (if (address-channel address)
314 ((@@ (fibers conditions) condition-signalled?)
315 (address-dead? address)))
(define (address->string address)
  "Render ADDRESS as the string \"ACTOR-ID@HIVE-ID\"."
  (let* ((actor-part (address-actor-id address))
         (hive-part (address-hive-id address)))
    (string-append actor-part "@" hive-part)))
(define (address-equal? address1 address2)
  "Check whether or not the two addresses are equal.

This compares the actor-id and hive-id but ignores the channel and
dead? fields.  Returns #t when both ids match, #f otherwise."
  (and (equal? (address-actor-id address1)
               (address-actor-id address2))
       (equal? (address-hive-id address1)
               (address-hive-id address2))))
(define (actor-id-actor actor)
  "Return the actor-id component of ACTOR's address."
  (let ((address (actor-id actor)))
    (address-actor-id address)))
(define (actor-id-hive actor)
  "Return the hive-id component of ACTOR's address."
  (let ((address (actor-id actor)))
    (address-hive-id address)))
(define (actor-id-string actor)
  "Return ACTOR's full address as a human-readable string, as produced
by `address->string'."
  (let ((address (actor-id actor)))
    (address->string address)))
(define (actor-inbox-enq actor)
  "Return the channel stored in ACTOR's address, i.e. the receiving
channel, as opposed to the one returned by `actor-inbox-deq'."
  (let ((address (actor-id actor)))
    (address-channel address)))
348 (define *current-actor*
351 (define *actor-prompt*
354 (define *resume-io-channel*
357 (define (actor-main-loop actor)
358 "Main loop of the actor. Loops around, pulling messages off its queue
360 ;; @@: Maybe establish some sort of garbage collection routine for these...
363 (define message-handler
364 (actor-message-handler actor))
366 (address-dead? (actor-id actor)))
367 (define prompt (make-prompt-tag (actor-id-actor actor)))
368 ;; Not always used, only if with-actor-nonblocking-ports is used
369 (define resume-io-channel
372 (define (handle-message message)
377 (message-handler actor message))
379 ;; Return reply if necessary
380 (when (message-wants-reply message)
381 (%<- #f actor (message-from message) '*reply*
382 vals ((actor-msg-id-generator actor))
383 (message-id message))))))
385 (let ((err (current-error-port)))
388 (let ((stack (make-stack #t 4)))
389 (format err "Uncaught exception when handling message ~a:\n"
391 (display-backtrace stack err)
392 (print-exception err (stack-ref stack 0)
395 ;; If the other actor is waiting on a reply, let's let them
396 ;; know there was an error...
397 (when (message-wants-reply message)
398 (%<- #f actor (message-from message) '*error*
399 (list key) ((actor-msg-id-generator actor))
400 (message-id message)))))))))
402 (define (resume-handler message)
403 (define in-reply-to (message-in-reply-to message))
405 ((hash-ref waiting in-reply-to) =>
407 (hash-remove! waiting in-reply-to)
410 (format (current-error-port)
411 "Tried to resume nonexistant message: ~a\n"
412 (message-id message)))))
414 (define (call-with-actor-prompt thunk)
415 (call-with-prompt prompt
417 ;; Here's where we abort to if we're doing <-wait
418 ;; @@: maybe use match-lambda if we're going to end up
419 ;; handling multiple ~commands
421 ((kont '<-wait to action message-args)
423 ((actor-msg-id-generator actor)))
424 (hash-set! waiting message-id kont)
425 (%<- #t actor to action message-args message-id #f))
429 (define halt-or-handle-message
430 ;; It would be nice if we could give priorities to certain operations.
431 ;; halt should always win over getting a message...
433 (wrap-operation (wait-operation dead?)
434 (const #f)) ; halt and return
435 (wrap-operation (get-operation (actor-inbox-deq actor))
437 (call-with-actor-prompt
439 (if (message-in-reply-to message)
440 ;; resume a continuation which was waiting on a reply
441 (resume-handler message)
442 ;; start handling a new message
443 (handle-message message))))
445 (wrap-operation (get-operation resume-io-channel)
447 (call-with-actor-prompt
452 ;; Mutate the parameter; this should be fine since each fiber
453 ;; runs in its own dynamic state with with-dynamic-state.
454 ;; See with-dynamic-state discussion in
455 ;; https://wingolog.org/archives/2017/06/27/growing-fibers
456 (*current-actor* actor)
457 (*resume-io-channel* resume-io-channel)
459 ;; We temporarily set the *actor-prompt* to #f to make sure that
460 ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
461 ;; the parent fiber's *actor-prompt* either.)
464 (*actor-prompt* prompt)
467 (and (perform-operation halt-or-handle-message)
471 ;; @@: So in order for this to work, we're going to have to add
472 ;; another channel to actors, which is resumable i/o. We'll have to
473 ;; spawn a fiber that wakes up a thunk on the actor when its port is
474 ;; available. Funky...
476 (define (%suspend-io-to-actor wait-for-read/write)
478 (define prompt (*actor-prompt*))
479 (define resume-channel (*resume-io-channel*))
480 (define (run-at-prompt k)
483 (wait-for-read/write port)
484 ;; okay, we're awake again, tell the actor to resume this
486 (put-message resume-channel k))
489 (error "Attempt to abort to actor prompt outside of actor"))
490 (abort-to-prompt (*actor-prompt*)
491 'run-me run-at-prompt)))
;; Read waiter installed by `with-actor-nonblocking-ports'.  Built by
;; handing `%suspend-io-to-actor' the `wait-for-readable' binding, which
;; is fetched with @@ from the (fibers) module.
(define suspend-read-to-actor
  (let ((wait-for-readable (@@ (fibers) wait-for-readable)))
    (%suspend-io-to-actor wait-for-readable)))
;; Write waiter installed by `with-actor-nonblocking-ports'.  Built by
;; handing `%suspend-io-to-actor' the `wait-for-writable' binding, which
;; is fetched with @@ from the (fibers) module.
(define suspend-write-to-actor
  (let ((wait-for-writable (@@ (fibers) wait-for-writable)))
    (%suspend-io-to-actor wait-for-writable)))
499 (define (with-actor-nonblocking-ports thunk)
500 "Runs THUNK in dynamic context in which attempting to read/write
501 from a port that would otherwise block an actor's correspondence with
502 other actors (note that reading from a nonblocking port should never
503 block other fibers) will instead permit reading other messages while
504 I/O is waiting to complete.
506 Note that currently "
507 (parameterize ((current-read-waiter suspend-read-to-actor)
508 (current-write-waiter suspend-write-to-actor))
511 (define (actor-spawn-fiber thunk . args)
512 "Spawn a fiber from an actor but unset actor-machinery-specific
517 (*resume-io-channel* #f)
527 (define-syntax-rule (define-actor class inherits
530 (define-class class inherits
531 (actions #:init-thunk (build-actions action ...)
532 #:allocation #:each-subclass)
538 ;;; Every actor has a hive, which keeps track of other actors, manages
539 ;;; cleanup, and performs inter-hive communication.
541 ;; TODO: Make this a srfi-9 record type
542 (define-class <hive> ()
543 (id #:init-keyword #:id
545 (actor-registry #:init-thunk make-hash-table
546 #:getter hive-actor-registry)
547 ;; TODO: Rename "ambassadors" to "relays"
548 ;; Ambassadors are used (or will be) for inter-hive communication.
549 ;; These are special actors that know how to route messages to other
551 (ambassadors #:init-thunk make-weak-key-hash-table
552 #:getter hive-ambassadors)
553 (channel #:init-thunk make-channel
554 #:getter hive-channel)
555 (halt? #:init-thunk make-condition
556 #:getter hive-halt?))
(define* (make-hive #:key hive-id)
  "Create a <hive>.

When #:hive-id is supplied (and not #f) it becomes the hive's id;
otherwise a fresh id is generated with `big-random-number-string'."
  (let ((id (if hive-id
                hive-id
                (big-random-number-string))))
    (make <hive> #:id id)))
562 (define (gen-actor-id cookie)
564 (string-append cookie ":" (big-random-number-string))
565 (big-random-number-string)))
567 (define (hive-main-loop hive)
568 "The main loop of the hive. This listens for messages on the hive-channel
569 for certain actions to perform.
571 `messages' here is not the same as a <message> object; these are a list of
572 values, the first value being a symbol"
573 (define channel (hive-channel hive))
574 (define halt? (hive-halt? hive))
575 (define registry (hive-actor-registry hive))
577 ;; not the same as a <message> ;P
578 (define handle-message
580 (('register-actor actor-id address actor)
581 (hash-set! registry actor-id (vector address actor)))
582 ;; Remove the actor from hive
583 (('remove-actor actor-id)
584 (hash-remove! (hive-actor-registry hive) actor-id))
585 (('register-ambassador hive-id ambassador-actor-id)
587 (('unregister-ambassador hive-id ambassador-actor-id)
589 (('forward-message from-actor-id message)
592 (define halt-or-handle
594 (wrap-operation (get-operation channel)
598 (wrap-operation (wait-operation halt?)
602 (and (perform-operation halt-or-handle)
;; Dynamic context describing the hive we are currently running under.
;; Both default to #f; `spawn-hive' binds them with `parameterize' and
;; the actor-creation code reads them back.

;; Id of the current hive.
(define *hive-id* (make-parameter #f))
;; Channel on which the current hive receives its commands.
(define *hive-channel* (make-parameter #f))
608 ;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
609 (define* (spawn-hive proc #:key (hive (make-hive)))
610 "Spawn a hive and run PROC, passing it the fresh hive and establishing
611 a dynamic context surrounding the hive."
612 (spawn-fiber (lambda () (hive-main-loop hive)))
613 (parameterize ((*hive-id* (hive-id hive))
614 (*hive-channel* (hive-channel hive)))
617 (define (run-hive proc . args)
618 "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed
619 to spawn-hive... all remaining arguments passed to run-fibers."
625 (define (%create-actor actor-class init-args id-cookie send-init?)
626 (let* ((hive-channel (*hive-channel*))
627 (hive-id (*hive-id*))
628 (actor-id (gen-actor-id id-cookie))
629 (dead? (make-condition))
630 (inbox-enq (make-channel))
631 (address (make-address actor-id hive-id
633 (actor (apply make actor-class
636 (should-init (actor-should-init actor)))
638 ;; start the main loop
639 (spawn-fiber (lambda ()
640 ;; start the inbox loop
643 (delivery-agent inbox-enq (actor-inbox-deq actor)
645 ;; this one is decidedly non-parallel, because we want
646 ;; the delivery agent to be in the same thread as its actor
649 (actor-main-loop actor))
652 (put-message hive-channel (list 'register-actor actor-id address actor))
654 ;; return the address
657 ;;; Whether or not to attach the class' name as a cookie by default in
659 (define *debug-actor-ids*
662 (define* (create-actor actor-class #:rest init-args)
663 "Create an instance of actor-class. Return the new actor's id.
665 This is the method actors should call directly (unless they want
666 to supply an id-cookie, in which case they should use
668 (%create-actor actor-class init-args
669 (if (*debug-actor-ids*)
670 (symbol->string (class-name actor-class))
(define* (create-actor* actor-class id-cookie #:rest init-args)
  "Instantiate ACTOR-CLASS with INIT-ARGS and return whatever
`%create-actor' returns for the new actor.

Same as create-actor, except that the caller chooses ID-COOKIE."
  ;; The init step is always requested here.
  (let ((send-init? #t))
    (%create-actor actor-class init-args id-cookie send-init?)))
(define* (self-destruct actor #:key (cleanup #t))
  "Remove an actor from the hive.

ACTOR's dead? condition is signalled and the hive is asked to drop the
actor from its registry.  Unless #:cleanup is set to #f, `actor-cleanup!'
is then called on ACTOR."
  ;; Signal the condition stored in the actor's address; the actor's main
  ;; loop waits on it and halts once it is set.
  (signal-condition! (address-dead? (actor-id actor)))
  ;; Ask the hive to forget this actor.
  (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
  ;; Set *actor-prompt* to nothing to prevent actor-cleanup! (or anything
  ;; else that runs after this point) from sending a message with <-wait.
  ;; NOTE(review): this relies on <-wait rejecting a #f prompt -- confirm.
  (*actor-prompt* #f)
  ;; Honor the documented #:cleanup switch.
  (when cleanup
    (actor-cleanup! actor)))