actors: Properly autoreply to replies; make reply-message-wait work.
[8sync.git] / 8sync / systems / actors.scm
index 0ae6e054c0c0eec3ea57f34f9d131f13efb0b023..8f00d570d9f5880d901a6063334e59e047d3f4d0 100644 (file)
             reply-message reply-message-wait
 
             ez-run-hive
-            hive-bootstrap-message))
+            hive-bootstrap-message
+
+            serialize-message write-message
+            serialize-message-pretty pprint-message
+            read-message read-message-from-string))
 
 ;; For ids
 (define %random-state
            slot-name)))
 
 
+\f
+;;; Messages
+;;; ========
+
+
+(define-record-type <message>
+  (make-message-intern id to from action
+                       body in-reply-to wants-reply   ; do we need hive-proxy?
+                       ;; Are these still needed?
+                       replied deferred-reply)
+  message?
+  (id message-id)
+  (to message-to)
+  (from message-from)
+  (action message-action)
+  (body message-body)
+  (in-reply-to message-in-reply-to)
+  (wants-reply message-wants-reply)
+
+  ;; See XUDD source for these.  Not use yet, maybe eventually will be?
+  ;; XUDD uses them for autoreply.
+  ;; Requiring mutation on message objects is clearly not great,
+  ;; but it may be worth it...?  Investigate!
+  (replied message-replied set-message-replied!)
+  (deferred-reply message-deferred-reply set-message-deferred-reply!))
+
+
+(define* (make-message id to from action body
+                       #:key in-reply-to wants-reply
+                       replied deferred-reply)
+  (make-message-intern id to from action body
+                       in-reply-to wants-reply replied
+                       deferred-reply))
+
+;; Note: the body of messages is currently an alist, but it's created
+;;   from a keyword based property list (see the following two functions).
+;;   But, that's an extra conversion step, and maybe totally unnecessary:
+;;   we already have message-ref, and this could just pull a keyword
+;;   from a property list.
+;;   The main ways this might be useful are error checking,
+;;   serialization across the wire (but even that might require some
+;;   change), and using existing tooling (though adding new tooling
+;;   would be negligible in implementation effort.)
+
+;; This cons cell is immutable and unique (for eq? tests)
+(define %nothing-provided (cons 'nothing 'provided))
+
+(define* (message-ref message key #:optional (dflt %nothing-provided))
+  "Extract KEY from body of MESSAGE.
+
+Optionally set default with [DFLT]
+If key not found and DFLT not provided, throw an error."
+  (let ((result (assoc key (message-body message))))
+    (if result (cdr result)
+        (if (eq? dflt %nothing-provided)
+            (throw 'message-missing-key
+                   "Message body does not contain key and no default provided"
+                   #:key key
+                   #:message message)
+            dflt))))
+
+
+(define (message-needs-reply message)
+  "See if this message needs a reply still"
+  (and (message-wants-reply message)
+       (not (or (message-replied message)
+                (message-deferred-reply message)))))
+
+
+(define (kwarg-list-to-alist args)
+  (let loop ((remaining args)
+             (result '()))
+    (match remaining
+      (((? keyword? key) val rest ...)
+       (loop rest
+             (cons (cons (keyword->symbol key) val) 
+                   result)))
+      (() result)
+      (_ (throw 'invalid-kwarg-list
+                "Invalid keyword argument list"
+                args)))))
+
+
+(define (send-message from-actor to-id action . message-body-args)
+  "Send a message from an actor to another actor"
+  (let* ((hive (actor-hive from-actor))
+         (message (make-message (hive-gen-message-id hive) to-id
+                                (actor-id from-actor) action
+                                (kwarg-list-to-alist message-body-args))))
+    (8sync (hive-process-message hive message))))
+
+(define (send-message-wait from-actor to-id action . message-body-args)
+  "Send a message from an actor to another, but wait until we get a response"
+  (let* ((hive (actor-hive from-actor))
+         (abort-to (hive-prompt (actor-hive from-actor)))
+         (message (make-message (hive-gen-message-id hive) to-id
+                                (actor-id from-actor) action
+                                (kwarg-list-to-alist message-body-args)
+                                #:wants-reply #t)))
+    (abort-to-prompt abort-to from-actor message)))
+
+;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
+;;   We might have `send-message-wait-brazen' to allow callers to
+;;   not have an exception thrown and instead just have a message with
+;;   the appropriate '*error* message returned.
+
+(define (reply-message from-actor original-message
+                       . message-body-args)
+  "Reply to a message"
+  (set-message-replied! original-message #t)
+  (let* ((hive (actor-hive from-actor))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id from-actor) '*reply*
+                                    (kwarg-list-to-alist message-body-args)
+                                    #:in-reply-to (message-id original-message))))
+    (8sync (hive-process-message hive new-message))))
+
+(define (reply-message-wait from-actor original-message
+                            . message-body-args)
+  "Reply to a messsage, but wait until we get a response"
+  (set-message-replied! original-message #t)
+  (let* ((hive (actor-hive from-actor))
+         (abort-to (hive-prompt (actor-hive from-actor)))
+         (new-message (make-message (hive-gen-message-id hive)
+                                    (message-from original-message)
+                                    (actor-id from-actor) '*reply*
+                                    (kwarg-list-to-alist message-body-args)
+                                    #:wants-reply #t
+                                    #:in-reply-to (message-id original-message))))
+    (abort-to-prompt abort-to from-actor new-message)))
+
+
 \f
 ;;; Main actor implementation
 ;;; =========================
@@ -219,7 +356,7 @@ more compact following syntax:
      (simple-dispatcher
       (list (%expand-action-item action-item) ...)))))
 
-(define-syntax-rule (define-simple-actor class (actions ...))
+(define-syntax-rule (define-simple-actor class actions ...)
   (define-class class (<actor>)
     (message-handler
      #:init-value (make-action-dispatch actions ...)
@@ -248,6 +385,7 @@ more compact following syntax:
   ;; This is a map from cons cell of message-id
   ;;   to a cons cell of (actor-id . coroutine)
   ;; @@: Should we have a <waiting-coroutine> record type?
+  ;; @@: Should there be any way to clear out "old" coroutines?
   (waiting-coroutines #:init-thunk make-hash-table
                       #:getter hive-waiting-coroutines)
   ;; Message prompt
@@ -307,7 +445,15 @@ more compact following syntax:
 
 (define-method (hive-process-message (hive <hive>) message)
   "Handle one message, or forward it via an ambassador"
-  (define (process-local-message)
+  (define (maybe-autoreply actor)
+    ;; Possibly autoreply
+    (if (message-needs-reply message)
+        ;; @@: Should we give *autoreply* as the action instead of *reply*?
+        (reply-message actor message
+                       #:*auto-reply* #t)))
+
+  (define (resolve-actor-to)
+    "Get the actor the message was aimed at"
     (let ((actor (hive-resolve-local-actor hive (message-to message))))
       (if (not actor)
           (throw 'actor-not-found
@@ -316,29 +462,56 @@ more compact following syntax:
                          (address->string (message-from message))
                          (address->string (message-to message)))
                  message))
-      (call-with-prompt (hive-prompt hive)
-        (lambda ()
-          (define message-handler (actor-message-handler actor))
-          ;; @@: Should a more general error handling happen here?
-          (message-handler actor message))
-
-        (lambda (kont actor message)
-          (let ((hive (actor-hive actor)))
-            ;; Register the coroutine
-            (hash-set! (hive-waiting-coroutines hive)
-                       (message-id message)
-                       (cons (actor-id actor) kont))
-            ;; Send off the message
-            (8sync (hive-process-message hive message)))))))
+      actor))
+
+  (define (call-catching-coroutine thunk)
+    (call-with-prompt (hive-prompt hive)
+      thunk
+      (lambda (kont actor message)
+        (let ((hive (actor-hive actor)))
+          ;; Register the coroutine
+          (hash-set! (hive-waiting-coroutines hive)
+                     (message-id message)
+                     (cons (actor-id actor) kont))
+          ;; Send off the message
+          (8sync (hive-process-message hive message))))))
+
+  (define (process-local-message)
+    (let ((actor (resolve-actor-to)))
+      (call-catching-coroutine
+       (lambda ()
+         (define message-handler (actor-message-handler actor))
+         ;; @@: Should a more general error handling happen here?
+         (let ((result
+                (message-handler actor message)))
+           (maybe-autoreply actor)
+           ;; Returning result allows actors to possibly make a run-request
+           ;; at the end of handling a message.
+           ;; ... We do want that, right?
+           result)))))
 
   (define (resume-waiting-coroutine)
-    (match (hash-remove! (hive-waiting-coroutines hive)
-                         (message-in-reply-to message))
-      ((_ . kont)
-       (kont message))
-      (#f (throw 'no-waiting-coroutine
-                 "message in-reply-to tries to resume nonexistent coroutine"
-                 message))))
+    (call-catching-coroutine
+     (lambda ()
+       (match (hash-remove! (hive-waiting-coroutines hive)
+                            (message-in-reply-to message))
+         ((_ . (resume-actor-id . kont))
+          (if (not (equal? (message-to message)
+                           resume-actor-id))
+              (throw 'resuming-to-wrong-actor
+                     "Attempted to resume a coroutine to the wrong actor!"
+                     #:expected-actor-id (message-to message)
+                     #:got-actor-id resume-actor-id
+                     #:message message))
+          (let (;; @@: How should we resolve resuming coroutines to actors who are
+                ;;   now gone?
+                (actor (resolve-actor-to))
+                (result (kont message)))
+            (maybe-autoreply actor)
+            result))
+         (#f (throw 'no-waiting-coroutine
+                    "message in-reply-to tries to resume nonexistent coroutine"
+                    message))))))
 
   (define (process-remote-message)
     ;; Find the ambassador
@@ -417,123 +590,6 @@ Instead, actors should call create-actor."
   (hive #:init-keyword #:hive))
 
 
-\f
-;;; Messages
-;;; ========
-
-
-(define-record-type <message>
-  (make-message-intern id to from action
-                       body in-reply-to wants-reply   ; do we need hive-proxy?
-                       ;; Are these still needed?
-                       replied deferred-reply)
-  message?
-  (id message-id)
-  (to message-to)
-  (from message-from)
-  (action message-action)
-  (body message-body)
-  (in-reply-to message-in-reply-to)
-  (wants-reply message-wants-reply)
-
-  ;; See XUDD source for these.  Not use yet, maybe eventually will be?
-  ;; XUDD uses them for autoreply.
-  ;; Requiring mutation on message objects is clearly not great,
-  ;; but it may be worth it...?  Investigate!
-  (replied message-replied set-message-replied!)
-  (deferred-reply message-deferred-reply set-message-deferred-reply!))
-
-
-(define* (make-message id to from action body
-                       #:key in-reply-to wants-reply
-                       replied deferred-reply)
-  (make-message-intern id to from action body
-                       in-reply-to wants-reply replied
-                       deferred-reply))
-
-;; Note: the body of messages is currently an alist, but it's created
-;;   from a keyword based property list (see the following two functions).
-;;   But, that's an extra conversion step, and maybe totally unnecessary:
-;;   we already have message-ref, and this could just pull a keyword
-;;   from a property list.
-;;   The main ways this might be useful are error checking,
-;;   serialization across the wire (but even that might require some
-;;   change), and using existing tooling (though adding new tooling
-;;   would be negligible in implementation effort.)
-
-(define* (message-ref message key #:optional dflt)
-  "Extract KEY from body of MESSAGE.
-
-Optionally set default with [DFLT]"
-  (let ((result (assoc key (message-body message))))
-    (if result (cdr result)
-        dflt)))
-
-
-(define (kwarg-list-to-alist args)
-  (let loop ((remaining args)
-             (result '()))
-    (match remaining
-      (((? keyword? key) val rest ...)
-       (loop rest
-             (cons (cons (keyword->symbol key) val) 
-                   result)))
-      (() result)
-      (_ (throw 'invalid-kwarg-list
-                "Invalid keyword argument list"
-                args)))))
-
-
-(define (send-message from-actor to-id action . message-body-args)
-  "Send a message from an actor to another actor"
-  (let* ((hive (actor-hive from-actor))
-         (message (make-message (hive-gen-message-id hive) to-id
-                                (actor-id from-actor) action
-                                (kwarg-list-to-alist message-body-args))))
-    (8sync (hive-process-message hive message))))
-
-(define (send-message-wait from-actor to-id action . message-body-args)
-  "Send a message from an actor to another, but wait until we get a response"
-  (let* ((hive (actor-hive from-actor))
-         (agenda-prompt (hive-prompt (actor-hive from-actor)))
-         (message (make-message (hive-gen-message-id hive) to-id
-                                (actor-id from-actor) action
-                                (kwarg-list-to-alist message-body-args)
-                                #:wants-reply #t)))
-    (abort-to-prompt agenda-prompt from-actor message)))
-
-;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
-;;   We might have `send-message-wait-brazen' to allow callers to
-;;   not have an exception thrown and instead just have a message with
-;;   the appropriate '*error* message returned.
-
-(define (reply-message from-actor original-message
-                       . message-body-args)
-  "Reply to a message"
-  (set-message-replied! original-message #t)
-  (let* ((hive (actor-hive from-actor))
-         (new-message (make-message (hive-gen-message-id hive)
-                                    (message-from original-message)
-                                    (actor-id from-actor) '*reply*
-                                    (kwarg-list-to-alist message-body-args)
-                                    #:in-reply-to (message-id original-message))))
-    (8sync (hive-process-message hive new-message))))
-
-(define (reply-message-wait from-actor original-message
-                            . message-body-args)
-  "Reply to a messsage, but wait until we get a response"
-  (set-message-replied! original-message #t)
-  (let* ((hive (actor-hive from-actor))
-         (agenda-prompt (hive-prompt (actor-hive from-actor)))
-         (new-message (make-message (hive-gen-message-id hive)
-                                    (message-from original-message)
-                                    (actor-id from-actor) '*reply*
-                                    (kwarg-list-to-alist message-body-args)
-                                    #:wants-reply #t
-                                    #:in-reply-to (message-id original-message))))
-    (abort-to-prompt agenda-prompt from-actor new-message)))
-
-
 \f
 ;;; 8sync bootstrap utilities
 ;;; =========================
@@ -561,22 +617,23 @@ an integer."
 
 
 \f
-;;; Convenience procedures
-;;; ======================
+;;; Basic readers / writers
+;;; =======================
 
 (define (serialize-message message)
   "Serialize a message for read/write"
   (list
    (message-id message)
-   (address->string (message-to message))
-   (address->string (message-from message))
+   (message-to message)
+   (message-from message)
    (message-action message)
    (message-body message)
    (message-in-reply-to message)
+   (message-wants-reply message)
    (message-replied message)
    (message-deferred-reply message)))
 
-(define (write-message message port)
+(define* (write-message message #:optional (port (current-output-port)))
   "Write out a message to a port for easy reading later.
 
 Note that if a sub-value can't be easily written to something
@@ -593,9 +650,28 @@ to improve that.  You'll need a better serializer for that.."
     (action ,(message-action message))
     (body ,(message-body message))
     (in-reply-to ,(message-in-reply-to message))
+    (wants-reply ,(message-wants-reply message))
     (replied ,(message-replied message))
     (deferred-reply ,(message-deferred-reply message))))
 
 (define (pprint-message message)
   "Pretty print a message."
   (pretty-print (serialize-message-pretty message)))
+
+(define* (read-message #:optional (port (current-input-port)))
+  "Read a message serialized via serialize-message from PORT"
+  (match (read port)
+    ((id to from action body in-reply-to wants-reply replied deferred-reply)
+     (make-message-intern
+      id to from action body
+      in-reply-to wants-reply replied deferred-reply))
+    (anything-else
+     (throw 'message-read-bad-structure
+            "Could not read message from structure"
+            anything-else))))
+
+(define (read-message-from-string message-str)
+  "Read message from MESSAGE-STR"
+  (with-input-from-string message-str
+    (lambda ()
+      (read-message (current-input-port)))))