From: Christopher Allan Webber Date: Mon, 25 Apr 2016 18:47:29 +0000 (-0500) Subject: actors: Properly autoreply to replies; make reply-message-wait work. X-Git-Tag: v0.2.0~41 X-Git-Url: https://jxself.org/git/?a=commitdiff_plain;h=5b3a0e8de367ddff6b132aa92e7786bc31cede4b;p=8sync.git actors: Properly autoreply to replies; make reply-message-wait work. * 8sync/systems/actors.scm (hive-process-message): Enable prompt even when resuming coroutines. Enable autoreply when resuming coroutines. (send-message-wait, reply-message-wait): Rename agenda-prompt variable to abort-to, which makes more sense. --- diff --git a/8sync/systems/actors.scm b/8sync/systems/actors.scm index 51a6e50..8f00d57 100644 --- a/8sync/systems/actors.scm +++ b/8sync/systems/actors.scm @@ -203,12 +203,12 @@ If key not found and DFLT not provided, throw an error." (define (send-message-wait from-actor to-id action . message-body-args) "Send a message from an actor to another, but wait until we get a response" (let* ((hive (actor-hive from-actor)) - (agenda-prompt (hive-prompt (actor-hive from-actor))) + (abort-to (hive-prompt (actor-hive from-actor))) (message (make-message (hive-gen-message-id hive) to-id (actor-id from-actor) action (kwarg-list-to-alist message-body-args) #:wants-reply #t))) - (abort-to-prompt agenda-prompt from-actor message))) + (abort-to-prompt abort-to from-actor message))) ;; TODO: Intelligently ~propagate(ish) errors on -wait functions. ;; We might have `send-message-wait-brazen' to allow callers to @@ -232,14 +232,14 @@ If key not found and DFLT not provided, throw an error." "Reply to a messsage, but wait until we get a response" (set-message-replied! original-message #t) (let* ((hive (actor-hive from-actor)) - (agenda-prompt (hive-prompt (actor-hive from-actor))) + (abort-to (hive-prompt (actor-hive from-actor))) (new-message (make-message (hive-gen-message-id hive) (message-from original-message) (actor-id from-actor) '*reply* (kwarg-list-to-alist message-body-args) #:wants-reply #t #:in-reply-to (message-id original-message)))) - (abort-to-prompt agenda-prompt from-actor new-message))) + (abort-to-prompt abort-to from-actor new-message))) @@ -445,7 +445,15 @@ more compact following syntax: (define-method (hive-process-message (hive ) message) "Handle one message, or forward it via an ambassador" - (define (process-local-message) + (define (maybe-autoreply actor) + ;; Possibly autoreply + (if (message-needs-reply message) + ;; @@: Should we give *autoreply* as the action instead of *reply*? + (reply-message actor message + #:*auto-reply* #t))) + + (define (resolve-actor-to) + "Get the actor the message was aimed at" (let ((actor (hive-resolve-local-actor hive (message-to message)))) (if (not actor) (throw 'actor-not-found @@ -454,46 +462,56 @@ more compact following syntax: (address->string (message-from message)) (address->string (message-to message))) message)) - (call-with-prompt (hive-prompt hive) - (lambda () - (define message-handler (actor-message-handler actor)) - ;; @@: Should a more general error handling happen here? - (let ((result - (message-handler actor message))) - ;; Possibly autoreply - (if (message-needs-reply message) - ;; @@: Should we give *autoreply* as the action instead of *reply*? - (reply-message actor message - #:*auto-reply* #t)) - ;; Returning result allows actors to possibly make a run-request - ;; at the end of handling a message. - ;; ... We do want that, right? - result)) + actor)) + + (define (call-catching-coroutine thunk) + (call-with-prompt (hive-prompt hive) + thunk + (lambda (kont actor message) + (let ((hive (actor-hive actor))) + ;; Register the coroutine + (hash-set! (hive-waiting-coroutines hive) + (message-id message) + (cons (actor-id actor) kont)) + ;; Send off the message + (8sync (hive-process-message hive message)))))) - (lambda (kont actor message) - (let ((hive (actor-hive actor))) - ;; Register the coroutine - (hash-set! (hive-waiting-coroutines hive) - (message-id message) - (cons (actor-id actor) kont)) - ;; Send off the message - (8sync (hive-process-message hive message))))))) + (define (process-local-message) + (let ((actor (resolve-actor-to))) + (call-catching-coroutine + (lambda () + (define message-handler (actor-message-handler actor)) + ;; @@: Should a more general error handling happen here? + (let ((result + (message-handler actor message))) + (maybe-autoreply actor) + ;; Returning result allows actors to possibly make a run-request + ;; at the end of handling a message. + ;; ... We do want that, right? + result))))) (define (resume-waiting-coroutine) - (match (hash-remove! (hive-waiting-coroutines hive) - (message-in-reply-to message)) - ((_ . (resume-actor-id . kont)) - (if (not (equal? (message-to message) - resume-actor-id)) - (throw 'resuming-to-wrong-actor - "Attempted to resume a coroutine to the wrong actor!" - #:expected-actor-id (message-to message) - #:got-actor-id resume-actor-id - #:message message)) - (kont message)) - (#f (throw 'no-waiting-coroutine - "message in-reply-to tries to resume nonexistent coroutine" - message)))) + (call-catching-coroutine + (lambda () + (match (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + ((_ . (resume-actor-id . kont)) + (if (not (equal? (message-to message) + resume-actor-id)) + (throw 'resuming-to-wrong-actor + "Attempted to resume a coroutine to the wrong actor!" + #:expected-actor-id (message-to message) + #:got-actor-id resume-actor-id + #:message message)) + (let (;; @@: How should we resolve resuming coroutines to actors who are + ;; now gone? + (actor (resolve-actor-to)) + (result (kont message))) + (maybe-autoreply actor) + result)) + (#f (throw 'no-waiting-coroutine + "message in-reply-to tries to resume nonexistent coroutine" + message)))))) (define (process-remote-message) ;; Find the ambassador