actors: Handle messages/coroutines in want of reply and errors.
[8sync.git] / 8sync / systems / actors.scm
index e33a88b3658eb514da2dd21b04d85e1f6305f322..e5dd053d74056bdfa6b0865d8c2921724049fe6e 100644 (file)
@@ -459,6 +459,27 @@ more compact following syntax:
                 '*forward*
                 `((original . ,message))))
 
+(define-method (hive-reply-with-error (hive <hive>) original-message
+                                      error-key error-args)
+  ;; We only supply the error-args if the original sender is on the same hive
+  (define (orig-actor-on-same-hive?)
+    (equal? (hive-id hive)
+            (address-hive-id (message-from original-message))))
+  (set-message-replied! original-message #t)
+  (let* ((new-message-body
+          (if (orig-actor-on-same-hive?)
+              `((original-message . ,original-message)
+                (error-key . ,error-key)
+                (error-args . ,error-args))
+              `((original-message . ,original-message)
+                (error-key . ,error-key))))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id hive) '*error*
+                                    new-message-body
+                                    #:in-reply-to (message-id original-message))))
+    (8sync-nowait (hive-process-message hive new-message))))
+
 (define-method (hive-process-message (hive <hive>) message)
   "Handle one message, or forward it via an ambassador"
   (define (maybe-autoreply actor)
@@ -481,8 +502,16 @@ more compact following syntax:
       actor))
 
   (define (call-catching-coroutine thunk)
+    (define (call-catching-errors)
+      (with-throw-handler
+          #t thunk
+          (lambda (key . args)
+            (if (message-needs-reply message)
+                ;; If the message is waiting on a reply, let them know
+                ;; something went wrong.
+                (hive-reply-with-error hive message key args)))))
     (call-with-prompt (hive-prompt hive)
-      thunk
+      call-catching-errors
       (lambda (kont actor message)
         ;; Register the coroutine
         (hash-set! (hive-waiting-coroutines hive)
@@ -506,27 +535,44 @@ more compact following syntax:
            result)))))
 
   (define (resume-waiting-coroutine)
-    (call-catching-coroutine
-     (lambda ()
-       (match (hash-remove! (hive-waiting-coroutines hive)
-                            (message-in-reply-to message))
-         ((_ . (resume-actor-id . kont))
-          (if (not (equal? (message-to message)
-                           resume-actor-id))
-              (throw 'resuming-to-wrong-actor
-                     "Attempted to resume a coroutine to the wrong actor!"
-                     #:expected-actor-id (message-to message)
-                     #:got-actor-id resume-actor-id
-                     #:message message))
-          (let (;; @@: How should we resolve resuming coroutines to actors who are
-                ;;   now gone?
-                (actor (resolve-actor-to))
-                (result (kont message)))
-            (maybe-autoreply actor)
-            result))
-         (#f (throw 'no-waiting-coroutine
-                    "message in-reply-to tries to resume nonexistent coroutine"
-                    message))))))
+    (cond
+     ((eq? (message-action message) '*reply*)
+      (call-catching-coroutine
+       (lambda ()
+         (match (hash-remove! (hive-waiting-coroutines hive)
+                              (message-in-reply-to message))
+           ((_ . (resume-actor-id . kont))
+            (if (not (equal? (message-to message)
+                             resume-actor-id))
+                (throw 'resuming-to-wrong-actor
+                       "Attempted to resume a coroutine to the wrong actor!"
+                       #:expected-actor-id (message-to message)
+                       #:got-actor-id resume-actor-id
+                       #:message message))
+            (let (;; @@: How should we resolve resuming coroutines to actors who are
+                  ;;   now gone?
+                  (actor (resolve-actor-to))
+                  (result (kont message)))
+              (maybe-autoreply actor)
+              result))
+           (#f (throw 'no-waiting-coroutine
+                      "message in-reply-to tries to resume nonexistent coroutine"
+                      message))))))
+     ;; Yikes, we must have gotten an error or something back
+     (else
+      ;; @@: Not what we want in the long run?
+      ;; What we'd *prefer* to do is to resume this message
+      ;; and throw an error inside the message handler
+      ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
+      (hash-remove! (hive-waiting-coroutines hive)
+                    (message-in-reply-to message))
+      (let ((explaination
+             (if (eq? (message-action message) '*reply*)
+                 "Won't resume coroutine; got an *error* as a reply"
+                 "Won't resume coroutine because action is not *reply*")))
+        (throw 'hive-unresumable-coroutine
+               explaination
+               #:message message)))))
 
   (define (process-remote-message)
     ;; Find the ambassador