actors: Move message body to being an argument list for procedures.
[8sync.git] / 8sync / systems / actors.scm
index 119f8a40a1256af45eb52221322c961f0229bed9..ce46f63614fb7640eb3db3ee90450cffac999d92 100644 (file)
             big-random-number
             big-random-number-string
             simple-message-id-generator
-            require-slot
 
             <actor>
             actor-id
-            actor-hive
             actor-message-handler
 
             ;;; Commenting out the <address> type for now;
@@ -49,7 +47,7 @@
             actor-id-hive
             actor-id-string
 
-            make-action-dispatch
+            simple-dispatcher build-actions make-action-dispatch
             define-simple-actor
 
             <hive>
             message-to message-action message-from
             message-id message-body message-in-reply-to
             message-wants-reply
-            message-ref
 
-            send-message send-message-wait
-            reply-message reply-message-wait
+            message-auto-reply?
+
+            <- <-wait <-reply <-reply-wait
+
+            call-with-message msg-receive =>
 
             ez-run-hive
-            hive-bootstrap-message
+            bootstrap-message
 
             serialize-message write-message
             serialize-message-pretty pprint-message
       (set! counter (1+ counter))
       (string-append prefix (number->string counter)))))
 
-(define (require-slot slot-name)
-  "Generate something for #:init-thunk to complain about unfilled slot"
-  (lambda ()
-    (throw 'required-slot
-           (format #f "Slot ~s not filled" slot-name)
-           slot-name)))
-
 
 \f
 ;;; Messages
 
 (define-record-type <message>
   (make-message-intern id to from action
-                       body in-reply-to wants-reply   ; do we need hive-proxy?
-                       ;; Are these still needed?
-                       replied deferred-reply)
+                       body in-reply-to wants-reply
+                       replied)
   message?
   (id message-id)
   (to message-to)
   (body message-body)
   (in-reply-to message-in-reply-to)
   (wants-reply message-wants-reply)
-
-  ;; See XUDD source for these.  Not use yet, maybe eventually will be?
-  ;; XUDD uses them for autoreply.
-  ;; Requiring mutation on message objects is clearly not great,
-  ;; but it may be worth it...?  Investigate!
-  (replied message-replied set-message-replied!)
-  (deferred-reply message-deferred-reply set-message-deferred-reply!))
+  (replied message-replied set-message-replied!))
 
 
 (define* (make-message id to from action body
                        #:key in-reply-to wants-reply
-                       replied deferred-reply)
+                       replied)
   (make-message-intern id to from action body
-                       in-reply-to wants-reply replied
-                       deferred-reply))
-
-;; Note: the body of messages is currently an alist, but it's created
-;;   from a keyword based property list (see the following two functions).
-;;   But, that's an extra conversion step, and maybe totally unnecessary:
-;;   we already have message-ref, and this could just pull a keyword
-;;   from a property list.
-;;   The main ways this might be useful are error checking,
-;;   serialization across the wire (but even that might require some
-;;   change), and using existing tooling (though adding new tooling
-;;   would be negligible in implementation effort.)
-
-;; This cons cell is immutable and unique (for eq? tests)
-(define %nothing-provided (cons 'nothing 'provided))
-
-(define* (message-ref message key #:optional (dflt %nothing-provided))
-  "Extract KEY from body of MESSAGE.
-
-Optionally set default with [DFLT]
-If key not found and DFLT not provided, throw an error."
-  (let ((result (assoc key (message-body message))))
-    (if result (cdr result)
-        (if (eq? dflt %nothing-provided)
-            (throw 'message-missing-key
-                   "Message body does not contain key and no default provided"
-                   #:key key
-                   #:message message)
-            dflt))))
-
-
-(define (message-needs-reply message)
+                       in-reply-to wants-reply replied))
+
+(define (message-auto-reply? message)
+  (eq? (message-action message) '*auto-reply*))
+
+(define (message-needs-reply? message)
   "See if this message needs a reply still"
   (and (message-wants-reply message)
-       (not (or (message-replied message)
-                (message-deferred-reply message)))))
+       (not (message-replied message))))
 
 
 (define (kwarg-list-to-alist args)
@@ -195,21 +154,24 @@ If key not found and DFLT not provided, throw an error."
                 args)))))
 
 
