X-Git-Url: https://jxself.org/git/?p=8sync.git;a=blobdiff_plain;f=8sync%2Fsystems%2Factors.scm;h=ce46f63614fb7640eb3db3ee90450cffac999d92;hp=c08916ea5d39a9b1465d7c28e2895f4523e436fa;hb=ea14f5dc75cebaaf9cc21d96a6575b3738c43354;hpb=c5d1445fa4cecb4eea22d73bf0e9535e3490f6d4 diff --git a/8sync/systems/actors.scm b/8sync/systems/actors.scm index c08916e..ce46f63 100644 --- a/8sync/systems/actors.scm +++ b/8sync/systems/actors.scm @@ -31,11 +31,9 @@ big-random-number big-random-number-string simple-message-id-generator - require-slot actor-id - actor-hive actor-message-handler ;;; Commenting out the
type for now; @@ -49,7 +47,7 @@ actor-id-hive actor-id-string - make-action-dispatch + simple-dispatcher build-actions make-action-dispatch define-simple-actor @@ -59,29 +57,34 @@ hive-id hive-create-actor hive-create-actor* + create-actor create-actor* + self-destruct + make-message message? message-to message-action message-from message-id message-body message-in-reply-to message-wants-reply - message-ref - send-message send-message-wait - reply-message reply-message-wait + message-auto-reply? + + <- <-wait <-reply <-reply-wait + + call-with-message msg-receive => ez-run-hive - hive-bootstrap-message + bootstrap-message serialize-message write-message - serialize-message-pretty pprint-mesage + serialize-message-pretty pprint-message read-message read-message-from-string)) ;; For ids (define %random-state (make-parameter (random-state-from-platform))) -;; Probably bigger than necessary -(define random-number-size (expt 10 50)) +;; Same size as a uuid4 I think... +(define random-number-size (expt 2 128)) (define (big-random-number) (random random-number-size (%random-state))) @@ -101,12 +104,116 @@ (set! counter (1+ counter)) (string-append prefix (number->string counter))))) -(define (require-slot slot-name) - "Generate something for #:init-thunk to complain about unfilled slot" - (lambda () - (throw 'required-slot - (format #f "Slot ~s not filled" slot-name) - slot-name))) + + +;;; Messages +;;; ======== + + +(define-record-type + (make-message-intern id to from action + body in-reply-to wants-reply + replied) + message? + (id message-id) + (to message-to) + (from message-from) + (action message-action) + (body message-body) + (in-reply-to message-in-reply-to) + (wants-reply message-wants-reply) + (replied message-replied set-message-replied!)) + + +(define* (make-message id to from action body + #:key in-reply-to wants-reply + replied) + (make-message-intern id to from action body + in-reply-to wants-reply replied)) + +(define (message-auto-reply? message) + (eq? (message-action message) '*auto-reply*)) + +(define (message-needs-reply? message) + "See if this message needs a reply still" + (and (message-wants-reply message) + (not (message-replied message)))) + + +(define (kwarg-list-to-alist args) + (let loop ((remaining args) + (result '())) + (match remaining + (((? keyword? key) val rest ...) + (loop rest + (cons (cons (keyword->symbol key) val) + result))) + (() result) + (_ (throw 'invalid-kwarg-list + "Invalid keyword argument list" + args))))) + + +;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html +;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html ) + +(define (<- from-actor to-id action . message-body-args) + "Send a message from an actor to another actor" + (let* ((hive (actor-hive from-actor)) + (message (make-message (hive-gen-message-id hive) to-id + (actor-id from-actor) action + message-body-args))) + (8sync (hive-process-message hive message)))) + +(define (<-wait from-actor to-id action . message-body-args) + "Send a message from an actor to another, but wait until we get a response" + (let* ((hive (actor-hive from-actor)) + (abort-to (hive-prompt (actor-hive from-actor))) + (message (make-message (hive-gen-message-id hive) to-id + (actor-id from-actor) action + message-body-args + #:wants-reply #t))) + (abort-to-prompt abort-to from-actor message))) + +;; TODO: Intelligently ~propagate(ish) errors on -wait functions. +;; We might have `send-message-wait-brazen' to allow callers to +;; not have an exception thrown and instead just have a message with +;; the appropriate '*error* message returned. + +(define (<-reply from-actor original-message . message-body-args) + "Reply to a message" + (set-message-replied! original-message #t) + (let* ((hive (actor-hive from-actor)) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id from-actor) '*reply* + message-body-args + #:in-reply-to (message-id original-message)))) + (8sync (hive-process-message hive new-message)))) + +(define (<-auto-reply from-actor original-message) + "Auto-reply to a message. Internal use only!" + (set-message-replied! original-message #t) + (let* ((hive (actor-hive from-actor)) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id from-actor) '*auto-reply* + '() + #:in-reply-to (message-id original-message)))) + (8sync (hive-process-message hive new-message)))) + +(define (<-reply-wait from-actor original-message . message-body-args) + "Reply to a messsage, but wait until we get a response" + (set-message-replied! original-message #t) + (let* ((hive (actor-hive from-actor)) + (abort-to (hive-prompt (actor-hive from-actor))) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id from-actor) '*reply* + message-body-args + #:wants-reply #t + #:in-reply-to (message-id original-message)))) + (abort-to-prompt abort-to from-actor new-message))) @@ -115,17 +222,14 @@ (define-class () ;; An address object - (id #:init-thunk (require-slot "id") - #:init-keyword #:id + (id #:init-keyword #:id #:getter actor-id) ;; The hive we're connected to. ;; We need this to be able to send messages. - (hive #:init-thunk (require-slot "hive") - #:init-keyword #:hive + (hive #:init-keyword #:hive #:accessor actor-hive) ;; How we receive and process new messages - (message-handler #:init-thunk (require-slot "message-handler") - #:allocation #:each-subclass)) + (message-handler #:allocation #:each-subclass)) (define-method (actor-message-handler (actor )) (slot-ref actor 'message-handler)) @@ -182,23 +286,30 @@ (let* ((action (message-action message)) (method (assoc-ref action-map action))) (if (not method) + ;; @@: There's every possibility this should be handled in + ;; hive-process-message instead. (throw 'action-not-found "No appropriate action handler found for actor" #:action action #:actor actor #:message message #:available-actions (map car action-map))) - (method actor message)))) + (apply method actor message (message-body message))))) (define-syntax %expand-action-item (syntax-rules () - ((_ ((action-name action-args ...) body ...)) - (cons (quote action-name) - (lambda (action-args ...) - body ...))) ((_ (action-name handler)) (cons (quote action-name) handler)))) +(define-syntax-rule (build-actions action-item ...) + "Build a mapping of actions. Same syntax as make-action-dispatch +but this doesn't build the dispatcher for you (you probably want to +pass it to simple-dispatcher). + +The advantage here is that since this simply builds an alist, you can +compose it with other action maps." + (list (%expand-action-item action-item) ...)) + (define-syntax make-action-dispatch (syntax-rules () "Expand a list of action names and actions into an alist @@ -220,10 +331,9 @@ more compact following syntax: ((party actor message) (display \"Life of the party!\")))" ((make-action-dispatch action-item ...) - (simple-dispatcher - (list (%expand-action-item action-item) ...))))) + (simple-dispatcher (build-actions action-item ...))))) -(define-syntax-rule (define-simple-actor class (actions ...)) +(define-syntax-rule (define-simple-actor class actions ...) (define-class class () (message-handler #:init-value (make-action-dispatch actions ...) @@ -238,8 +348,6 @@ more compact following syntax: (define-generic hive-handle-failed-forward) (define-class () - ;; This gets set to itself immediately after being created - (hive #:init-value #f) (actor-registry #:init-thunk make-hash-table #:getter hive-actor-registry) (msg-id-generator #:init-thunk simple-message-id-generator @@ -252,6 +360,7 @@ more compact following syntax: ;; This is a map from cons cell of message-id ;; to a cons cell of (actor-id . coroutine) ;; @@: Should we have a record type? + ;; @@: Should there be any way to clear out "old" coroutines? (waiting-coroutines #:init-thunk make-hash-table #:getter hive-waiting-coroutines) ;; Message prompt @@ -309,9 +418,36 @@ more compact following syntax: '*forward* `((original . ,message)))) +(define-method (hive-reply-with-error (hive ) original-message + error-key error-args) + ;; We only supply the error-args if the original sender is on the same hive + (define (orig-actor-on-same-hive?) + (equal? (hive-id hive) + (address-hive-id (message-from original-message)))) + (set-message-replied! original-message #t) + (let* ((new-message-body + (if (orig-actor-on-same-hive?) + `(#:original-message ,original-message + #:error-key ,error-key + #:error-args ,error-args) + `(#:original-message ,original-message + #:error-key ,error-key))) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id hive) '*error* + new-message-body + #:in-reply-to (message-id original-message)))) + (8sync (hive-process-message hive new-message)))) + (define-method (hive-process-message (hive ) message) "Handle one message, or forward it via an ambassador" - (define (process-local-message) + (define (maybe-autoreply actor) + ;; Possibly autoreply + (if (message-needs-reply? message) + (<-auto-reply actor message))) + + (define (resolve-actor-to) + "Get the actor the message was aimed at" (let ((actor (hive-resolve-local-actor hive (message-to message)))) (if (not actor) (throw 'actor-not-found @@ -320,29 +456,92 @@ more compact following syntax: (address->string (message-from message)) (address->string (message-to message))) message)) - (call-with-prompt (hive-prompt hive) - (lambda () - (define message-handler (actor-message-handler actor)) - ;; @@: Should a more general error handling happen here? - (message-handler actor message)) - - (lambda (kont actor message) - (let ((hive (actor-hive actor))) - ;; Register the coroutine - (hash-set! (hive-waiting-coroutines hive) - (message-id message) - (cons (actor-id actor) kont)) - ;; Send off the message - (8sync (hive-process-message hive message))))))) + actor)) + + (define (call-catching-coroutine thunk) + (define (call-catching-errors) + ;; TODO: maybe parameterize (or attach to hive) and use + ;; maybe-catch-all from agenda.scm + ;; @@: Why not just use with-throw-handler and let the catch + ;; happen at the agenda? That's what we used to do, but + ;; it ended up with a SIGABRT. See: + ;; http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html + (catch #t + thunk + ;; In the actor model, we don't totally crash on errors. + (lambda _ #f) + ;; If an error happens, we raise it + (lambda (key . args) + (if (message-needs-reply? message) + ;; If the message is waiting on a reply, let them know + ;; something went wrong. + (hive-reply-with-error hive message key args)) + ;; print error message + (apply print-error-and-continue key args)))) + (call-with-prompt (hive-prompt hive) + call-catching-errors + (lambda (kont actor message) + ;; Register the coroutine + (hash-set! (hive-waiting-coroutines hive) + (message-id message) + (cons (actor-id actor) kont)) + ;; Send off the message + (8sync (hive-process-message hive message))))) + + (define (process-local-message) + (let ((actor (resolve-actor-to))) + (call-catching-coroutine + (lambda () + (define message-handler (actor-message-handler actor)) + ;; @@: Should a more general error handling happen here? + (let ((result + (message-handler actor message))) + (maybe-autoreply actor) + ;; Returning result allows actors to possibly make a run-request + ;; at the end of handling a message. + ;; ... We do want that, right? + result))))) (define (resume-waiting-coroutine) - (match (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - ((_ . kont) - (kont message)) - (#f (throw 'no-waiting-coroutine - "message in-reply-to tries to resume nonexistent coroutine" - message)))) + (cond + ((or (eq? (message-action message) '*reply*) + (eq? (message-action message) '*auto-reply*)) + (call-catching-coroutine + (lambda () + (match (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + ((_ . (resume-actor-id . kont)) + (if (not (equal? (message-to message) + resume-actor-id)) + (throw 'resuming-to-wrong-actor + "Attempted to resume a coroutine to the wrong actor!" + #:expected-actor-id (message-to message) + #:got-actor-id resume-actor-id + #:message message)) + (let (;; @@: How should we resolve resuming coroutines to actors who are + ;; now gone? + (actor (resolve-actor-to)) + (result (kont message))) + (maybe-autoreply actor) + result)) + (#f (throw 'no-waiting-coroutine + "message in-reply-to tries to resume nonexistent coroutine" + message)))))) + ;; Yikes, we must have gotten an error or something back + (else + ;; @@: Not what we want in the long run? + ;; What we'd *prefer* to do is to resume this message + ;; and throw an error inside the message handler + ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!) + (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + (let ((explaination + (if (eq? (message-action message) '*reply*) + "Won't resume coroutine; got an *error* as a reply" + "Won't resume coroutine because action is not *reply*"))) + (throw 'hive-unresumable-coroutine + explaination + #:message message))))) (define (process-remote-message) ;; Find the ambassador @@ -366,7 +565,7 @@ more compact following syntax: (process-remote-message)))) (define-method (hive-actor-local? (hive ) address) - (hash-ref (hive-actor-registry hive) address)) + (equal? (hive-id hive) (address-hive-id address))) (define-method (hive-register-actor! (hive ) (actor )) (hash-set! (hive-actor-registry hive) (actor-id actor) actor)) @@ -380,7 +579,6 @@ so this gets called from the nicer hive-create-actor interface. See that method for documentation." (let* ((actor-id (hive-gen-actor-id hive id-cookie)) (actor (apply make actor-class - ;; @@: If we switch to a hive-proxy, do it here #:hive hive #:id actor-id init))) @@ -388,163 +586,65 @@ that method for documentation." ;; return the actor id actor-id)) -(define* (hive-create-actor hive actor-class - #:key - (init '()) - id-cookie) +(define* (hive-create-actor hive actor-class #:rest init) (%hive-create-actor hive actor-class - init id-cookie)) + init #f)) -(define-syntax hive-create-actor* - (syntax-rules () - "Create an instance of actor-class attached to this hive. -Return the new actor's id. +(define* (hive-create-actor* hive actor-class id-cookie #:rest init) + (%hive-create-actor hive actor-class + init id-cookie)) -Used internally, and used for bootstrapping a fresh hive. +(define (call-with-message message proc) + "Applies message body arguments into procedure, with message as first +argument. Similar to call-with-values in concept." + (apply proc message (message-body message))) -Note that actors should generally not call this method directly. -Instead, actors should call create-actor." - ((_ args ... (init-args ...)) - (hive-create-actor args ... - #:init (list init-args ...))))) +;; (msg-receive (<- bar baz) +;; (baz) +;; basil) +;; Emacs: (put 'msg-receive 'scheme-indent-function 2) -;; TODO: Give actors this instead of the actual hive reference -(define-class () - (send-message #:getter proxy-send-message - #:init-keyword #:send-message) - (create-actor #:getter proxy-create-actor - #:init-keyword #:create-actor)) +;; @@: Or receive-msg or receieve-message or?? +(define-syntax-rule (msg-receive arglist the-message body ...) + (call-with-message the-message + (lambda* arglist + body ...))) -;; Live the hive proxy, but has access to the hive itself... -(define-class () - (hive #:init-keyword #:hive)) +;; Emacs: (put '=> 'scheme-indent-function 2) +;;; An experimental alias. +(define-syntax-rule (=> rest ...) + (msg-receive rest ...)) -;;; Messages -;;; ======== +;;; Various API methods for actors to interact with the system +;;; ========================================================== +;; TODO: move send-message and friends here...? -(define-record-type - (make-message-intern id to from action - body in-reply-to wants-reply ; do we need hive-proxy? - ;; Are these still needed? - replied deferred-reply) - message? - (id message-id) - (to message-to) - (from message-from) - (action message-action) - (body message-body) - (in-reply-to message-in-reply-to) - (wants-reply message-wants-reply) - - ;; See XUDD source for these. Not use yet, maybe eventually will be? - ;; XUDD uses them for autoreply. - ;; Requiring mutation on message objects is clearly not great, - ;; but it may be worth it...? Investigate! - (replied message-replied set-message-replied!) - (deferred-reply message-deferred-reply set-message-deferred-reply!)) - - -(define* (make-message id to from action body - #:key in-reply-to wants-reply - replied deferred-reply) - (make-message-intern id to from action body - in-reply-to wants-reply replied - deferred-reply)) - -;; Note: the body of messages is currently an alist, but it's created -;; from a keyword based property list (see the following two functions). -;; But, that's an extra conversion step, and maybe totally unnecessary: -;; we already have message-ref, and this could just pull a keyword -;; from a property list. -;; The main ways this might be useful are error checking, -;; serialization across the wire (but even that might require some -;; change), and using existing tooling (though adding new tooling -;; would be negligible in implementation effort.) - -;; This cons cell is immutable and unique (for eq? tests) -(define %nothing-provided (cons 'nothing 'provided)) - -(define* (message-ref message key #:optional (dflt %nothing-provided)) - "Extract KEY from body of MESSAGE. - -Optionally set default with [DFLT] -If key not found and DFLT not provided, throw an error." - (let ((result (assoc key (message-body message)))) - (if result (cdr result) - (if (eq? dflt %nothing-provided) - (throw 'message-body-lacks-key - "Message body does not contain key and no default provided" - #:key key - #:message message) - dflt)))) - - -(define (kwarg-list-to-alist args) - (let loop ((remaining args) - (result '())) - (match remaining - (((? keyword? key) val rest ...) - (loop rest - (cons (cons (keyword->symbol key) val) - result))) - (() result) - (_ (throw 'invalid-kwarg-list - "Invalid keyword argument list" - args))))) +(define* (create-actor from-actor actor-class #:rest init) + "Create an instance of actor-class. Return the new actor's id. +This is the method actors should call directly (unless they want +to supply an id-cookie, in which case they should use +create-actor*)." + (%hive-create-actor (actor-hive from-actor) actor-class + init #f)) -(define (send-message from-actor to-id action . message-body-args) - "Send a message from an actor to another actor" - (let* ((hive (actor-hive from-actor)) - (message (make-message (hive-gen-message-id hive) to-id - (actor-id from-actor) action - (kwarg-list-to-alist message-body-args)))) - (8sync (hive-process-message hive message)))) -(define (send-message-wait from-actor to-id action . message-body-args) - "Send a message from an actor to another, but wait until we get a response" - (let* ((hive (actor-hive from-actor)) - (agenda-prompt (hive-prompt (actor-hive from-actor))) - (message (make-message (hive-gen-message-id hive) to-id - (actor-id from-actor) action - (kwarg-list-to-alist message-body-args) - #:wants-reply #t))) - (abort-to-prompt agenda-prompt from-actor message))) +(define* (create-actor* from-actor actor-class id-cookie #:rest init) + "Create an instance of actor-class. Return the new actor's id. -;; TODO: Intelligently ~propagate(ish) errors on -wait functions. -;; We might have `send-message-wait-brazen' to allow callers to -;; not have an exception thrown and instead just have a message with -;; the appropriate '*error* message returned. +Like create-actor, but permits supplying an id-cookie." + (%hive-create-actor (actor-hive from-actor) actor-class + init id-cookie)) -(define (reply-message from-actor original-message - . message-body-args) - "Reply to a message" - (set-message-replied! original-message #t) - (let* ((hive (actor-hive from-actor)) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id from-actor) '*reply* - (kwarg-list-to-alist message-body-args) - #:in-reply-to (message-id original-message)))) - (8sync (hive-process-message hive new-message)))) -(define (reply-message-wait from-actor original-message - . message-body-args) - "Reply to a messsage, but wait until we get a response" - (set-message-replied! original-message #t) - (let* ((hive (actor-hive from-actor)) - (agenda-prompt (hive-prompt (actor-hive from-actor))) - (new-message (make-message (hive-gen-message-id hive) - (message-from original-message) - (actor-id from-actor) '*reply* - (kwarg-list-to-alist message-body-args) - #:wants-reply #t - #:in-reply-to (message-id original-message)))) - (abort-to-prompt agenda-prompt from-actor new-message))) +(define (self-destruct actor) + "Remove an actor from the hive." + (hash-remove! (hive-actor-registry (actor-hive actor)) + (actor-id actor))) @@ -568,27 +668,26 @@ an integer." (spawn-and-queue-repl-server! agenda))) (start-agenda agenda))) -(define (hive-bootstrap-message hive to-id action . message-body-args) +(define (bootstrap-message hive to-id action . message-body-args) (wrap - (apply send-message hive to-id action message-body-args))) + (apply <- hive to-id action message-body-args))) -;;; Convenience procedures -;;; ====================== +;;; Basic readers / writers +;;; ======================= (define (serialize-message message) "Serialize a message for read/write" (list (message-id message) - (address->string (message-to message)) - (address->string (message-from message)) + (message-to message) + (message-from message) (message-action message) (message-body message) (message-in-reply-to message) (message-wants-reply message) - (message-replied message) - (message-deferred-reply message))) + (message-replied message))) (define* (write-message message #:optional (port (current-output-port))) "Write out a message to a port for easy reading later. @@ -608,8 +707,7 @@ to improve that. You'll need a better serializer for that.." (body ,(message-body message)) (in-reply-to ,(message-in-reply-to message)) (wants-reply ,(message-wants-reply message)) - (replied ,(message-replied message)) - (deferred-reply ,(message-deferred-reply message)))) + (replied ,(message-replied message)))) (define (pprint-message message) "Pretty print a message." @@ -618,10 +716,10 @@ to improve that. You'll need a better serializer for that.." (define* (read-message #:optional (port (current-input-port))) "Read a message serialized via serialize-message from PORT" (match (read port) - ((id to from action body in-reply-to wants-reply replied deferred-reply) + ((id to from action body in-reply-to wants-reply replied) (make-message-intern id to from action body - in-reply-to wants-reply replied deferred-reply)) + in-reply-to wants-reply replied)) (anything-else (throw 'message-read-bad-structure "Could not read message from structure"