X-Git-Url: https://jxself.org/git/?p=8sync.git;a=blobdiff_plain;f=8sync%2Factors.scm;h=62a07ac113e15d23b87c3cce0e6cf58c2856b8ae;hp=fdcbc85193c43300b293de7e8abb92933073810d;hb=d50c1349a0f5abc0e110400f0c3315a1809bf813;hpb=4deb5433574cfdf8f2b2bf22460c8b74ba426bed diff --git a/8sync/actors.scm b/8sync/actors.scm index fdcbc85..62a07ac 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -1,5 +1,5 @@ ;;; 8sync --- Asynchronous programming for Guile -;;; Copyright (C) 2016 Christopher Allan Webber +;;; Copyright © 2016, 2017 Christopher Allan Webber ;;; ;;; This file is part of 8sync. ;;; @@ -19,28 +19,34 @@ (define-module (8sync actors) #:use-module (oop goops) #:use-module (srfi srfi-9) - #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 control) #:use-module (ice-9 format) #:use-module (ice-9 match) + #:use-module (ice-9 atomic) + #:use-module ((ice-9 ports internal) + #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 pretty-print) - #:use-module (8sync agenda) - #:use-module (8sync repl) + #:use-module (ice-9 receive) + #:use-module (ice-9 suspendable-ports) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (8sync inbox) + #:use-module (8sync rmeta-slot) + #:export (;; utilities... ought to go in their own module big-random-number big-random-number-string - simple-message-id-generator actor-id actor-message-handler - %current-actor - ;;; Commenting out the
type for now; ;;; it may be back when we have better serializers ;;
- make-address address? + make-address address-actor-id address-hive-id address->string @@ -48,17 +54,22 @@ actor-id-hive actor-id-string + actor-init! actor-cleanup! + + actor-alive? + build-actions - define-simple-actor + define-actor - - make-hive - ;; There are more methods for the hive, but there's - ;; no reason for the outside world to look at them maybe? - hive-id - hive-create-actor hive-create-actor* + actor-spawn-fiber + with-actor-nonblocking-ports + ;; + ;; make-hive + ;; ;; There are more methods for the hive, but there's + ;; ;; no reason for the outside world to look at them maybe? + ;; hive-id create-actor create-actor* self-destruct @@ -68,35 +79,29 @@ message-id message-body message-in-reply-to message-wants-reply - message-auto-reply? - - <- <-wait <-reply <-reply-wait + <- <-wait - call-with-message msg-receive msg-val + spawn-hive run-hive - ez-run-hive - bootstrap-message - - serialize-message write-message - serialize-message-pretty pprint-message - read-message read-message-from-string)) + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + wrap)) ;; For ids -(define %random-state - (make-parameter (random-state-from-platform))) +(set! *random-state* (random-state-from-platform)) ;; Same size as a uuid4 I think... (define random-number-size (expt 2 128)) (define (big-random-number) - (random random-number-size (%random-state))) + (random random-number-size)) ;; Would be great to get this base64 encoded instead. (define (big-random-number-string) ;; @@: This is slow. Using format here is wasteful. (format #f "~x" (big-random-number))) -;; @@: This is slow. A mere ~275k / second on my (old) machine. +;; @@: This is slow-ish. A mere ~275k / second on my (old) machine. ;; The main cost seems to be in number->string. (define (simple-message-id-generator) ;; Prepending this cookie makes message ids unique per hive @@ -111,41 +116,25 @@ ;;; Messages ;;; ======== - -;; @@: We may want to add a deferred-reply to the below, similar to -;; what we had in XUDD, for actors which do their own response -;; queueing.... ie, that might receive messages but need to shelve -;; them to be acted upon after something else is taken care of. - (define-record-type (make-message-intern id to from action - body in-reply-to wants-reply - replied) + body in-reply-to wants-reply) message? - (id message-id) - (to message-to) - (from message-from) - (action message-action) - (body message-body) - (in-reply-to message-in-reply-to) - (wants-reply message-wants-reply) - (replied message-replied set-message-replied!)) + ;; @@: message-ids are removed. They could be re-enabled + ;; if we had thread-safe promises... + (id message-id) ; id of this message + (to message-to) ; actor id this is going to + (from message-from) ; actor id of sender + (action message-action) ; action (a symbol) to be handled + (body message-body) ; argument list "body" of message + (in-reply-to message-in-reply-to) ; message id this is in reply to, if any + (wants-reply message-wants-reply)) ; whether caller is waiting for reply (define* (make-message id to from action body - #:key in-reply-to wants-reply - replied) + #:key in-reply-to wants-reply) (make-message-intern id to from action body - in-reply-to wants-reply replied)) - -(define (message-auto-reply? message) - (eq? (message-action message) '*auto-reply*)) - -(define (message-needs-reply? message) - "See if this message needs a reply still" - (and (message-wants-reply message) - (not (message-replied message)))) - + in-reply-to wants-reply)) (define (kwarg-list-to-alist args) (let loop ((remaining args) @@ -164,64 +153,57 @@ ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html ;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html ) -(define (<- from-actor to-id action . message-body-args) - "Send a message from an actor to another actor" - (let* ((hive (actor-hive from-actor)) - (message (make-message (hive-gen-message-id hive) to-id - (actor-id from-actor) action - message-body-args))) - (8sync (hive-process-message hive message)))) - -(define (<-wait from-actor to-id action . message-body-args) - "Send a message from an actor to another, but wait until we get a response" - (let* ((hive (actor-hive from-actor)) - (abort-to (hive-prompt (actor-hive from-actor))) - (message (make-message (hive-gen-message-id hive) to-id - (actor-id from-actor) action - message-body-args - #:wants-reply #t))) - (abort-to-prompt abort-to from-actor message))) - -;; TODO: Intelligently ~propagate(ish) errors on -wait functions. -;; We might have `send-message-wait-brazen' to allow callers to -;; not have an exception thrown and instead just have a message with -;; the appropriate '*error* message returned. - -(define (<-reply from-actor original-message . message-body-args) - "Reply to a message" - (set-message-replied! original-message #t) - (let* ((hive (actor-hive from-actor)) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id from-actor) '*reply* - message-body-args - #:in-reply-to (message-id original-message)))) - (8sync (hive-process-message hive new-message)))) - -(define (<-auto-reply from-actor original-message) - "Auto-reply to a message. Internal use only!" - (set-message-replied! original-message #t) - (let* ((hive (actor-hive from-actor)) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id from-actor) '*auto-reply* - '() - #:in-reply-to (message-id original-message)))) - (8sync (hive-process-message hive new-message)))) - -(define (<-reply-wait from-actor original-message . message-body-args) - "Reply to a messsage, but wait until we get a response" - (set-message-replied! original-message #t) - (let* ((hive (actor-hive from-actor)) - (abort-to (hive-prompt (actor-hive from-actor))) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id from-actor) '*reply* - message-body-args - #:wants-reply #t - #:in-reply-to (message-id original-message)))) - (abort-to-prompt abort-to from-actor new-message))) - +;; This is the internal, generalized message sending method. +;; Users shouldn't use it! Use the <-foo forms instead. + +(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to) + ;; Okay, we need to deal with message ids. + ;; Could we get rid of them? :\ + ;; It seems if we can use eq? and have messages be immutable then + ;; it should be possible to identify follow-up replies. + ;; If we need to track replies across hive boundaries we could + ;; register unique ids across the ambassador barrier. + (match to + (#(_ _ (? channel? channel) dead?) + (let ((message (make-message message-id to + (and from-actor (actor-id from-actor)) + action args + #:wants-reply wants-reply + #:in-reply-to in-reply-to))) + (perform-operation + (choice-operation + (put-operation channel message) + (wait-operation dead?))))) + ;; TODO: put remote addresses here. + (#(actor-id hive-id #f #f) + ;; Here we'd make a call to our hive... + 'TODO) + ;; A message sent to nobody goes nowhere. + ;; TODO: Should we display a warning here, probably? + (#f #f))) + +(define (<- to action . args) + (define from-actor (*current-actor*)) + (%<- #f from-actor to action args + (or (and from-actor + ((actor-msg-id-generator from-actor))) + (big-random-number-string)) + #f)) + +;; TODO: this should abort to the prompt, then check for errors +;; when resuming. + +(define (<-wait to action . args) + (define prompt (*actor-prompt*)) + (when (not prompt) + (error "Tried to <-wait without being in an actor's context...")) + + (let ((reply (abort-to-prompt prompt '<-wait to action args))) + (cond ((eq? action '*error*) + (throw 'hive-unresumable-coroutine + "Won't resume coroutine; got an *error* as a reply" + #:message reply)) + (else (apply values (message-body reply)))))) ;;; Main actor implementation @@ -229,515 +211,473 @@ (define (actor-inheritable-message-handler actor message) (define action (message-action message)) - (define (find-message-handler return) - (for-each (lambda (this-class) - (define actions - (or (and (class-slot-definition this-class 'actions) - (class-slot-ref this-class 'actions)) - '())) - (for-each (match-lambda - ((action-name . method) - (when (eq? action-name action) - (return method)))) - actions)) - (class-precedence-list (class-of actor))) + (define method + (class-rmeta-ref (class-of actor) 'actions action + #:equals? eq? #:cache-set! hashq-set! + #:cache-ref hashq-ref)) + (unless method (throw 'action-not-found "No appropriate action handler found for actor" #:action action #:actor actor #:message message)) - (define method - (call/ec find-message-handler)) (apply method actor message (message-body message))) +(define-syntax-rule (wrap body) + "Wrap possibly multi-value function in a procedure, applies all arguments" + (lambda args + (apply body args))) + +(define-syntax-rule (build-actions (symbol method) ...) + "Construct an alist of (symbol . method), where the method is wrapped +with `wrap' to facilitate live hacking and allow the method definition +to come after class definition." + (build-rmeta-slot + (list (cons (quote symbol) + (wrap method)) ...))) + (define-class () - ;; An address object - (id #:init-keyword #:id + ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) + ;; - inbox-channel is the receiving channel (as opposed to actor-inbox-deq) + ;; - dead? is a fibers condition variable which is set once this actor + ;; kicks the bucket + (id #:init-keyword #:address #:getter actor-id) - ;; The hive we're connected to. - ;; We need this to be able to send messages. - (hive #:init-keyword #:hive - #:accessor actor-hive) + + ;; Our queue to send/receive messages on + (inbox-deq #:init-thunk make-channel + #:accessor actor-inbox-deq) + + (msg-id-generator #:init-thunk simple-message-id-generator + #:getter actor-msg-id-generator) + ;; How we receive and process new messages (message-handler #:init-value actor-inheritable-message-handler ;; @@: There's no reason not to use #:class instead of ;; #:each-subclass anywhere in this file, except for ;; Guile bug #25211 (#:class is broken in Guile 2.2) - #:allocation #:each-subclass) + #:allocation #:each-subclass + #:getter actor-message-handler) + + ;; valid values are: + ;; - #t as in, send the init message, but don't wait (default) + ;; - 'wait, as in wait on the init message + ;; - #f as in don't bother to init + (should-init #:init-value #t + #:getter actor-should-init + #:allocation #:each-subclass) ;; This is the default, "simple" way to inherit and process messages. - (actions #:init-value '() + (actions #:init-thunk (build-actions) #:allocation #:each-subclass)) -(define-method (actor-message-handler (actor )) - (slot-ref actor 'message-handler)) - -;;; So these are the nicer representations of addresses. -;;; However, they don't serialize so easily with scheme read/write, so we're -;;; using the simpler cons cell version below for now. - -;; (define-record-type
-;; (make-address actor-id hive-id) ; @@: Do we want the trailing -id? -;; address? -;; (actor-id address-actor-id) -;; (hive-id address-hive-id)) -;; -;; (set-record-type-printer! -;;
-;; (lambda (record port) -;; (format port "" -;; (address-actor-id record) (address-hive-id record)))) -;; - -(define (make-address actor-id hive-id) - (cons actor-id hive-id)) +;;; Actors may specify an "init" action that occurs before the actor +;;; actually begins to run. +;;; During actor-init!, an actor may send a message to itself or others +;;; via <- but *may not* use <-wait. +(define-method (actor-init! (actor )) + 'no-op) + +(define-method (actor-cleanup! (actor )) + 'no-op) + +;;; Addresses are vectors where the first part is the actor-id and +;;; the second part is the hive-id. This works well enough... they +;;; look decent being pretty-printed. + +(define (make-address actor-id hive-id channel dead?) + (vector actor-id hive-id channel dead?)) (define (address-actor-id address) - (car address)) + (vector-ref address 0)) (define (address-hive-id address) - (cdr address)) + (vector-ref address 1)) + +(define (address-channel address) + (vector-ref address 2)) + +(define (address-dead? address) + (vector-ref address 3)) (define (address->string address) (string-append (address-actor-id address) "@" (address-hive-id address))) -(define-method (actor-id-actor (actor )) +(define (address-equal? address1 address2) + "Check whether or not the two addresses are equal. + +This compares the actor-id and hive-id but ignores the channel and +dead? condition." + (match address1 + (#(actor-id-1 hive-id-1 _ _) + (match address2 + (#(actor-id-2 hive-id-2) + (and (equal? actor-id-1 actor-id-2) + (and (equal? hive-id-1 hive-id-2)))) + (_ #f))) + (_ #f))) + +(define (actor-id-actor actor) "Get the actor id component of the actor-id" (address-actor-id (actor-id actor))) -(define-method (actor-id-hive (actor )) +(define (actor-id-hive actor) "Get the hive id component of the actor-id" (address-hive-id (actor-id actor))) -(define-method (actor-id-string (actor )) +(define (actor-id-string actor) "Render the full actor id as a human-readable string" (address->string (actor-id actor))) -(define %current-actor +(define (actor-inbox-enq actor) + (address-channel (actor-id actor))) + +(define *current-actor* + (make-parameter #f)) + +(define *actor-prompt* + (make-parameter #f)) + +(define *resume-io-channel* (make-parameter #f)) +(define (actor-main-loop actor) + "Main loop of the actor. Loops around, pulling messages off its queue +and handling them." + ;; @@: Maybe establish some sort of garbage collection routine for these... + (define waiting + (make-hash-table)) + (define message-handler + (actor-message-handler actor)) + (define dead? + (address-dead? (actor-id actor))) + (define prompt (make-prompt-tag (actor-id-actor actor))) + ;; Not always used, only if with-actor-nonblocking-ports is used + (define resume-io-channel + (make-channel)) + + (define (handle-message message) + (catch #t + (lambda () + (call-with-values + (lambda () + (message-handler actor message)) + (lambda vals + ;; Return reply if necessary + (when (message-wants-reply message) + (when (message-wants-reply message) + (%<- #f actor (message-from message) '*reply* + vals ((actor-msg-id-generator actor)) + (message-id message))))))) + (const #t) + (let ((err (current-error-port))) + (lambda (key . args) + (false-if-exception + (let ((stack (make-stack #t 4))) + (format err "Uncaught exception when handling message ~a:\n" + message) + (display-backtrace stack err) + (print-exception err (stack-ref stack 0) + key args) + (newline err) + ;; If the other actor is waiting on a reply, let's let them + ;; know there was an error... + (when (message-wants-reply message) + (%<- #f actor (message-from message) '*error* + (list key) ((actor-msg-id-generator actor)) + (message-id message))))))))) + + (define (resume-handler message) + (define in-reply-to (message-in-reply-to message)) + (cond + ((hash-ref waiting in-reply-to) => + (lambda (kont) + (hash-remove! waiting in-reply-to) + (kont message))) + (else + (format (current-error-port) + "Tried to resume nonexistant message: ~a\n" + (message-id message))))) + + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + + (define halt-or-handle-message + ;; It would be nice if we could give priorities to certain operations. + ;; halt should always win over getting a message... + (choice-operation + (wrap-operation (wait-operation dead?) + (const #f)) ; halt and return + (wrap-operation (get-operation (actor-inbox-deq actor)) + (lambda (message) + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) + #t)) ; loop again + (wrap-operation (get-operation resume-io-channel) + (lambda (thunk) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) + + ;; Mutate the parameter; this should be fine since each fiber + ;; runs in its own dynamic state with with-dynamic-state. + ;; See with-dynamic-state discussion in + ;; https://wingolog.org/archives/2017/06/27/growing-fibers + (*current-actor* actor) + (*resume-io-channel* resume-io-channel) + + ;; We temporarily set the *actor-prompt* to #f to make sure that + ;; actor-init! doesn't try to do a <-wait message (and not accidentally use + ;; the parent fiber's *actor-prompt* either.) + (*actor-prompt* #f) + (actor-init! actor) + (*actor-prompt* prompt) + + (let loop () + (and (perform-operation halt-or-handle-message) + (loop)))) + + +;; @@: So in order for this to work, we're going to have to add +;; another channel to actors, which is resumable i/o. We'll have to +;; spawn a fiber that wakes up a thunk on the actor when its port is +;; available. Funky... + +(define (%suspend-io-to-actor wait-for-read/write) + (lambda (port) + (define prompt (*actor-prompt*)) + (define resume-channel (*resume-io-channel*)) + (define (run-at-prompt k) + (spawn-fiber + (lambda () + (wait-for-read/write port) + ;; okay, we're awake again, tell the actor to resume this + ;; continuation + (put-message resume-channel k)) + #:parallel? #f)) + (when (not prompt) + (error "Attempt to abort to actor prompt outside of actor")) + (abort-to-prompt (*actor-prompt*) + 'run-me run-at-prompt))) + +(define suspend-read-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) + +(define suspend-write-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) + +(define (with-actor-nonblocking-ports thunk) + "Runs THUNK in dynamic context in which attempting to read/write +from a port that would otherwise block an actor's correspondence with +other actors (note that reading from a nonblocking port should never +block other fibers) will instead permit reading other messages while +I/O is waiting to complete. + +Note that currently " + (parameterize ((current-read-waiter suspend-read-to-actor) + (current-write-waiter suspend-write-to-actor)) + (thunk))) + +(define (actor-spawn-fiber thunk . args) + "Spawn a fiber from an actor but unset actor-machinery-specific +dynamic context." + (apply spawn-fiber + (lambda () + (*current-actor* #f) + (*resume-io-channel* #f) + (*actor-prompt* #f) + (thunk)) + args)) + ;;; Actor utilities ;;; =============== -(define-syntax-rule (build-actions (symbol method) ...) - "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition -to come after class definition." - (list - (cons (quote symbol) - (wrap-apply method)) ...)) - -(define-syntax-rule (define-simple-actor class action ...) - (define-class class () - (actions #:init-value (build-actions action ...) - #:allocation #:each-subclass))) +(define-syntax-rule (define-actor class inherits + (action ...) + slots ...) + (define-class class inherits + (actions #:init-thunk (build-actions action ...) + #:allocation #:each-subclass) + slots ...)) ;;; The Hive ;;; ======== -;;; Every actor has a hive. The hive is a kind of "meta-actor" -;;; which routes all the rest of the actors in a system. +;;; Every actor has a hive, which keeps track of other actors, manages +;;; cleanup, and performs inter-hive communication. -(define-generic hive-handle-failed-forward) - -(define-class () +;; TODO: Make this a srfi-9 record type +(define-class () + (id #:init-keyword #:id + #:getter hive-id) (actor-registry #:init-thunk make-hash-table #:getter hive-actor-registry) - (msg-id-generator #:init-thunk simple-message-id-generator - #:getter hive-msg-id-generator) + ;; TODO: Rename "ambassadors" to "relays" ;; Ambassadors are used (or will be) for inter-hive communication. - ;; These are special actors that know how to route messages to other hives. + ;; These are special actors that know how to route messages to other + ;; hives. (ambassadors #:init-thunk make-weak-key-hash-table #:getter hive-ambassadors) - ;; Waiting coroutines - ;; This is a map from cons cell of message-id - ;; to a cons cell of (actor-id . coroutine) - ;; @@: Should we have a record type? - ;; @@: Should there be any way to clear out "old" coroutines? - (waiting-coroutines #:init-thunk make-hash-table - #:getter hive-waiting-coroutines) - ;; Message prompt - ;; When actors send messages to each other they abort to this prompt - ;; to send the message, then carry on their way - (prompt #:init-thunk make-prompt-tag - #:getter hive-prompt) - (actions #:allocation #:each-subclass - #:init-value - (build-actions - ;; This is in the case of an ambassador failing to forward a - ;; message... it reports it back to the hive - (*failed-forward* hive-handle-failed-forward)))) - -(define-method (hive-handle-failed-forward (hive ) message) - "Handle an ambassador failing to forward a message" - 'TODO) + (channel #:init-thunk make-channel + #:getter hive-channel) + (halt? #:init-thunk make-condition + #:getter hive-halt?)) (define* (make-hive #:key hive-id) - (let ((hive (make - #:id (make-address - "hive" (or hive-id - (big-random-number-string)))))) - ;; Set the hive's actor reference to itself - (set! (actor-hive hive) hive) - hive)) - -(define-method (hive-id (hive )) - (actor-id-hive hive)) - -(define-method (hive-gen-actor-id (hive ) cookie) - (make-address (if cookie - (string-append cookie "-" (big-random-number-string)) - (big-random-number-string)) - (hive-id hive))) - -(define-method (hive-gen-message-id (hive )) - "Generate a message id using HIVE's message id generator" - ((hive-msg-id-generator hive))) - -(define-method (hive-resolve-local-actor (hive ) actor-address) - (hash-ref (hive-actor-registry hive) actor-address)) - -(define-method (hive-resolve-ambassador (hive ) ambassador-address) - (hash-ref (hive-ambassadors hive) ambassador-address)) - -(define-method (make-forward-request (hive ) (ambassador ) message) - (make-message (hive-gen-message-id hive) (actor-id ambassador) - ;; If we make the hive not an actor, we could either switch this - ;; to #f or to the original actor...? - ;; Maybe some more thinking should be done on what should - ;; happen in case of failure to forward? Handling ambassador failures - ;; seems like the primary motivation for the hive remaining an actor. - (actor-id hive) - '*forward* - `((original . ,message)))) - -(define-method (hive-reply-with-error (hive ) original-message - error-key error-args) - ;; We only supply the error-args if the original sender is on the same hive - (define (orig-actor-on-same-hive?) - (equal? (hive-id hive) - (address-hive-id (message-from original-message)))) - (set-message-replied! original-message #t) - (let* ((new-message-body - (if (orig-actor-on-same-hive?) - `(#:original-message ,original-message - #:error-key ,error-key - #:error-args ,error-args) - `(#:original-message ,original-message - #:error-key ,error-key))) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id hive) '*error* - new-message-body - #:in-reply-to (message-id original-message)))) - ;; We only return a thunk, rather than run 8sync here, because if - ;; we ran 8sync in the middle of a catch we'd end up with an - ;; unresumable continuation. - (lambda () (hive-process-message hive new-message)))) - -(define-method (hive-process-message (hive ) message) - "Handle one message, or forward it via an ambassador" - (define (maybe-autoreply actor) - ;; Possibly autoreply - (if (message-needs-reply? message) - (<-auto-reply actor message))) - - (define (resolve-actor-to) - "Get the actor the message was aimed at" - (let ((actor (hive-resolve-local-actor hive (message-to message)))) - (if (not actor) - (throw 'actor-not-found - (format #f "Message ~a from ~a directed to nonexistant actor ~a" - (message-id message) - (address->string (message-from message)) - (address->string (message-to message))) - message)) - actor)) - - (define (call-catching-coroutine thunk) - (define queued-error-handling-thunk #f) - (define (call-catching-errors) - ;; TODO: maybe parameterize (or attach to hive) and use - ;; maybe-catch-all from agenda.scm - ;; @@: Why not just use with-throw-handler and let the catch - ;; happen at the agenda? That's what we used to do, but - ;; it ended up with a SIGABRT. See: - ;; http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html - (catch #t - thunk - ;; In the actor model, we don't totally crash on errors. - (lambda _ #f) - ;; If an error happens, we raise it - (lambda (key . args) - (if (message-needs-reply? message) - ;; If the message is waiting on a reply, let them know - ;; something went wrong. - ;; However, we have to do it outside of this catch - ;; routine, or we'll end up in an unrewindable continuation - ;; situation. - (set! queued-error-handling-thunk - (hive-reply-with-error hive message key args))) - ;; print error message - (apply print-error-and-continue key args))) - ;; @@: This is a kludge. See above for why. - (if queued-error-handling-thunk - (8sync (queued-error-handling-thunk)))) - (call-with-prompt (hive-prompt hive) - call-catching-errors - (lambda (kont actor message) - ;; Register the coroutine - (hash-set! (hive-waiting-coroutines hive) - (message-id message) - (cons (actor-id actor) kont)) - ;; Send off the message - (8sync (hive-process-message hive message))))) - - (define (process-local-message) - (let ((actor (resolve-actor-to))) - (call-catching-coroutine - (lambda () - (define message-handler (actor-message-handler actor)) - ;; @@: Should a more general error handling happen here? - (parameterize ((%current-actor actor)) - (let ((result - (message-handler actor message))) - (maybe-autoreply actor) - ;; Returning result allows actors to possibly make a run-request - ;; at the end of handling a message. - ;; ... We do want that, right? - result)))))) - - (define (resume-waiting-coroutine) - (cond - ((or (eq? (message-action message) '*reply*) - (eq? (message-action message) '*auto-reply*)) - (call-catching-coroutine - (lambda () - (match (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - ((_ . (resume-actor-id . kont)) - (if (not (equal? (message-to message) - resume-actor-id)) - (throw 'resuming-to-wrong-actor - "Attempted to resume a coroutine to the wrong actor!" - #:expected-actor-id (message-to message) - #:got-actor-id resume-actor-id - #:message message)) - (let (;; @@: How should we resolve resuming coroutines to actors who are - ;; now gone? - (actor (resolve-actor-to)) - (result (kont message))) - (maybe-autoreply actor) - result)) - (#f (throw 'no-waiting-coroutine - "message in-reply-to tries to resume nonexistent coroutine" - message)))))) - ;; Yikes, we must have gotten an error or something back - (else - ;; @@: Not what we want in the long run? - ;; What we'd *prefer* to do is to resume this message - ;; and throw an error inside the message handler - ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!) - (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - (let ((explaination - (if (eq? (message-action message) '*reply*) - "Won't resume coroutine; got an *error* as a reply" - "Won't resume coroutine because action is not *reply*"))) - (throw 'hive-unresumable-coroutine - explaination - #:message message))))) - - (define (process-remote-message) - ;; Find the ambassador - (let* ((remote-hive-id (hive-id (message-to message))) - (ambassador (hive-resolve-ambassador remote-hive-id)) - (message-handler (actor-message-handler ambassador)) - (forward-request (make-forward-request hive ambassador message))) - (message-handler ambassador forward-request))) - - (let ((to (message-to message))) - ;; This seems to be an easy mistake to make, so check that addressing - ;; is correct here - (if (not to) - (throw 'missing-addressee - "`to' field is missing on message" - #:message message)) - (if (hive-actor-local? hive to) - (if (message-in-reply-to message) - (resume-waiting-coroutine) - (process-local-message)) - (process-remote-message)))) - -(define-method (hive-actor-local? (hive ) address) - (equal? (hive-id hive) (address-hive-id address))) - -(define-method (hive-register-actor! (hive ) (actor )) - (hash-set! (hive-actor-registry hive) (actor-id actor) actor)) - -(define-method (%hive-create-actor (hive ) actor-class - init id-cookie) - "Actual method called by hive-create-actor. - -Since this is a define-method it can't accept fancy define* arguments, -so this gets called from the nicer hive-create-actor interface. See -that method for documentation." - (let* ((actor-id (hive-gen-actor-id hive id-cookie)) + (make #:id (or hive-id + (big-random-number-string)))) + +(define (gen-actor-id cookie) + (if cookie + (string-append cookie ":" (big-random-number-string)) + (big-random-number-string))) + +(define (hive-main-loop hive) + "The main loop of the hive. This listens for messages on the hive-channel +for certain actions to perform. + +`messages' here is not the same as a object; these are a list of +values, the first value being a symbol" + (define channel (hive-channel hive)) + (define halt? (hive-halt? hive)) + (define registry (hive-actor-registry hive)) + + ;; not the same as a ;P + (define handle-message + (match-lambda + (('register-actor actor-id address actor) + (hash-set! registry actor-id (vector address actor))) + ;; Remove the actor from hive + (('remove-actor actor-id) + (hash-remove! (hive-actor-registry hive) actor-id)) + (('register-ambassador hive-id ambassador-actor-id) + 'TODO) + (('unregister-ambassador hive-id ambassador-actor-id) + 'TODO) + (('forward-message from-actor-id message) + 'TODO))) + + (define halt-or-handle + (choice-operation + (wrap-operation (get-operation channel) + (lambda (msg) + (handle-message msg) + #t)) + (wrap-operation (wait-operation halt?) + (const #f)))) + + (let lp () + (and (perform-operation halt-or-handle) + (lp)))) + +(define *hive-id* (make-parameter #f)) +(define *hive-channel* (make-parameter #f)) + +;; @@: Should we halt the hive either at the end of spawn-hive or run-hive? +(define* (spawn-hive proc #:key (hive (make-hive))) + "Spawn a hive and run PROC, passing it the fresh hive and establishing +a dynamic context surrounding the hive." + (spawn-fiber (lambda () (hive-main-loop hive))) + (parameterize ((*hive-id* (hive-id hive)) + (*hive-channel* (hive-channel hive))) + (proc hive))) + +(define (run-hive proc . args) + "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed +to spawn-hive... all remaining arguments passed to run-fibers." + (apply run-fibers + (lambda () + (spawn-hive proc)) + args)) + +(define (%create-actor actor-class init-args id-cookie send-init?) + (let* ((hive-channel (*hive-channel*)) + (hive-id (*hive-id*)) + (actor-id (gen-actor-id id-cookie)) + (dead? (make-condition)) + (inbox-enq (make-channel)) + (address (make-address actor-id hive-id + inbox-enq dead?)) (actor (apply make actor-class - #:hive hive - #:id actor-id - init))) - (hive-register-actor! hive actor) - ;; return the actor id - actor-id)) - -(define* (hive-create-actor hive actor-class #:rest init) - (%hive-create-actor hive actor-class - init #f)) - -(define* (hive-create-actor* hive actor-class id-cookie #:rest init) - (%hive-create-actor hive actor-class - init id-cookie)) - -(define (call-with-message message proc) - "Applies message body arguments into procedure, with message as first -argument. Similar to call-with-values in concept." - (apply proc message (message-body message))) - -;; (msg-receive (<- bar baz) -;; (baz) -;; basil) - -;; Emacs: (put 'msg-receive 'scheme-indent-function 2) - -;; @@: Or receive-msg or receieve-message or?? -(define-syntax-rule (msg-receive arglist message body ...) - "Call body with arglist (which can accept arguments like lambda*) -applied from the message-body of message." - (call-with-message message - (lambda* arglist - body ...))) - -(define (msg-val message) - "Retrieve the first value from the message-body of message. -Like single value return from a procedure call. Probably the most -common case when waiting on a reply from some action invocation." - (call-with-message message - (lambda (_ val) val))) - - -;;; Various API methods for actors to interact with the system -;;; ========================================================== - -;; TODO: move send-message and friends here...? - -(define* (create-actor from-actor actor-class #:rest init) + #:address address + init-args)) + (should-init (actor-should-init actor))) + + ;; start the main loop + (spawn-fiber (lambda () + ;; start the inbox loop + (spawn-fiber + (lambda () + (delivery-agent inbox-enq (actor-inbox-deq actor) + dead?)) + ;; this one is decidedly non-parallel, because we want + ;; the delivery agent to be in the same thread as its actor + #:parallel? #f) + + (actor-main-loop actor)) + #:parallel? #t) + + (put-message hive-channel (list 'register-actor actor-id address actor)) + + ;; return the address + address)) + +(define* (create-actor actor-class #:rest init-args) "Create an instance of actor-class. Return the new actor's id. This is the method actors should call directly (unless they want to supply an id-cookie, in which case they should use create-actor*)." - (%hive-create-actor (actor-hive from-actor) actor-class - init #f)) + (%create-actor actor-class init-args #f #t)) -(define* (create-actor* from-actor actor-class id-cookie #:rest init) +(define* (create-actor* actor-class id-cookie #:rest init-args) "Create an instance of actor-class. Return the new actor's id. Like create-actor, but permits supplying an id-cookie." - (%hive-create-actor (actor-hive from-actor) actor-class - init id-cookie)) - - -(define (self-destruct actor) - "Remove an actor from the hive." - (hash-remove! (hive-actor-registry (actor-hive actor)) - (actor-id actor))) - - - -;;; 8sync bootstrap utilities -;;; ========================= - -(define* (ez-run-hive hive initial-tasks #:key repl-server) - "Start up an agenda and run HIVE in it with INITIAL-TASKS. - -Should we start up a cooperative REPL for live hacking? REPL-SERVER -wants to know! You can pass it #t or #f, or if you want to specify a port, -an integer." - (let* ((queue (list->q initial-tasks)) - (agenda (make-agenda #:pre-unwind-handler print-error-and-continue - #:queue queue))) - (cond - ;; If repl-server is an integer, we'll use that as the port - ((integer? repl-server) - (spawn-and-queue-repl-server! agenda repl-server)) - (repl-server - (spawn-and-queue-repl-server! agenda))) - (start-agenda agenda))) - -(define (bootstrap-message hive to-id action . message-body-args) - (wrap - (apply <- hive to-id action message-body-args))) - - - -;;; Basic readers / writers -;;; ======================= - -(define (serialize-message message) - "Serialize a message for read/write" - (list - (message-id message) - (message-to message) - (message-from message) - (message-action message) - (message-body message) - (message-in-reply-to message) - (message-wants-reply message) - (message-replied message))) - -(define* (write-message message #:optional (port (current-output-port))) - "Write out a message to a port for easy reading later. - -Note that if a sub-value can't be easily written to something -Guile's `read' procedure knows how to read, this doesn't do anything -to improve that. You'll need a better serializer for that.." - (write (serialize-message message) port)) - -(define (serialize-message-pretty message) - "Serialize a message in a way that's easy for humans to read." - `(*message* - (id ,(message-id message)) - (to ,(message-to message)) - (from ,(message-from message)) - (action ,(message-action message)) - (body ,(message-body message)) - (in-reply-to ,(message-in-reply-to message)) - (wants-reply ,(message-wants-reply message)) - (replied ,(message-replied message)))) - -(define (pprint-message message) - "Pretty print a message." - (pretty-print (serialize-message-pretty message))) - -(define* (read-message #:optional (port (current-input-port))) - "Read a message serialized via serialize-message from PORT" - (match (read port) - ((id to from action body in-reply-to wants-reply replied) - (make-message-intern - id to from action body - in-reply-to wants-reply replied)) - (anything-else - (throw 'message-read-bad-structure - "Could not read message from structure" - anything-else)))) - -(define (read-message-from-string message-str) - "Read message from MESSAGE-STR" - (with-input-from-string message-str - (lambda () - (read-message (current-input-port))))) + (%create-actor actor-class init-args id-cookie #t)) + +(define* (self-destruct actor #:key (cleanup #t)) + "Remove an actor from the hive. + +Unless #:cleanup is set to #f, this will first have the actor handle +its '*cleanup* action handler." + (signal-condition! (address-dead? (actor-id actor))) + (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor))) + ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending + ;; a message with <-wait + (*actor-prompt* #f) + (actor-cleanup! actor)) + +;; From a patch I sent to Fibers... +(define (condition-signalled? cvar) + "Return @code{#t} if @var{cvar} has already been signalled. + +In general you will want to use @code{wait} or @code{wait-operation} to +wait on a condition. However, sometimes it is useful to see whether or +not a condition has already been signalled without blocking." + (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar))) + +(define (actor-alive? actor) + (condition-signalled? (address-dead? (actor-id actor))))