-(define (send-message from-actor to-id action . message-body-args)
+;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
+;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
+
+(define (<- from-actor to-id action . message-body-args)
   "Send a message from an actor to another actor"
   (let* ((hive (actor-hive from-actor))
          (message (make-message (hive-gen-message-id hive) to-id
                                 (actor-id from-actor) action
-                                (kwarg-list-to-alist message-body-args))))
-    (8sync-nowait (hive-process-message hive message))))
+                                message-body-args)))
+    (8sync (hive-process-message hive message))))
 
-(define (send-message-wait from-actor to-id action . message-body-args)
+(define (<-wait from-actor to-id action . message-body-args)
   "Send a message from an actor to another, but wait until we get a response"
   (let* ((hive (actor-hive from-actor))
          (abort-to (hive-prompt (actor-hive from-actor)))
          (message (make-message (hive-gen-message-id hive) to-id
                                 (actor-id from-actor) action
-                                (kwarg-list-to-alist message-body-args)
+                                message-body-args
                                 #:wants-reply #t)))
     (abort-to-prompt abort-to from-actor message)))
 
@@ -218,20 +180,29 @@ If key not found and DFLT not provided, throw an error."
 ;;   not have an exception thrown and instead just have a message with
 ;;   the appropriate '*error* message returned.
 
-(define (reply-message from-actor original-message
-                       . message-body-args)
+(define (<-reply from-actor original-message . message-body-args)
   "Reply to a message"
   (set-message-replied! original-message #t)
   (let* ((hive (actor-hive from-actor))
          (new-message (make-message (hive-gen-message-id hive)
                                     (message-from original-message)
                                     (actor-id from-actor) '*reply*
-                                    (kwarg-list-to-alist message-body-args)
+                                    message-body-args
                                     #:in-reply-to (message-id original-message))))
-    (8sync-nowait (hive-process-message hive new-message))))
+    (8sync (hive-process-message hive new-message))))
 
-(define (reply-message-wait from-actor original-message
-                            . message-body-args)
+(define (<-auto-reply from-actor original-message)
+  "Auto-reply to a message.  Internal use only!"
+  (set-message-replied! original-message #t)
+  (let* ((hive (actor-hive from-actor))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id from-actor) '*auto-reply*
+                                    '()
+                                    #:in-reply-to (message-id original-message))))
+    (8sync (hive-process-message hive new-message))))
+
+(define (<-reply-wait from-actor original-message . message-body-args)
   "Reply to a messsage, but wait until we get a response"
   (set-message-replied! original-message #t)
   (let* ((hive (actor-hive from-actor))
@@ -239,7 +210,7 @@ If key not found and DFLT not provided, throw an error."
          (new-message (make-message (hive-gen-message-id hive)
                                     (message-from original-message)
                                     (actor-id from-actor) '*reply*
-                                    (kwarg-list-to-alist message-body-args)
+                                    message-body-args
                                     #:wants-reply #t
                                     #:in-reply-to (message-id original-message))))
     (abort-to-prompt abort-to from-actor new-message)))
@@ -251,17 +222,14 @@ If key not found and DFLT not provided, throw an error."
 
 (define-class <actor> ()
   ;; An address object
-  (id #:init-thunk (require-slot "id")
-      #:init-keyword #:id
+  (id #:init-keyword #:id
       #:getter actor-id)
   ;; The hive we're connected to.
   ;; We need this to be able to send messages.
-  (hive #:init-thunk (require-slot "hive")
-        #:init-keyword #:hive
+  (hive #:init-keyword #:hive
         #:accessor actor-hive)
   ;; How we receive and process new messages
-  (message-handler #:init-thunk (require-slot "message-handler")
-                   #:allocation #:each-subclass))
+  (message-handler #:allocation #:each-subclass))
 
 (define-method (actor-message-handler (actor <actor>))
   (slot-ref actor 'message-handler))
@@ -318,23 +286,30 @@ If key not found and DFLT not provided, throw an error."
     (let* ((action (message-action message))
            (method (assoc-ref action-map action)))
       (if (not method)
+          ;; @@: There's every possibility this should be handled in
+          ;;  hive-process-message instead.
           (throw 'action-not-found
                  "No appropriate action handler found for actor"
                  #:action action
                  #:actor actor
                  #:message message
                  #:available-actions (map car action-map)))
-      (method actor message))))
+      (apply method actor message (message-body message)))))
 
 (define-syntax %expand-action-item
   (syntax-rules ()
-    ((_ ((action-name action-args ...) body ...))
-     (cons (quote action-name)
-           (lambda (action-args ...)
-             body ...)))
     ((_ (action-name handler))
      (cons (quote action-name) handler))))
 
+(define-syntax-rule (build-actions action-item ...)
+  "Build a mapping of actions.  Same syntax as make-action-dispatch
+but this doesn't build the dispatcher for you (you probably want to
+pass it to simple-dispatcher).
+
+The advantage here is that since this simply builds an alist, you can
+compose it with other action maps."
+  (list (%expand-action-item action-item) ...))
+
 (define-syntax make-action-dispatch
   (syntax-rules ()
     "Expand a list of action names and actions into an alist
@@ -356,8 +331,7 @@ more compact following syntax:
    ((party actor message)
      (display \"Life of the party!\")))"
     ((make-action-dispatch action-item ...)
-     (simple-dispatcher
-      (list (%expand-action-item action-item) ...)))))
+     (simple-dispatcher (build-actions action-item ...)))))
 
 (define-syntax-rule (define-simple-actor class actions ...)
   (define-class class (<actor>)
@@ -374,8 +348,6 @@ more compact following syntax:
 (define-generic hive-handle-failed-forward)
 
 (define-class <hive> (<actor>)
-  ;; This gets set to itself immediately after being created
-  (hive #:init-value #f)
   (actor-registry #:init-thunk make-hash-table
                   #:getter hive-actor-registry)
   (msg-id-generator #:init-thunk simple-message-id-generator
@@ -446,14 +418,33 @@ more compact following syntax:
                 '*forward*
                 `((original . ,message))))
 
+(define-method (hive-reply-with-error (hive <hive>) original-message
+                                      error-key error-args)
+  ;; We only supply the error-args if the original sender is on the same hive
+  (define (orig-actor-on-same-hive?)
+    (equal? (hive-id hive)
+            (address-hive-id (message-from original-message))))
+  (set-message-replied! original-message #t)
+  (let* ((new-message-body
+          (if (orig-actor-on-same-hive?)
+              `(#:original-message ,original-message
+                #:error-key ,error-key
+                #:error-args ,error-args)
+              `(#:original-message ,original-message
+                #:error-key ,error-key)))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id hive) '*error*
+                                    new-message-body
+                                    #:in-reply-to (message-id original-message))))
+    (8sync (hive-process-message hive new-message))))
+
 (define-method (hive-process-message (hive <hive>) message)
   "Handle one message, or forward it via an ambassador"
   (define (maybe-autoreply actor)
     ;; Possibly autoreply
-    (if (message-needs-reply message)
-        ;; @@: Should we give *autoreply* as the action instead of *reply*?
-        (reply-message actor message
-                       #:*auto-reply* #t)))
+    (if (message-needs-reply? message)
+        (<-auto-reply actor message)))
 
   (define (resolve-actor-to)
     "Get the actor the message was aimed at"
@@ -468,16 +459,34 @@ more compact following syntax:
       actor))
 
   (define (call-catching-coroutine thunk)
+    (define (call-catching-errors)
+      ;; TODO: maybe parameterize (or attach to hive) and use
+      ;;   maybe-catch-all from agenda.scm
+      ;; @@: Why not just use with-throw-handler and let the catch
+      ;;   happen at the agenda?  That's what we used to do, but
+      ;;   it ended up with a SIGABRT.  See:
+      ;;     http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html
+      (catch #t
+        thunk
+        ;; In the actor model, we don't totally crash on errors.
+        (lambda _ #f)
+        ;; If an error happens, we raise it
+        (lambda (key . args)
+          (if (message-needs-reply? message)
+              ;; If the message is waiting on a reply, let them know
+              ;; something went wrong.
+              (hive-reply-with-error hive message key args))
+          ;; print error message
+          (apply print-error-and-continue key args))))
     (call-with-prompt (hive-prompt hive)
-      thunk
+      call-catching-errors
       (lambda (kont actor message)
-        (let ((hive (actor-hive actor)))
-          ;; Register the coroutine
-          (hash-set! (hive-waiting-coroutines hive)
-                     (message-id message)
-                     (cons (actor-id actor) kont))
-          ;; Send off the message
-          (8sync (hive-process-message hive message))))))
+        ;; Register the coroutine
+        (hash-set! (hive-waiting-coroutines hive)
+                   (message-id message)
+                   (cons (actor-id actor) kont))
+        ;; Send off the message
+        (8sync (hive-process-message hive message)))))
 
   (define (process-local-message)
     (let ((actor (resolve-actor-to)))
@@ -494,27 +503,45 @@ more compact following syntax:
            result)))))
 
   (define (resume-waiting-coroutine)
-    (call-catching-coroutine
-     (lambda ()
-       (match (hash-remove! (hive-waiting-coroutines hive)
-                            (message-in-reply-to message))
-         ((_ . (resume-actor-id . kont))
-          (if (not (equal? (message-to message)
-                           resume-actor-id))
-              (throw 'resuming-to-wrong-actor
-                     "Attempted to resume a coroutine to the wrong actor!"
-                     #:expected-actor-id (message-to message)
-                     #:got-actor-id resume-actor-id
-                     #:message message))
-          (let (;; @@: How should we resolve resuming coroutines to actors who are
-                ;;   now gone?
-                (actor (resolve-actor-to))
-                (result (kont message)))
-            (maybe-autoreply actor)
-            result))
-         (#f (throw 'no-waiting-coroutine
-                    "message in-reply-to tries to resume nonexistent coroutine"
-                    message))))))
+    (cond
+     ((or (eq? (message-action message) '*reply*)
+          (eq? (message-action message) '*auto-reply*))
+      (call-catching-coroutine
+       (lambda ()
+         (match (hash-remove! (hive-waiting-coroutines hive)
+                              (message-in-reply-to message))
+           ((_ . (resume-actor-id . kont))
+            (if (not (equal? (message-to message)
+                             resume-actor-id))
+                (throw 'resuming-to-wrong-actor
+                       "Attempted to resume a coroutine to the wrong actor!"
+                       #:expected-actor-id (message-to message)
+                       #:got-actor-id resume-actor-id
+                       #:message message))
+            (let (;; @@: How should we resolve resuming coroutines to actors who are
+                  ;;   now gone?
+                  (actor (resolve-actor-to))
+                  (result (kont message)))
+              (maybe-autoreply actor)
+              result))
+           (#f (throw 'no-waiting-coroutine
+                      "message in-reply-to tries to resume nonexistent coroutine"
+                      message))))))
+     ;; Yikes, we must have gotten an error or something back
+     (else
+      ;; @@: Not what we want in the long run?
+      ;; What we'd *prefer* to do is to resume this message
+      ;; and throw an error inside the message handler
+      ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
+      (hash-remove! (hive-waiting-coroutines hive)
+                    (message-in-reply-to message))
+      (let ((explaination
+             (if (eq? (message-action message) '*reply*)
+                 "Won't resume coroutine; got an *error* as a reply"
+                 "Won't resume coroutine because action is not *reply*")))
+        (throw 'hive-unresumable-coroutine
+               explaination
+               #:message message)))))
 
   (define (process-remote-message)
     ;; Find the ambassador
@@ -538,7 +565,7 @@ more compact following syntax:
         (process-remote-message))))
 
 (define-method (hive-actor-local? (hive <hive>) address)
-  (hash-ref (hive-actor-registry hive) address))
+  (equal? (hive-id hive) (address-hive-id address)))
 
 (define-method (hive-register-actor! (hive <hive>) (actor <actor>))
   (hash-set! (hive-actor-registry hive) (actor-id actor) actor))
@@ -552,7 +579,6 @@ so this gets called from the nicer hive-create-actor interface.  See
 that method for documentation."
   (let* ((actor-id (hive-gen-actor-id hive id-cookie))
          (actor (apply make actor-class
-                       ;; @@: If we switch to a hive-proxy, do it here
                        #:hive hive
                        #:id actor-id
                        init)))
@@ -568,17 +594,27 @@ that method for documentation."
   (%hive-create-actor hive actor-class
                       init id-cookie))
 
+(define (call-with-message message proc)
+  "Applies message body arguments into procedure, with message as first
+argument.  Similar to call-with-values in concept."
+  (apply proc message (message-body message)))
+
+;; (msg-receive (<- bar baz)
+;;     (baz)
+;;   basil)
 
-;; TODO: Give actors this instead of the actual hive reference
-(define-class <hive-proxy> ()
-  (send-message #:getter proxy-send-message
-                #:init-keyword #:send-message)
-  (create-actor #:getter proxy-create-actor
-                #:init-keyword #:create-actor))
+;; Emacs: (put 'msg-receive 'scheme-indent-function 2)
 
-;; Live the hive proxy, but has access to the hive itself...
-(define-class <debug-hive-proxy> (<hive-proxy>)
-  (hive #:init-keyword #:hive))
+;; @@: Or receive-msg or receieve-message or??
+(define-syntax-rule (msg-receive arglist the-message body ...)
+  (call-with-message the-message
+                     (lambda* arglist
+                       body ...)))
+
+;; Emacs: (put '=> 'scheme-indent-function 2)
+;;; An experimental alias.
+(define-syntax-rule (=> rest ...)
+  (msg-receive rest ...))
 
 
 \f
@@ -587,23 +623,22 @@ that method for documentation."
 
 ;; TODO: move send-message and friends here...?
 
-;; TODO: Rewrite this inside of a <hive-proxy> ?
 (define* (create-actor from-actor actor-class #:rest init)
   "Create an instance of actor-class.  Return the new actor's id.
 
 This is the method actors should call directly (unless they want
 to supply an id-cookie, in which case they should use
 create-actor*)."
-  (8sync (%hive-create-actor (actor-hive from-actor) actor-class
-                             init #f)))
+  (%hive-create-actor (actor-hive from-actor) actor-class
+                      init #f))
 
 
 (define* (create-actor* from-actor actor-class id-cookie #:rest init)
   "Create an instance of actor-class.  Return the new actor's id.
 
 Like create-actor, but permits supplying an id-cookie."
-  (8sync (%hive-create-actor (actor-hive from-actor) actor-class
-                             init id-cookie)))
+  (%hive-create-actor (actor-hive from-actor) actor-class
+                      init id-cookie))
 
 
 (define (self-destruct actor)
@@ -633,9 +668,9 @@ an integer."
       (spawn-and-queue-repl-server! agenda)))
     (start-agenda agenda)))
 
-(define (hive-bootstrap-message hive to-id action . message-body-args)
+(define (bootstrap-message hive to-id action . message-body-args)
   (wrap
-   (apply send-message hive to-id action message-body-args)))
+   (apply <- hive to-id action message-body-args)))
 
 
 \f
@@ -652,8 +687,7 @@ an integer."
    (message-body message)
    (message-in-reply-to message)
    (message-wants-reply message)
-   (message-replied message)
-   (message-deferred-reply message)))
+   (message-replied message)))
 
 (define* (write-message message #:optional (port (current-output-port)))
   "Write out a message to a port for easy reading later.
@@ -673,8 +707,7 @@ to improve that.  You'll need a better serializer for that.."
     (body ,(message-body message))
     (in-reply-to ,(message-in-reply-to message))
     (wants-reply ,(message-wants-reply message))
-    (replied ,(message-replied message))
-    (deferred-reply ,(message-deferred-reply message))))
+    (replied ,(message-replied message))))
 
 (define (pprint-message message)
   "Pretty print a message."
@@ -683,10 +716,10 @@ to improve that.  You'll need a better serializer for that.."
 (define* (read-message #:optional (port (current-input-port)))
   "Read a message serialized via serialize-message from PORT"
   (match (read port)
-    ((id to from action body in-reply-to wants-reply replied deferred-reply)
+    ((id to from action body in-reply-to wants-reply replied)
      (make-message-intern
       id to from action body
-      in-reply-to wants-reply replied deferred-reply))
+      in-reply-to wants-reply replied))
     (anything-else
      (throw 'message-read-bad-structure
             "Could not read message from structure"