agenda: Rename 8sync-nowait to 8sync.
[8sync.git] / 8sync / systems / actors.scm
index 9009d301470a1c9872fd32cfe306b5f1b1b1653d..f9574d39b6a7a08614206509b0acf710389a1955 100644 (file)
@@ -31,7 +31,6 @@
             big-random-number
             big-random-number-string
             simple-message-id-generator
-            require-slot
 
             <actor>
             actor-id
@@ -48,7 +47,8 @@
             actor-id-hive
             actor-id-string
 
-            make-action-dispatch
+            mlambda define-mhandler
+            simple-dispatcher build-actions make-action-dispatch
             define-simple-actor
 
             <hive>
@@ -71,6 +71,8 @@
             send-message send-message-wait
             reply-message reply-message-wait
 
+            <- <-wait <-reply <-reply-wait
+
             ez-run-hive
             bootstrap-message
 
       (set! counter (1+ counter))
       (string-append prefix (number->string counter)))))
 
-(define (require-slot slot-name)
-  "Generate something for #:init-thunk to complain about unfilled slot"
-  (lambda ()
-    (throw 'required-slot
-           (format #f "Slot ~s not filled" slot-name)
-           slot-name)))
-
 
 \f
 ;;; Messages
@@ -202,7 +197,7 @@ If key not found and DFLT not provided, throw an error."
          (message (make-message (hive-gen-message-id hive) to-id
                                 (actor-id from-actor) action
                                 (kwarg-list-to-alist message-body-args))))
-    (8sync-nowait (hive-process-message hive message))))
+    (8sync (hive-process-message hive message))))
 
 (define (send-message-wait from-actor to-id action . message-body-args)
   "Send a message from an actor to another, but wait until we get a response"
@@ -229,7 +224,7 @@ If key not found and DFLT not provided, throw an error."
                                     (actor-id from-actor) '*reply*
                                     (kwarg-list-to-alist message-body-args)
                                     #:in-reply-to (message-id original-message))))
-    (8sync-nowait (hive-process-message hive new-message))))
+    (8sync (hive-process-message hive new-message))))
 
 (define (reply-message-wait from-actor original-message
                             . message-body-args)
@@ -246,23 +241,30 @@ If key not found and DFLT not provided, throw an error."
     (abort-to-prompt abort-to from-actor new-message)))
 
 
