X-Git-Url: https://jxself.org/git/?p=8sync.git;a=blobdiff_plain;f=8sync%2Factors.scm;h=62a07ac113e15d23b87c3cce0e6cf58c2856b8ae;hp=1b120ba77916e1e3f9bd08029cfa4398218a5e66;hb=d50c1349a0f5abc0e110400f0c3315a1809bf813;hpb=d23b593a5810b38d2517a44c09d49b2835c59e16 diff --git a/8sync/actors.scm b/8sync/actors.scm index 1b120ba..62a07ac 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -19,27 +19,34 @@ (define-module (8sync actors) #:use-module (oop goops) #:use-module (srfi srfi-9) - #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 control) #:use-module (ice-9 format) #:use-module (ice-9 match) + #:use-module (ice-9 atomic) + #:use-module ((ice-9 ports internal) + #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 pretty-print) - #:use-module (8sync agenda) + #:use-module (ice-9 receive) + #:use-module (ice-9 suspendable-ports) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (8sync inbox) + #:use-module (8sync rmeta-slot) + #:export (;; utilities... ought to go in their own module big-random-number big-random-number-string - simple-message-id-generator actor-id actor-message-handler - %current-actor - ;;; Commenting out the
type for now; ;;; it may be back when we have better serializers ;;
- make-address address? + make-address address-actor-id address-hive-id address->string @@ -47,19 +54,22 @@ actor-id-hive actor-id-string + actor-init! actor-cleanup! + actor-alive? build-actions define-actor - - make-hive - ;; There are more methods for the hive, but there's - ;; no reason for the outside world to look at them maybe? - hive-id - bootstrap-actor bootstrap-actor* + actor-spawn-fiber + with-actor-nonblocking-ports + ;; + ;; make-hive + ;; ;; There are more methods for the hive, but there's + ;; ;; no reason for the outside world to look at them maybe? + ;; hive-id create-actor create-actor* self-destruct @@ -69,35 +79,29 @@ message-id message-body message-in-reply-to message-wants-reply - message-auto-reply? - - <- <-* <-wait <-wait* <-reply <-reply* <-reply-wait <-reply-wait* + <- <-wait - call-with-message mbody-receive mbody-val + spawn-hive run-hive - run-hive - bootstrap-message - - serialize-message write-message - serialize-message-pretty pprint-message - read-message read-message-from-string)) + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + wrap)) ;; For ids -(define %random-state - (make-parameter (random-state-from-platform))) +(set! *random-state* (random-state-from-platform)) ;; Same size as a uuid4 I think... (define random-number-size (expt 2 128)) (define (big-random-number) - (random random-number-size (%random-state))) + (random random-number-size)) ;; Would be great to get this base64 encoded instead. (define (big-random-number-string) ;; @@: This is slow. Using format here is wasteful. (format #f "~x" (big-random-number))) -;; @@: This is slow. A mere ~275k / second on my (old) machine. +;; @@: This is slow-ish. A mere ~275k / second on my (old) machine. ;; The main cost seems to be in number->string. (define (simple-message-id-generator) ;; Prepending this cookie makes message ids unique per hive @@ -112,42 +116,25 @@ ;;; Messages ;;; ======== - -;; @@: We may want to add a deferred-reply to the below, similar to -;; what we had in XUDD, for actors which do their own response -;; queueing.... ie, that might receive messages but need to shelve -;; them to be acted upon after something else is taken care of. - (define-record-type (make-message-intern id to from action - body in-reply-to wants-reply - replied) + body in-reply-to wants-reply) message? + ;; @@: message-ids are removed. They could be re-enabled + ;; if we had thread-safe promises... (id message-id) ; id of this message (to message-to) ; actor id this is going to (from message-from) ; actor id of sender (action message-action) ; action (a symbol) to be handled (body message-body) ; argument list "body" of message (in-reply-to message-in-reply-to) ; message id this is in reply to, if any - (wants-reply message-wants-reply) ; whether caller is waiting for reply - (replied message-replied ; was this message replied to? - set-message-replied!)) + (wants-reply message-wants-reply)) ; whether caller is waiting for reply (define* (make-message id to from action body - #:key in-reply-to wants-reply - replied) + #:key in-reply-to wants-reply) (make-message-intern id to from action body - in-reply-to wants-reply replied)) - -(define (message-auto-reply? message) - (eq? (message-action message) '*auto-reply*)) - -(define (message-needs-reply? message) - "See if this message needs a reply still" - (and (message-wants-reply message) - (not (message-replied message)))) - + in-reply-to wants-reply)) (define (kwarg-list-to-alist args) (let loop ((remaining args) @@ -169,124 +156,54 @@ ;; This is the internal, generalized message sending method. ;; Users shouldn't use it! Use the <-foo forms instead. -;; @@: Could we get rid of some of the conditional checks through -;; some macro-foo? - -(define-inlinable (send-message send-options from-actor to-id action - replying-to-message wants-reply? - message-body-args) - (if replying-to-message - (set-message-replied! replying-to-message #t)) - (let* ((hive (actor-hive from-actor)) - (new-message - (make-message (hive-gen-message-id hive) to-id - (actor-id from-actor) action - message-body-args - #:wants-reply wants-reply? - #:in-reply-to - (if replying-to-message - (message-id replying-to-message) - #f)))) - (if wants-reply? - (abort-to-prompt (hive-prompt (actor-hive from-actor)) - from-actor new-message send-options) - ;; @@: It might be that eventually we pass in send-options - ;; here too. Since <-wait and <-reply-wait are the only ones - ;; that use it yet, for now it kind of just makes things - ;; confusing. - (8sync (hive-process-message hive new-message))))) - -(define (<- to-id action . message-body-args) - "Send a message from an actor to another actor" - (send-message '() (%current-actor) to-id action - #f #f message-body-args)) - -(define (<-* send-options to-id action . message-body-args) - "Like <-*, but allows extra parameters via send-options" - (define* (really-send #:key (actor (%current-actor)) - #:allow-other-keys) - (send-message send-options actor to-id action - #f #f message-body-args)) - (apply really-send send-options)) - -(define (<-wait to-id action . message-body-args) - "Send a message from an actor to another, but wait until we get a response" - (wait-maybe-handle-errors - (send-message '() (%current-actor) to-id action - #f #t message-body-args))) - -(define (<-wait* send-options to-id action . message-body-args) - "Like <-wait, but allows extra parameters, for example whether to -#:accept-errors" - (define* (really-send #:key (actor (%current-actor)) - #:allow-other-keys) - (apply wait-maybe-handle-errors - (send-message send-options actor to-id action - #f #t message-body-args) - send-options)) - (apply really-send send-options)) - -;; TODO: Intelligently ~propagate(ish) errors on -wait functions. -;; We might have `send-message-wait-brazen' to allow callers to -;; not have an exception thrown and instead just have a message with -;; the appropriate '*error* message returned. - -(define (<-reply original-message . message-body-args) - "Reply to a message" - (when (message-needs-reply? original-message) - (send-message '() (%current-actor) (message-from original-message) '*reply* - original-message #f message-body-args))) - -(define (<-reply* send-options original-message . message-body-args) - "Like <-reply, but allows extra parameters via send-options" - (define* (really-send #:key (actor (%current-actor)) - #:allow-other-keys) - (send-message send-options actor - (message-from original-message) '*reply* - original-message #f message-body-args)) - (when (message-needs-reply? original-message) - (apply really-send send-options))) - -(define (<-auto-reply actor original-message) - "Auto-reply to a message. Internal use only!" - (send-message '() actor (message-from original-message) '*auto-reply* - original-message #f '())) - -(define (<-reply-wait original-message . message-body-args) - "Reply to a messsage, but wait until we get a response" - (if (message-needs-reply? original-message) - (wait-maybe-handle-errors - (send-message '() (%current-actor) - (message-from original-message) '*reply* - original-message #t message-body-args)) - #f)) - -(define (<-reply-wait* send-options original-message - . message-body-args) - "Like <-reply-wait, but allows extra parameters via send-options" - (define* (really-send #:key (actor (%current-actor)) - #:allow-other-keys) - (apply wait-maybe-handle-errors - (send-message send-options actor - (message-from original-message) '*reply* - original-message #t message-body-args) - send-options)) - (when (message-needs-reply? original-message) - (apply really-send send-options))) - -(define* (wait-maybe-handle-errors message - #:key accept-errors - #:allow-other-keys) - "Before returning a message to a waiting caller, see if we need to -raise an exception if an error." - (define action (message-action message)) - (cond ((and (eq? action '*error*) - (not accept-errors)) - (throw 'hive-unresumable-coroutine - "Won't resume coroutine; got an *error* as a reply" - #:message message)) - (else message))) - +(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to) + ;; Okay, we need to deal with message ids. + ;; Could we get rid of them? :\ + ;; It seems if we can use eq? and have messages be immutable then + ;; it should be possible to identify follow-up replies. + ;; If we need to track replies across hive boundaries we could + ;; register unique ids across the ambassador barrier. + (match to + (#(_ _ (? channel? channel) dead?) + (let ((message (make-message message-id to + (and from-actor (actor-id from-actor)) + action args + #:wants-reply wants-reply + #:in-reply-to in-reply-to))) + (perform-operation + (choice-operation + (put-operation channel message) + (wait-operation dead?))))) + ;; TODO: put remote addresses here. + (#(actor-id hive-id #f #f) + ;; Here we'd make a call to our hive... + 'TODO) + ;; A message sent to nobody goes nowhere. + ;; TODO: Should we display a warning here, probably? + (#f #f))) + +(define (<- to action . args) + (define from-actor (*current-actor*)) + (%<- #f from-actor to action args + (or (and from-actor + ((actor-msg-id-generator from-actor))) + (big-random-number-string)) + #f)) + +;; TODO: this should abort to the prompt, then check for errors +;; when resuming. + +(define (<-wait to action . args) + (define prompt (*actor-prompt*)) + (when (not prompt) + (error "Tried to <-wait without being in an actor's context...")) + + (let ((reply (abort-to-prompt prompt '<-wait to action args))) + (cond ((eq? action '*error*) + (throw 'hive-unresumable-coroutine + "Won't resume coroutine; got an *error* as a reply" + #:message reply)) + (else (apply values (message-body reply)))))) ;;; Main actor implementation @@ -294,43 +211,46 @@ raise an exception if an error." (define (actor-inheritable-message-handler actor message) (define action (message-action message)) - (define (find-message-handler return) - (for-each (lambda (this-class) - (define actions - (or (and (class-slot-definition this-class 'actions) - (class-slot-ref this-class 'actions)) - '())) - (for-each (match-lambda - ((action-name . method) - (when (eq? action-name action) - (return method)))) - actions)) - (class-precedence-list (class-of actor))) + (define method + (class-rmeta-ref (class-of actor) 'actions action + #:equals? eq? #:cache-set! hashq-set! + #:cache-ref hashq-ref)) + (unless method (throw 'action-not-found "No appropriate action handler found for actor" #:action action #:actor actor #:message message)) - (define method - (call/ec find-message-handler)) (apply method actor message (message-body message))) +(define-syntax-rule (wrap body) + "Wrap possibly multi-value function in a procedure, applies all arguments" + (lambda args + (apply body args))) + (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `wrap' to facilitate live hacking and allow the method definition to come after class definition." - (list - (cons (quote symbol) - (wrap-apply method)) ...)) + (build-rmeta-slot + (list (cons (quote symbol) + (wrap method)) ...))) (define-class () - ;; An address object - (id #:init-keyword #:id + ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) + ;; - inbox-channel is the receiving channel (as opposed to actor-inbox-deq) + ;; - dead? is a fibers condition variable which is set once this actor + ;; kicks the bucket + (id #:init-keyword #:address #:getter actor-id) - ;; The hive we're connected to. - ;; We need this to be able to send messages. - (hive #:init-keyword #:hive - #:accessor actor-hive) + + ;; Our queue to send/receive messages on + (inbox-deq #:init-thunk make-channel + #:accessor actor-inbox-deq) + + (msg-id-generator #:init-thunk simple-message-id-generator + #:getter actor-msg-id-generator) + ;; How we receive and process new messages (message-handler #:init-value actor-inheritable-message-handler ;; @@: There's no reason not to use #:class instead of @@ -344,22 +264,29 @@ to come after class definition." ;; - 'wait, as in wait on the init message ;; - #f as in don't bother to init (should-init #:init-value #t + #:getter actor-should-init #:allocation #:each-subclass) ;; This is the default, "simple" way to inherit and process messages. - (actions #:init-value (build-actions - ;; Default init method is to do nothing. - (*init* (const #f)) - ;; Default cleanup method is to do nothing. - (*cleanup* (const #f))) + (actions #:init-thunk (build-actions) #:allocation #:each-subclass)) +;;; Actors may specify an "init" action that occurs before the actor +;;; actually begins to run. +;;; During actor-init!, an actor may send a message to itself or others +;;; via <- but *may not* use <-wait. +(define-method (actor-init! (actor )) + 'no-op) + +(define-method (actor-cleanup! (actor )) + 'no-op) + ;;; Addresses are vectors where the first part is the actor-id and ;;; the second part is the hive-id. This works well enough... they ;;; look decent being pretty-printed. -(define (make-address actor-id hive-id) - (vector actor-id hive-id)) +(define (make-address actor-id hive-id channel dead?) + (vector actor-id hive-id channel dead?)) (define (address-actor-id address) (vector-ref address 0)) @@ -367,27 +294,219 @@ to come after class definition." (define (address-hive-id address) (vector-ref address 1)) +(define (address-channel address) + (vector-ref address 2)) + +(define (address-dead? address) + (vector-ref address 3)) + (define (address->string address) (string-append (address-actor-id address) "@" (address-hive-id address))) -(define-method (actor-id-actor (actor )) +(define (address-equal? address1 address2) + "Check whether or not the two addresses are equal. + +This compares the actor-id and hive-id but ignores the channel and +dead? condition." + (match address1 + (#(actor-id-1 hive-id-1 _ _) + (match address2 + (#(actor-id-2 hive-id-2) + (and (equal? actor-id-1 actor-id-2) + (and (equal? hive-id-1 hive-id-2)))) + (_ #f))) + (_ #f))) + +(define (actor-id-actor actor) "Get the actor id component of the actor-id" (address-actor-id (actor-id actor))) -(define-method (actor-id-hive (actor )) +(define (actor-id-hive actor) "Get the hive id component of the actor-id" (address-hive-id (actor-id actor))) -(define-method (actor-id-string (actor )) +(define (actor-id-string actor) "Render the full actor id as a human-readable string" (address->string (actor-id actor))) -(define %current-actor +(define (actor-inbox-enq actor) + (address-channel (actor-id actor))) + +(define *current-actor* (make-parameter #f)) -(define (actor-alive? actor) - (hive-resolve-local-actor (actor-hive actor) (actor-id actor))) +(define *actor-prompt* + (make-parameter #f)) + +(define *resume-io-channel* + (make-parameter #f)) + +(define (actor-main-loop actor) + "Main loop of the actor. Loops around, pulling messages off its queue +and handling them." + ;; @@: Maybe establish some sort of garbage collection routine for these... + (define waiting + (make-hash-table)) + (define message-handler + (actor-message-handler actor)) + (define dead? + (address-dead? (actor-id actor))) + (define prompt (make-prompt-tag (actor-id-actor actor))) + ;; Not always used, only if with-actor-nonblocking-ports is used + (define resume-io-channel + (make-channel)) + + (define (handle-message message) + (catch #t + (lambda () + (call-with-values + (lambda () + (message-handler actor message)) + (lambda vals + ;; Return reply if necessary + (when (message-wants-reply message) + (when (message-wants-reply message) + (%<- #f actor (message-from message) '*reply* + vals ((actor-msg-id-generator actor)) + (message-id message))))))) + (const #t) + (let ((err (current-error-port))) + (lambda (key . args) + (false-if-exception + (let ((stack (make-stack #t 4))) + (format err "Uncaught exception when handling message ~a:\n" + message) + (display-backtrace stack err) + (print-exception err (stack-ref stack 0) + key args) + (newline err) + ;; If the other actor is waiting on a reply, let's let them + ;; know there was an error... + (when (message-wants-reply message) + (%<- #f actor (message-from message) '*error* + (list key) ((actor-msg-id-generator actor)) + (message-id message))))))))) + + (define (resume-handler message) + (define in-reply-to (message-in-reply-to message)) + (cond + ((hash-ref waiting in-reply-to) => + (lambda (kont) + (hash-remove! waiting in-reply-to) + (kont message))) + (else + (format (current-error-port) + "Tried to resume nonexistant message: ~a\n" + (message-id message))))) + + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + + (define halt-or-handle-message + ;; It would be nice if we could give priorities to certain operations. + ;; halt should always win over getting a message... + (choice-operation + (wrap-operation (wait-operation dead?) + (const #f)) ; halt and return + (wrap-operation (get-operation (actor-inbox-deq actor)) + (lambda (message) + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) + #t)) ; loop again + (wrap-operation (get-operation resume-io-channel) + (lambda (thunk) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) + + ;; Mutate the parameter; this should be fine since each fiber + ;; runs in its own dynamic state with with-dynamic-state. + ;; See with-dynamic-state discussion in + ;; https://wingolog.org/archives/2017/06/27/growing-fibers + (*current-actor* actor) + (*resume-io-channel* resume-io-channel) + + ;; We temporarily set the *actor-prompt* to #f to make sure that + ;; actor-init! doesn't try to do a <-wait message (and not accidentally use + ;; the parent fiber's *actor-prompt* either.) + (*actor-prompt* #f) + (actor-init! actor) + (*actor-prompt* prompt) + + (let loop () + (and (perform-operation halt-or-handle-message) + (loop)))) + + +;; @@: So in order for this to work, we're going to have to add +;; another channel to actors, which is resumable i/o. We'll have to +;; spawn a fiber that wakes up a thunk on the actor when its port is +;; available. Funky... + +(define (%suspend-io-to-actor wait-for-read/write) + (lambda (port) + (define prompt (*actor-prompt*)) + (define resume-channel (*resume-io-channel*)) + (define (run-at-prompt k) + (spawn-fiber + (lambda () + (wait-for-read/write port) + ;; okay, we're awake again, tell the actor to resume this + ;; continuation + (put-message resume-channel k)) + #:parallel? #f)) + (when (not prompt) + (error "Attempt to abort to actor prompt outside of actor")) + (abort-to-prompt (*actor-prompt*) + 'run-me run-at-prompt))) + +(define suspend-read-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) + +(define suspend-write-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) + +(define (with-actor-nonblocking-ports thunk) + "Runs THUNK in dynamic context in which attempting to read/write +from a port that would otherwise block an actor's correspondence with +other actors (note that reading from a nonblocking port should never +block other fibers) will instead permit reading other messages while +I/O is waiting to complete. + +Note that currently " + (parameterize ((current-read-waiter suspend-read-to-actor) + (current-write-waiter suspend-write-to-actor)) + (thunk))) + +(define (actor-spawn-fiber thunk . args) + "Spawn a fiber from an actor but unset actor-machinery-specific +dynamic context." + (apply spawn-fiber + (lambda () + (*current-actor* #f) + (*resume-io-channel* #f) + (*actor-prompt* #f) + (thunk)) + args)) @@ -398,490 +517,167 @@ to come after class definition." (action ...) slots ...) (define-class class inherits - (actions #:init-value (build-actions action ...) + (actions #:init-thunk (build-actions action ...) #:allocation #:each-subclass) slots ...)) ;;; The Hive ;;; ======== -;;; Every actor has a hive. The hive is a kind of "meta-actor" -;;; which routes all the rest of the actors in a system. +;;; Every actor has a hive, which keeps track of other actors, manages +;;; cleanup, and performs inter-hive communication. -(define-generic hive-handle-failed-forward) - -(define-class () +;; TODO: Make this a srfi-9 record type +(define-class () + (id #:init-keyword #:id + #:getter hive-id) (actor-registry #:init-thunk make-hash-table #:getter hive-actor-registry) - (msg-id-generator #:init-thunk simple-message-id-generator - #:getter hive-msg-id-generator) + ;; TODO: Rename "ambassadors" to "relays" ;; Ambassadors are used (or will be) for inter-hive communication. - ;; These are special actors that know how to route messages to other hives. + ;; These are special actors that know how to route messages to other + ;; hives. (ambassadors #:init-thunk make-weak-key-hash-table #:getter hive-ambassadors) - ;; Waiting coroutines - ;; This is a map from cons cell of message-id - ;; to a cons cell of (actor-id . coroutine) - ;; @@: Should we have a record type? - ;; @@: Should there be any way to clear out "old" coroutines? - (waiting-coroutines #:init-thunk make-hash-table - #:getter hive-waiting-coroutines) - ;; Message prompt - ;; When actors send messages to each other they abort to this prompt - ;; to send the message, then carry on their way - (prompt #:init-thunk make-prompt-tag - #:getter hive-prompt) - (actions #:allocation #:each-subclass - #:init-value - (build-actions - ;; This is in the case of an ambassador failing to forward a - ;; message... it reports it back to the hive - (*failed-forward* hive-handle-failed-forward) - ;; These are called at start and end of run-hive - (*init-all* hive-handle-init-all) - (*cleanup-all* hive-handle-cleanup-all)))) - -(define-method (hive-handle-init-all (hive ) message) - "Run *init* method on all actors in registry" - ;; We have to do this hack and run over the list - ;; twice, because hash-for-each would result in an unrewindable - ;; continuation, and to avoid the hash-map changing during the - ;; middle of this. - (define actor-ids - (hash-map->list (lambda (actor-id actor) actor-id) - (hive-actor-registry hive))) - (for-each (lambda (actor-id) - (let* ((actor (hash-ref (hive-actor-registry hive) - actor-id))) - (match (slot-ref actor 'should-init) - (#f #f) - ('wait - (<-wait actor-id '*init*)) - (_ - (<- actor-id '*init*))))) - actor-ids)) - -(define-method (hive-handle-failed-forward (hive ) message) - "Handle an ambassador failing to forward a message" - 'TODO) - -(define-method (hive-handle-cleanup-all (hive ) message) - "Send a message to all actors in our registry to clean themselves up." - ;; We have to do this hack and run over the list - ;; twice, because hash-for-each would result in an unrewindable - ;; continuation, and to avoid the hash-map changing during the - ;; middle of this. - (define actor-ids - (hash-map->list (lambda (actor-id actor) actor-id) - (hive-actor-registry hive))) - (for-each (lambda (actor-id) - (<- actor-id '*cleanup*)) - actor-ids)) + (channel #:init-thunk make-channel + #:getter hive-channel) + (halt? #:init-thunk make-condition + #:getter hive-halt?)) (define* (make-hive #:key hive-id) - (let ((hive (make - #:id (make-address - "hive" (or hive-id - (big-random-number-string)))))) - ;; Set the hive's actor reference to itself - (set! (actor-hive hive) hive) - ;; Register the actor with itself - (hive-register-actor! hive hive) - hive)) - -(define-method (hive-id (hive )) - (actor-id-hive hive)) - -(define-method (hive-gen-actor-id (hive ) cookie) - (make-address (if cookie - (string-append cookie ":" (big-random-number-string)) - (big-random-number-string)) - (hive-id hive))) - -(define-method (hive-gen-message-id (hive )) - "Generate a message id using HIVE's message id generator" - ((hive-msg-id-generator hive))) - -(define-method (hive-resolve-local-actor (hive ) actor-address) - (hash-ref (hive-actor-registry hive) actor-address)) - -(define-method (hive-resolve-ambassador (hive ) ambassador-address) - (hash-ref (hive-ambassadors hive) ambassador-address)) - -(define-method (make-forward-request (hive ) (ambassador ) message) - (make-message (hive-gen-message-id hive) (actor-id ambassador) - ;; If we make the hive not an actor, we could either switch this - ;; to #f or to the original actor...? - ;; Maybe some more thinking should be done on what should - ;; happen in case of failure to forward? Handling ambassador failures - ;; seems like the primary motivation for the hive remaining an actor. - (actor-id hive) - '*forward* - `((original . ,message)))) - -(define-method (hive-reply-with-error (hive ) original-message - error-key error-args) - ;; We only supply the error-args if the original sender is on the same hive - (define (orig-actor-on-same-hive?) - (equal? (hive-id hive) - (address-hive-id (message-from original-message)))) - (set-message-replied! original-message #t) - (let* ((new-message-body - (if (orig-actor-on-same-hive?) - `(#:original-message ,original-message - #:error-key ,error-key - #:error-args ,error-args) - `(#:original-message ,original-message - #:error-key ,error-key))) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id hive) '*error* - new-message-body - #:in-reply-to (message-id original-message)))) - ;; We only return a thunk, rather than run 8sync here, because if - ;; we ran 8sync in the middle of a catch we'd end up with an - ;; unresumable continuation. - (lambda () (hive-process-message hive new-message)))) - -(define-record-type - (make-waiting-on-reply actor-id kont send-options) - waiting-on-reply? - (actor-id waiting-on-reply-actor-id) - (kont waiting-on-reply-kont) - (send-options waiting-on-reply-send-options)) - - -(define-method (hive-process-message (hive ) message) - "Handle one message, or forward it via an ambassador" - (define (maybe-autoreply actor) - ;; Possibly autoreply - (if (message-needs-reply? message) - (<-auto-reply actor message))) - - (define (resolve-actor-to) - "Get the actor the message was aimed at" - (let ((actor (hive-resolve-local-actor hive (message-to message)))) - (if (not actor) - (throw 'actor-not-found - (format #f "Message ~a from ~a directed to nonexistant actor ~a" - (message-id message) - (address->string (message-from message)) - (address->string (message-to message))) - message)) - actor)) - - (define (call-catching-coroutine thunk) - (define queued-error-handling-thunk #f) - (define (call-catching-errors) - ;; TODO: maybe parameterize (or attach to hive) and use - ;; maybe-catch-all from agenda.scm - ;; @@: Why not just use with-throw-handler and let the catch - ;; happen at the agenda? That's what we used to do, but - ;; it ended up with a SIGABRT. See: - ;; http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html - (catch #t - thunk - ;; In the actor model, we don't totally crash on errors. - (lambda _ #f) - ;; If an error happens, we raise it - (lambda (key . args) - (if (message-needs-reply? message) - ;; If the message is waiting on a reply, let them know - ;; something went wrong. - ;; However, we have to do it outside of this catch - ;; routine, or we'll end up in an unrewindable continuation - ;; situation. - (set! queued-error-handling-thunk - (hive-reply-with-error hive message key args))) - ;; print error message - (apply print-error-and-continue key args))) - ;; @@: This is a kludge. See above for why. - (if queued-error-handling-thunk - (8sync (queued-error-handling-thunk)))) - (call-with-prompt (hive-prompt hive) - call-catching-errors - (lambda (kont actor message send-options) - ;; Register the coroutine - (hash-set! (hive-waiting-coroutines hive) - (message-id message) - (make-waiting-on-reply - (actor-id actor) kont send-options)) - ;; Send off the message - (8sync (hive-process-message hive message))))) - - (define (process-local-message) - (let ((actor (resolve-actor-to))) - (call-catching-coroutine - (lambda () - (define message-handler (actor-message-handler actor)) - ;; @@: Should a more general error handling happen here? - (parameterize ((%current-actor actor)) - (let ((result - (message-handler actor message))) - (maybe-autoreply actor) - ;; Returning result allows actors to possibly make a run-request - ;; at the end of handling a message. - ;; ... We do want that, right? - result)))))) - - (define (resume-waiting-coroutine) - (case (message-action message) - ;; standard reply / auto-reply - ((*reply* *auto-reply* *error*) - (call-catching-coroutine - (lambda () - (match (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - ((_ . waiting) - (if (not (equal? (message-to message) - (waiting-on-reply-actor-id waiting))) - (throw 'resuming-to-wrong-actor - "Attempted to resume a coroutine to the wrong actor!" - #:expected-actor-id (message-to message) - #:got-actor-id (waiting-on-reply-actor-id waiting) - #:message message)) - (let* (;; @@: How should we resolve resuming coroutines to actors who are - ;; now gone? - (actor (resolve-actor-to)) - (kont (waiting-on-reply-kont waiting)) - (result (kont message))) - (maybe-autoreply actor) - result)) - (#f (throw 'no-waiting-coroutine - "message in-reply-to tries to resume nonexistent coroutine" - message)))))) - ;; Unhandled action for a reply! - (else - (throw 'hive-unresumable-coroutine - "Won't resume coroutine, nonsense action on reply message" - #:action (message-action message) - #:message message)))) - - (define (process-remote-message) - ;; Find the ambassador - (let* ((remote-hive-id (hive-id (message-to message))) - (ambassador (hive-resolve-ambassador remote-hive-id)) - (message-handler (actor-message-handler ambassador)) - (forward-request (make-forward-request hive ambassador message))) - (message-handler ambassador forward-request))) - - (let ((to (message-to message))) - ;; This seems to be an easy mistake to make, so check that addressing - ;; is correct here - (if (not to) - (throw 'missing-addressee - "`to' field is missing on message" - #:message message)) - (if (hive-actor-local? hive to) - (if (message-in-reply-to message) - (resume-waiting-coroutine) - (process-local-message)) - (process-remote-message)))) - -(define-method (hive-actor-local? (hive ) address) - (equal? (hive-id hive) (address-hive-id address))) - -(define-method (hive-register-actor! (hive ) (actor )) - (hash-set! (hive-actor-registry hive) (actor-id actor) actor)) - -(define-method (%hive-create-actor (hive ) actor-class - init-args id-cookie send-init?) - "Actual method called by bootstrap-actor / create-actor. - -Since this is a define-method it can't accept fancy define* arguments, -so this gets called from the nicer bootstrap-actor interface. See -that method for documentation." - (let* ((actor-id (hive-gen-actor-id hive id-cookie)) + (make #:id (or hive-id + (big-random-number-string)))) + +(define (gen-actor-id cookie) + (if cookie + (string-append cookie ":" (big-random-number-string)) + (big-random-number-string))) + +(define (hive-main-loop hive) + "The main loop of the hive. This listens for messages on the hive-channel +for certain actions to perform. + +`messages' here is not the same as a object; these are a list of +values, the first value being a symbol" + (define channel (hive-channel hive)) + (define halt? (hive-halt? hive)) + (define registry (hive-actor-registry hive)) + + ;; not the same as a ;P + (define handle-message + (match-lambda + (('register-actor actor-id address actor) + (hash-set! registry actor-id (vector address actor))) + ;; Remove the actor from hive + (('remove-actor actor-id) + (hash-remove! (hive-actor-registry hive) actor-id)) + (('register-ambassador hive-id ambassador-actor-id) + 'TODO) + (('unregister-ambassador hive-id ambassador-actor-id) + 'TODO) + (('forward-message from-actor-id message) + 'TODO))) + + (define halt-or-handle + (choice-operation + (wrap-operation (get-operation channel) + (lambda (msg) + (handle-message msg) + #t)) + (wrap-operation (wait-operation halt?) + (const #f)))) + + (let lp () + (and (perform-operation halt-or-handle) + (lp)))) + +(define *hive-id* (make-parameter #f)) +(define *hive-channel* (make-parameter #f)) + +;; @@: Should we halt the hive either at the end of spawn-hive or run-hive? +(define* (spawn-hive proc #:key (hive (make-hive))) + "Spawn a hive and run PROC, passing it the fresh hive and establishing +a dynamic context surrounding the hive." + (spawn-fiber (lambda () (hive-main-loop hive))) + (parameterize ((*hive-id* (hive-id hive)) + (*hive-channel* (hive-channel hive))) + (proc hive))) + +(define (run-hive proc . args) + "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed +to spawn-hive... all remaining arguments passed to run-fibers." + (apply run-fibers + (lambda () + (spawn-hive proc)) + args)) + +(define (%create-actor actor-class init-args id-cookie send-init?) + (let* ((hive-channel (*hive-channel*)) + (hive-id (*hive-id*)) + (actor-id (gen-actor-id id-cookie)) + (dead? (make-condition)) + (inbox-enq (make-channel)) + (address (make-address actor-id hive-id + inbox-enq dead?)) (actor (apply make actor-class - #:hive hive - #:id actor-id + #:address address init-args)) - (actor-should-init (slot-ref actor 'should-init))) - (hive-register-actor! hive actor) - ;; Maybe run actor init method - (when (and send-init? actor-should-init) - (let ((send-method - (if (eq? actor-should-init 'wait) - <-wait <-))) - (send-method actor-id '*init*))) - ;; return the actor id - actor-id)) - -(define* (bootstrap-actor hive actor-class #:rest init-args) - "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args" - (%hive-create-actor hive actor-class - init-args (symbol->string (class-name actor-class)) - #f)) - -(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args) - "Create an actor, but also allow customizing a 'cookie' added to the id -for debugging" - (%hive-create-actor hive actor-class - init-args id-cookie - #f)) - -(define (call-with-message message proc) - "Applies message body arguments into procedure, with message as first -argument. Similar to call-with-values in concept." - (apply proc message (message-body message))) - -;; (mbody-receive (<- bar baz) -;; (baz) -;; basil) - -;; Emacs: (put 'mbody-receive 'scheme-indent-function 2) - -;; @@: Or receive-msg or receieve-message or?? -(define-syntax-rule (mbody-receive arglist message body ...) - "Call body with arglist (which can accept arguments like lambda*) -applied from the message-body of message." - (call-with-message message - (lambda* arglist - body ...))) - -(define (mbody-val message) - "Retrieve the first value from the message-body of message. -Like single value return from a procedure call. Probably the most -common case when waiting on a reply from some action invocation." - (call-with-message message - (lambda (_ val) val))) - - -;;; Various API methods for actors to interact with the system -;;; ========================================================== - -;; TODO: move send-message and friends here...? - -(define* (create-actor from-actor actor-class #:rest init-args) + (should-init (actor-should-init actor))) + + ;; start the main loop + (spawn-fiber (lambda () + ;; start the inbox loop + (spawn-fiber + (lambda () + (delivery-agent inbox-enq (actor-inbox-deq actor) + dead?)) + ;; this one is decidedly non-parallel, because we want + ;; the delivery agent to be in the same thread as its actor + #:parallel? #f) + + (actor-main-loop actor)) + #:parallel? #t) + + (put-message hive-channel (list 'register-actor actor-id address actor)) + + ;; return the address + address)) + +(define* (create-actor actor-class #:rest init-args) "Create an instance of actor-class. Return the new actor's id. This is the method actors should call directly (unless they want to supply an id-cookie, in which case they should use create-actor*)." - (%hive-create-actor (actor-hive from-actor) actor-class - init-args #f #t)) + (%create-actor actor-class init-args #f #t)) -(define* (create-actor* from-actor actor-class id-cookie #:rest init-args) +(define* (create-actor* actor-class id-cookie #:rest init-args) "Create an instance of actor-class. Return the new actor's id. Like create-actor, but permits supplying an id-cookie." - (%hive-create-actor (actor-hive from-actor) actor-class - init-args id-cookie #t)) - + (%create-actor actor-class init-args id-cookie #t)) (define* (self-destruct actor #:key (cleanup #t)) "Remove an actor from the hive. Unless #:cleanup is set to #f, this will first have the actor handle its '*cleanup* action handler." - (when cleanup - (<-wait (actor-id actor) '*cleanup*)) - (hash-remove! (hive-actor-registry (actor-hive actor)) - (actor-id actor))) + (signal-condition! (address-dead? (actor-id actor))) + (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor))) + ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending + ;; a message with <-wait + (*actor-prompt* #f) + (actor-cleanup! actor)) + +;; From a patch I sent to Fibers... +(define (condition-signalled? cvar) + "Return @code{#t} if @var{cvar} has already been signalled. + +In general you will want to use @code{wait} or @code{wait-operation} to +wait on a condition. However, sometimes it is useful to see whether or +not a condition has already been signalled without blocking." + (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar))) - - -;;; 8sync bootstrap utilities -;;; ========================= - -(define* (run-hive hive initial-tasks - #:key (cleanup #t) - (handle-signals (list SIGINT SIGTERM))) - "Start up an agenda and run HIVE in it with INITIAL-TASKS. - -Keyword arguments: - - #:cleanup: Whether to run *cleanup* on all actors. - - #:handle-sigactions: a list of signals to set up interrupt - handlers for, so cleanup sill still happen as expected. - Defaults to a list of SIGINT and SIGTERM." - (dynamic-wind - (const #f) - (lambda () - (define (run-it escape) - (define (handle-signal signum) - (restore-signals) - (escape signum)) - (for-each (lambda (signum) - (sigaction signum handle-signal)) - handle-signals) - (let* ((queue (list->q - (cons (bootstrap-message hive (actor-id hive) '*init-all*) - initial-tasks))) - (agenda (make-agenda #:pre-unwind-handler print-error-and-continue - #:queue queue))) - (run-agenda agenda))) - (call/ec run-it)) - ;; Run cleanup - (lambda () - (when cleanup - (run-hive-cleanup hive))))) - -(define (run-hive-cleanup hive) - (let ((queue (list->q (list (bootstrap-message hive (actor-id hive) - '*cleanup-all*))))) - (run-agenda - (make-agenda #:queue queue)))) - -(define (bootstrap-message hive to-id action . message-body-args) - (wrap - (apply <-* `(#:actor ,hive) to-id action message-body-args))) - - - -;;; Basic readers / writers -;;; ======================= - -(define (serialize-message message) - "Serialize a message for read/write" - (list - (message-id message) - (message-to message) - (message-from message) - (message-action message) - (message-body message) - (message-in-reply-to message) - (message-wants-reply message) - (message-replied message))) - -(define* (write-message message #:optional (port (current-output-port))) - "Write out a message to a port for easy reading later. - -Note that if a sub-value can't be easily written to something -Guile's `read' procedure knows how to read, this doesn't do anything -to improve that. You'll need a better serializer for that.." - (write (serialize-message message) port)) - -(define (serialize-message-pretty message) - "Serialize a message in a way that's easy for humans to read." - `(*message* - (id ,(message-id message)) - (to ,(message-to message)) - (from ,(message-from message)) - (action ,(message-action message)) - (body ,(message-body message)) - (in-reply-to ,(message-in-reply-to message)) - (wants-reply ,(message-wants-reply message)) - (replied ,(message-replied message)))) - -(define (pprint-message message) - "Pretty print a message." - (pretty-print (serialize-message-pretty message))) - -(define* (read-message #:optional (port (current-input-port))) - "Read a message serialized via serialize-message from PORT" - (match (read port) - ((id to from action body in-reply-to wants-reply replied) - (make-message-intern - id to from action body - in-reply-to wants-reply replied)) - (anything-else - (throw 'message-read-bad-structure - "Could not read message from structure" - anything-else)))) - -(define (read-message-from-string message-str) - "Read message from MESSAGE-STR" - (with-input-from-string message-str - (lambda () - (read-message (current-input-port))))) +(define (actor-alive? actor) + (condition-signalled? (address-dead? (actor-id actor))))