X-Git-Url: https://jxself.org/git/?p=8sync.git;a=blobdiff_plain;f=8sync%2Fsystems%2Factors.scm;h=f9574d39b6a7a08614206509b0acf710389a1955;hp=9009d301470a1c9872fd32cfe306b5f1b1b1653d;hb=48174b48346c194142c4f1204cae60a5d1cc80c4;hpb=9414dea358e9067f1d333650df1a228dcc4bf378 diff --git a/8sync/systems/actors.scm b/8sync/systems/actors.scm index 9009d30..f9574d3 100644 --- a/8sync/systems/actors.scm +++ b/8sync/systems/actors.scm @@ -31,7 +31,6 @@ big-random-number big-random-number-string simple-message-id-generator - require-slot actor-id @@ -48,7 +47,8 @@ actor-id-hive actor-id-string - make-action-dispatch + mlambda define-mhandler + simple-dispatcher build-actions make-action-dispatch define-simple-actor @@ -71,6 +71,8 @@ send-message send-message-wait reply-message reply-message-wait + <- <-wait <-reply <-reply-wait + ez-run-hive bootstrap-message @@ -103,13 +105,6 @@ (set! counter (1+ counter)) (string-append prefix (number->string counter))))) -(define (require-slot slot-name) - "Generate something for #:init-thunk to complain about unfilled slot" - (lambda () - (throw 'required-slot - (format #f "Slot ~s not filled" slot-name) - slot-name))) - ;;; Messages @@ -202,7 +197,7 @@ If key not found and DFLT not provided, throw an error." (message (make-message (hive-gen-message-id hive) to-id (actor-id from-actor) action (kwarg-list-to-alist message-body-args)))) - (8sync-nowait (hive-process-message hive message)))) + (8sync (hive-process-message hive message)))) (define (send-message-wait from-actor to-id action . message-body-args) "Send a message from an actor to another, but wait until we get a response" @@ -229,7 +224,7 @@ If key not found and DFLT not provided, throw an error." (actor-id from-actor) '*reply* (kwarg-list-to-alist message-body-args) #:in-reply-to (message-id original-message)))) - (8sync-nowait (hive-process-message hive new-message)))) + (8sync (hive-process-message hive new-message)))) (define (reply-message-wait from-actor original-message . message-body-args) @@ -246,23 +241,30 @@ If key not found and DFLT not provided, throw an error." (abort-to-prompt abort-to from-actor new-message))) +;;; Aliases! +;;; See: http://mumble.net/~jar/articles/oo-moon-weinreb.html +;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html ) + +(define <- send-message) +(define <-wait send-message-wait) +(define <-reply reply-message) +(define <-reply-wait reply-message-wait) + + ;;; Main actor implementation ;;; ========================= (define-class () ;; An address object - (id #:init-thunk (require-slot "id") - #:init-keyword #:id + (id #:init-keyword #:id #:getter actor-id) ;; The hive we're connected to. ;; We need this to be able to send messages. - (hive #:init-thunk (require-slot "hive") - #:init-keyword #:hive + (hive #:init-keyword #:hive #:accessor actor-hive) ;; How we receive and process new messages - (message-handler #:init-thunk (require-slot "message-handler") - #:allocation #:each-subclass)) + (message-handler #:allocation #:each-subclass)) (define-method (actor-message-handler (actor )) (slot-ref actor 'message-handler)) @@ -314,11 +316,50 @@ If key not found and DFLT not provided, throw an error." ;;; Actor utilities ;;; =============== + +(define-syntax-rule (with-message-args (message message-arg ...) + body body* ...) + (let ((message-arg (message-ref message (quote message-arg))) ...) + body body* ...)) + +(define-syntax mlambda + (lambda (x) + "A lambda for building message handlers. + +Use it like: + (mlambda (actor message foo) + ...) + +Which is like doing manually: + (lambda (actor message) + (let ((foo (message-ref message foo))) + ...))" + (syntax-case x () + ((_ (actor message message-arg ...) + docstring + body ...) + (string? (syntax->datum #'docstring)) + #'(lambda (actor message) + docstring + (with-message-args (message message-arg ...) body ...))) + ((_ (actor message message-arg ...) + body body* ...) + #'(lambda (actor message) + (with-message-args (message message-arg ...) body body* ...)))))) + +(define-syntax-rule (define-mhandler (name actor message message-arg ...) + body ...) + (define name + (mlambda (actor message message-arg ...) + body ...))) + (define (simple-dispatcher action-map) (lambda (actor message) (let* ((action (message-action message)) (method (assoc-ref action-map action))) (if (not method) + ;; @@: There's every possibility this should be handled in + ;; hive-process-message instead. (throw 'action-not-found "No appropriate action handler found for actor" #:action action @@ -331,11 +372,20 @@ If key not found and DFLT not provided, throw an error." (syntax-rules () ((_ ((action-name action-args ...) body ...)) (cons (quote action-name) - (lambda (action-args ...) + (mlambda (action-args ...) body ...))) ((_ (action-name handler)) (cons (quote action-name) handler)))) +(define-syntax-rule (build-actions action-item ...) + "Build a mapping of actions. Same syntax as make-action-dispatch +but this doesn't build the dispatcher for you (you probably want to +pass it to simple-dispatcher). + +The advantage here is that since this simply builds an alist, you can +compose it with other action maps." + (list (%expand-action-item action-item) ...)) + (define-syntax make-action-dispatch (syntax-rules () "Expand a list of action names and actions into an alist @@ -357,8 +407,7 @@ more compact following syntax: ((party actor message) (display \"Life of the party!\")))" ((make-action-dispatch action-item ...) - (simple-dispatcher - (list (%expand-action-item action-item) ...))))) + (simple-dispatcher (build-actions action-item ...))))) (define-syntax-rule (define-simple-actor class actions ...) (define-class class () @@ -375,8 +424,6 @@ more compact following syntax: (define-generic hive-handle-failed-forward) (define-class () - ;; This gets set to itself immediately after being created - (hive #:init-value #f) (actor-registry #:init-thunk make-hash-table #:getter hive-actor-registry) (msg-id-generator #:init-thunk simple-message-id-generator @@ -447,6 +494,27 @@ more compact following syntax: '*forward* `((original . ,message)))) +(define-method (hive-reply-with-error (hive ) original-message + error-key error-args) + ;; We only supply the error-args if the original sender is on the same hive + (define (orig-actor-on-same-hive?) + (equal? (hive-id hive) + (address-hive-id (message-from original-message)))) + (set-message-replied! original-message #t) + (let* ((new-message-body + (if (orig-actor-on-same-hive?) + `((original-message . ,original-message) + (error-key . ,error-key) + (error-args . ,error-args)) + `((original-message . ,original-message) + (error-key . ,error-key)))) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id hive) '*error* + new-message-body + #:in-reply-to (message-id original-message)))) + (8sync (hive-process-message hive new-message)))) + (define-method (hive-process-message (hive ) message) "Handle one message, or forward it via an ambassador" (define (maybe-autoreply actor) @@ -469,16 +537,34 @@ more compact following syntax: actor)) (define (call-catching-coroutine thunk) + (define (call-catching-errors) + ;; TODO: maybe parameterize (or attach to hive) and use + ;; maybe-catch-all from agenda.scm + ;; @@: Why not just use with-throw-handler and let the catch + ;; happen at the agenda? That's what we used to do, but + ;; it ended up with a SIGABRT. See: + ;; http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html + (catch #t + thunk + ;; In the actor model, we don't totally crash on errors. + (lambda _ #f) + ;; If an error happens, we raise it + (lambda (key . args) + (if (message-needs-reply message) + ;; If the message is waiting on a reply, let them know + ;; something went wrong. + (hive-reply-with-error hive message key args)) + ;; print error message + (apply print-error-and-continue key args)))) (call-with-prompt (hive-prompt hive) - thunk + call-catching-errors (lambda (kont actor message) - (let ((hive (actor-hive actor))) - ;; Register the coroutine - (hash-set! (hive-waiting-coroutines hive) - (message-id message) - (cons (actor-id actor) kont)) - ;; Send off the message - (8sync (hive-process-message hive message)))))) + ;; Register the coroutine + (hash-set! (hive-waiting-coroutines hive) + (message-id message) + (cons (actor-id actor) kont)) + ;; Send off the message + (8sync (hive-process-message hive message))))) (define (process-local-message) (let ((actor (resolve-actor-to))) @@ -495,27 +581,44 @@ more compact following syntax: result))))) (define (resume-waiting-coroutine) - (call-catching-coroutine - (lambda () - (match (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - ((_ . (resume-actor-id . kont)) - (if (not (equal? (message-to message) - resume-actor-id)) - (throw 'resuming-to-wrong-actor - "Attempted to resume a coroutine to the wrong actor!" - #:expected-actor-id (message-to message) - #:got-actor-id resume-actor-id - #:message message)) - (let (;; @@: How should we resolve resuming coroutines to actors who are - ;; now gone? - (actor (resolve-actor-to)) - (result (kont message))) - (maybe-autoreply actor) - result)) - (#f (throw 'no-waiting-coroutine - "message in-reply-to tries to resume nonexistent coroutine" - message)))))) + (cond + ((eq? (message-action message) '*reply*) + (call-catching-coroutine + (lambda () + (match (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + ((_ . (resume-actor-id . kont)) + (if (not (equal? (message-to message) + resume-actor-id)) + (throw 'resuming-to-wrong-actor + "Attempted to resume a coroutine to the wrong actor!" + #:expected-actor-id (message-to message) + #:got-actor-id resume-actor-id + #:message message)) + (let (;; @@: How should we resolve resuming coroutines to actors who are + ;; now gone? + (actor (resolve-actor-to)) + (result (kont message))) + (maybe-autoreply actor) + result)) + (#f (throw 'no-waiting-coroutine + "message in-reply-to tries to resume nonexistent coroutine" + message)))))) + ;; Yikes, we must have gotten an error or something back + (else + ;; @@: Not what we want in the long run? + ;; What we'd *prefer* to do is to resume this message + ;; and throw an error inside the message handler + ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!) + (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + (let ((explaination + (if (eq? (message-action message) '*reply*) + "Won't resume coroutine; got an *error* as a reply" + "Won't resume coroutine because action is not *reply*"))) + (throw 'hive-unresumable-coroutine + explaination + #:message message))))) (define (process-remote-message) ;; Find the ambassador @@ -539,7 +642,7 @@ more compact following syntax: (process-remote-message)))) (define-method (hive-actor-local? (hive ) address) - (hash-ref (hive-actor-registry hive) address)) + (equal? (hive-id hive) (address-hive-id address))) (define-method (hive-register-actor! (hive ) (actor )) (hash-set! (hive-actor-registry hive) (actor-id actor) actor))