+;;; Aliases!
+;;; See: http://mumble.net/~jar/articles/oo-moon-weinreb.html
+;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
+
+(define <- send-message)
+(define <-wait send-message-wait)
+(define <-reply reply-message)
+(define <-reply-wait reply-message-wait)
+
+
 \f
 ;;; Main actor implementation
 ;;; =========================
 
 (define-class <actor> ()
   ;; An address object
-  (id #:init-thunk (require-slot "id")
-      #:init-keyword #:id
+  (id #:init-keyword #:id
       #:getter actor-id)
   ;; The hive we're connected to.
   ;; We need this to be able to send messages.
-  (hive #:init-thunk (require-slot "hive")
-        #:init-keyword #:hive
+  (hive #:init-keyword #:hive
         #:accessor actor-hive)
   ;; How we receive and process new messages
-  (message-handler #:init-thunk (require-slot "message-handler")
-                   #:allocation #:each-subclass))
+  (message-handler #:allocation #:each-subclass))
 
 (define-method (actor-message-handler (actor <actor>))
   (slot-ref actor 'message-handler))
@@ -314,11 +316,50 @@ If key not found and DFLT not provided, throw an error."
 ;;; Actor utilities
 ;;; ===============
 
+
+(define-syntax-rule (with-message-args (message message-arg ...)
+                                       body body* ...)
+  (let ((message-arg (message-ref message (quote message-arg))) ...)
+    body body* ...))
+
+(define-syntax mlambda
+  (lambda (x)
+    "A lambda for building message handlers.
+
+Use it like:
+  (mlambda (actor message foo)
+    ...)
+
+Which is like doing manually:
+  (lambda (actor message)
+    (let ((foo (message-ref message foo)))
+      ...))"
+    (syntax-case x ()
+      ((_ (actor message message-arg ...)
+          docstring
+          body ...)
+       (string? (syntax->datum #'docstring))
+       #'(lambda (actor message)
+           docstring
+           (with-message-args (message message-arg ...) body ...)))
+      ((_ (actor message message-arg ...)
+          body body* ...)
+       #'(lambda (actor message)
+           (with-message-args (message message-arg ...) body body* ...))))))
+
+(define-syntax-rule (define-mhandler (name actor message message-arg ...)
+                      body ...)
+  (define name
+    (mlambda (actor message message-arg ...)
+             body ...)))
+
 (define (simple-dispatcher action-map)
   (lambda (actor message)
     (let* ((action (message-action message))
            (method (assoc-ref action-map action)))
       (if (not method)
+          ;; @@: There's every possibility this should be handled in
+          ;;  hive-process-message instead.
           (throw 'action-not-found
                  "No appropriate action handler found for actor"
                  #:action action
@@ -331,11 +372,20 @@ If key not found and DFLT not provided, throw an error."
   (syntax-rules ()
     ((_ ((action-name action-args ...) body ...))
      (cons (quote action-name)
-           (lambda (action-args ...)
+           (mlambda (action-args ...)
              body ...)))
     ((_ (action-name handler))
      (cons (quote action-name) handler))))
 
+(define-syntax-rule (build-actions action-item ...)
+  "Build a mapping of actions.  Same syntax as make-action-dispatch
+but this doesn't build the dispatcher for you (you probably want to
+pass it to simple-dispatcher).
+
+The advantage here is that since this simply builds an alist, you can
+compose it with other action maps."
+  (list (%expand-action-item action-item) ...))
+
 (define-syntax make-action-dispatch
   (syntax-rules ()
     "Expand a list of action names and actions into an alist
@@ -357,8 +407,7 @@ more compact following syntax:
    ((party actor message)
      (display \"Life of the party!\")))"
     ((make-action-dispatch action-item ...)
-     (simple-dispatcher
-      (list (%expand-action-item action-item) ...)))))
+     (simple-dispatcher (build-actions action-item ...)))))
 
 (define-syntax-rule (define-simple-actor class actions ...)
   (define-class class (<actor>)
@@ -375,8 +424,6 @@ more compact following syntax:
 (define-generic hive-handle-failed-forward)
 
 (define-class <hive> (<actor>)
-  ;; This gets set to itself immediately after being created
-  (hive #:init-value #f)
   (actor-registry #:init-thunk make-hash-table
                   #:getter hive-actor-registry)
   (msg-id-generator #:init-thunk simple-message-id-generator
@@ -447,6 +494,27 @@ more compact following syntax:
                 '*forward*
                 `((original . ,message))))
 
+(define-method (hive-reply-with-error (hive <hive>) original-message
+                                      error-key error-args)
+  ;; We only supply the error-args if the original sender is on the same hive
+  (define (orig-actor-on-same-hive?)
+    (equal? (hive-id hive)
+            (address-hive-id (message-from original-message))))
+  (set-message-replied! original-message #t)
+  (let* ((new-message-body
+          (if (orig-actor-on-same-hive?)
+              `((original-message . ,original-message)
+                (error-key . ,error-key)
+                (error-args . ,error-args))
+              `((original-message . ,original-message)
+                (error-key . ,error-key))))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id hive) '*error*
+                                    new-message-body
+                                    #:in-reply-to (message-id original-message))))
+    (8sync (hive-process-message hive new-message))))
+
 (define-method (hive-process-message (hive <hive>) message)
   "Handle one message, or forward it via an ambassador"
   (define (maybe-autoreply actor)
@@ -469,16 +537,34 @@ more compact following syntax:
       actor))
 
   (define (call-catching-coroutine thunk)
+    (define (call-catching-errors)
+      ;; TODO: maybe parameterize (or attach to hive) and use
+      ;;   maybe-catch-all from agenda.scm
+      ;; @@: Why not just use with-throw-handler and let the catch
+      ;;   happen at the agenda?  That's what we used to do, but
+      ;;   it ended up with a SIGABRT.  See:
+      ;;     http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html
+      (catch #t
+        thunk
+        ;; In the actor model, we don't totally crash on errors.
+        (lambda _ #f)
+        ;; If an error happens, we raise it
+        (lambda (key . args)
+          (if (message-needs-reply message)
+              ;; If the message is waiting on a reply, let them know
+              ;; something went wrong.
+              (hive-reply-with-error hive message key args))
+          ;; print error message
+          (apply print-error-and-continue key args))))
     (call-with-prompt (hive-prompt hive)
-      thunk
+      call-catching-errors
       (lambda (kont actor message)
-        (let ((hive (actor-hive actor)))
-          ;; Register the coroutine
-          (hash-set! (hive-waiting-coroutines hive)
-                     (message-id message)
-                     (cons (actor-id actor) kont))
-          ;; Send off the message
-          (8sync (hive-process-message hive message))))))
+        ;; Register the coroutine
+        (hash-set! (hive-waiting-coroutines hive)
+                   (message-id message)
+                   (cons (actor-id actor) kont))
+        ;; Send off the message
+        (8sync (hive-process-message hive message)))))
 
   (define (process-local-message)
     (let ((actor (resolve-actor-to)))
@@ -495,27 +581,44 @@ more compact following syntax:
            result)))))
 
   (define (resume-waiting-coroutine)
-    (call-catching-coroutine
-     (lambda ()
-       (match (hash-remove! (hive-waiting-coroutines hive)
-                            (message-in-reply-to message))
-         ((_ . (resume-actor-id . kont))
-          (if (not (equal? (message-to message)
-                           resume-actor-id))
-              (throw 'resuming-to-wrong-actor
-                     "Attempted to resume a coroutine to the wrong actor!"
-                     #:expected-actor-id (message-to message)
-                     #:got-actor-id resume-actor-id
-                     #:message message))
-          (let (;; @@: How should we resolve resuming coroutines to actors who are
-                ;;   now gone?
-                (actor (resolve-actor-to))
-                (result (kont message)))
-            (maybe-autoreply actor)
-            result))
-         (#f (throw 'no-waiting-coroutine
-                    "message in-reply-to tries to resume nonexistent coroutine"
-                    message))))))
+    (cond
+     ((eq? (message-action message) '*reply*)
+      (call-catching-coroutine
+       (lambda ()
+         (match (hash-remove! (hive-waiting-coroutines hive)
+                              (message-in-reply-to message))
+           ((_ . (resume-actor-id . kont))
+            (if (not (equal? (message-to message)
+                             resume-actor-id))
+                (throw 'resuming-to-wrong-actor
+                       "Attempted to resume a coroutine to the wrong actor!"
+                       #:expected-actor-id (message-to message)
+                       #:got-actor-id resume-actor-id
+                       #:message message))
+            (let (;; @@: How should we resolve resuming coroutines to actors who are
+                  ;;   now gone?
+                  (actor (resolve-actor-to))
+                  (result (kont message)))
+              (maybe-autoreply actor)
+              result))
+           (#f (throw 'no-waiting-coroutine
+                      "message in-reply-to tries to resume nonexistent coroutine"
+                      message))))))
+     ;; Yikes, we must have gotten an error or something back
+     (else
+      ;; @@: Not what we want in the long run?
+      ;; What we'd *prefer* to do is to resume this message
+      ;; and throw an error inside the message handler
+      ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
+      (hash-remove! (hive-waiting-coroutines hive)
+                    (message-in-reply-to message))
+      (let ((explaination
+             (if (eq? (message-action message) '*reply*)
+                 "Won't resume coroutine; got an *error* as a reply"
+                 "Won't resume coroutine because action is not *reply*")))
+        (throw 'hive-unresumable-coroutine
+               explaination
+               #:message message)))))
 
   (define (process-remote-message)
     ;; Find the ambassador
@@ -539,7 +642,7 @@ more compact following syntax:
         (process-remote-message))))
 
 (define-method (hive-actor-local? (hive <hive>) address)
-  (hash-ref (hive-actor-registry hive) address))
+  (equal? (hive-id hive) (address-hive-id address)))
 
 (define-method (hive-register-actor! (hive <hive>) (actor <actor>))
   (hash-set! (hive-actor-registry hive) (actor-id actor) actor))