X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;ds=sidebyside;f=8sync%2Fsystems%2Factors.scm;h=f9b9047ac4a83fc1d06e82bb798422c0f7750dae;hb=148aae4afbbbfa1a0845746294acb1385deda9b6;hp=2d42897a03740c3be374473d192d9b463e289a71;hpb=55bf40bbd2d7cd5f0f14f0e2de28f71dfe1a127e;p=8sync.git
diff --git a/8sync/systems/actors.scm b/8sync/systems/actors.scm
index 2d42897..f9b9047 100644
--- a/8sync/systems/actors.scm
+++ b/8sync/systems/actors.scm
@@ -38,7 +38,9 @@
actor-hive
actor-message-handler
-
+ ;;; Commenting out the type for now;
+ ;;; it may be back when we have better serializers
+ ;;
make-address address?
address-actor-id address-hive-id
@@ -68,7 +70,11 @@
reply-message reply-message-wait
ez-run-hive
- hive-bootstrap-message))
+ hive-bootstrap-message
+
+ serialize-message write-message
+ serialize-message-pretty pprint-message
+ read-message read-message-from-string))
;; For ids
(define %random-state
@@ -103,12 +109,138 @@
slot-name)))
+
+;;; Messages
+;;; ========
+
+
+(define-record-type
+ (make-message-intern id to from action
+ body in-reply-to wants-reply ; do we need hive-proxy?
+ ;; Are these still needed?
+ replied deferred-reply)
+ message?
+ (id message-id)
+ (to message-to)
+ (from message-from)
+ (action message-action)
+ (body message-body)
+ (in-reply-to message-in-reply-to)
+ (wants-reply message-wants-reply)
+
+ ;; See XUDD source for these. Not use yet, maybe eventually will be?
+ ;; XUDD uses them for autoreply.
+ ;; Requiring mutation on message objects is clearly not great,
+ ;; but it may be worth it...? Investigate!
+ (replied message-replied set-message-replied!)
+ (deferred-reply message-deferred-reply set-message-deferred-reply!))
+
+
+(define* (make-message id to from action body
+ #:key in-reply-to wants-reply
+ replied deferred-reply)
+ (make-message-intern id to from action body
+ in-reply-to wants-reply replied
+ deferred-reply))
+
+;; Note: the body of messages is currently an alist, but it's created
+;; from a keyword based property list (see the following two functions).
+;; But, that's an extra conversion step, and maybe totally unnecessary:
+;; we already have message-ref, and this could just pull a keyword
+;; from a property list.
+;; The main ways this might be useful are error checking,
+;; serialization across the wire (but even that might require some
+;; change), and using existing tooling (though adding new tooling
+;; would be negligible in implementation effort.)
+
+;; This cons cell is immutable and unique (for eq? tests)
+(define %nothing-provided (cons 'nothing 'provided))
+
+(define* (message-ref message key #:optional (dflt %nothing-provided))
+ "Extract KEY from body of MESSAGE.
+
+Optionally set default with [DFLT]
+If key not found and DFLT not provided, throw an error."
+ (let ((result (assoc key (message-body message))))
+ (if result (cdr result)
+ (if (eq? dflt %nothing-provided)
+ (throw 'message-missing-key
+ "Message body does not contain key and no default provided"
+ #:key key
+ #:message message)
+ dflt))))
+
+
+(define (kwarg-list-to-alist args)
+ (let loop ((remaining args)
+ (result '()))
+ (match remaining
+ (((? keyword? key) val rest ...)
+ (loop rest
+ (cons (cons (keyword->symbol key) val)
+ result)))
+ (() result)
+ (_ (throw 'invalid-kwarg-list
+ "Invalid keyword argument list"
+ args)))))
+
+
+(define (send-message from-actor to-id action . message-body-args)
+ "Send a message from an actor to another actor"
+ (let* ((hive (actor-hive from-actor))
+ (message (make-message (hive-gen-message-id hive) to-id
+ (actor-id from-actor) action
+ (kwarg-list-to-alist message-body-args))))
+ (8sync (hive-process-message hive message))))
+
+(define (send-message-wait from-actor to-id action . message-body-args)
+ "Send a message from an actor to another, but wait until we get a response"
+ (let* ((hive (actor-hive from-actor))
+ (agenda-prompt (hive-prompt (actor-hive from-actor)))
+ (message (make-message (hive-gen-message-id hive) to-id
+ (actor-id from-actor) action
+ (kwarg-list-to-alist message-body-args)
+ #:wants-reply #t)))
+ (abort-to-prompt agenda-prompt from-actor message)))
+
+;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
+;; We might have `send-message-wait-brazen' to allow callers to
+;; not have an exception thrown and instead just have a message with
+;; the appropriate '*error* message returned.
+
+(define (reply-message from-actor original-message
+ . message-body-args)
+ "Reply to a message"
+ (set-message-replied! original-message #t)
+ (let* ((hive (actor-hive from-actor))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id from-actor) '*reply*
+ (kwarg-list-to-alist message-body-args)
+ #:in-reply-to (message-id original-message))))
+ (8sync (hive-process-message hive new-message))))
+
+(define (reply-message-wait from-actor original-message
+ . message-body-args)
+ "Reply to a messsage, but wait until we get a response"
+ (set-message-replied! original-message #t)
+ (let* ((hive (actor-hive from-actor))
+ (agenda-prompt (hive-prompt (actor-hive from-actor)))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id from-actor) '*reply*
+ (kwarg-list-to-alist message-body-args)
+ #:wants-reply #t
+ #:in-reply-to (message-id original-message))))
+ (abort-to-prompt agenda-prompt from-actor new-message)))
+
+
;;; Main actor implementation
;;; =========================
(define-class ()
- ;; An object
+ ;; An address object
(id #:init-thunk (require-slot "id")
#:init-keyword #:id
#:getter actor-id)
@@ -124,17 +256,31 @@
(define-method (actor-message-handler (actor ))
(slot-ref actor 'message-handler))
-(define-record-type
- (make-address actor-id hive-id) ; @@: Do we want the trailing -id?
- address?
- (actor-id address-actor-id)
- (hive-id address-hive-id))
+;;; So these are the nicer representations of addresses.
+;;; However, they don't serialize so easily with scheme read/write, so we're
+;;; using the simpler cons cell version below for now.
+
+;; (define-record-type
+;; (make-address actor-id hive-id) ; @@: Do we want the trailing -id?
+;; address?
+;; (actor-id address-actor-id)
+;; (hive-id address-hive-id))
+;;
+;; (set-record-type-printer!
+;;
+;; (lambda (record port)
+;; (format port ""
+;; (address-actor-id record) (address-hive-id record))))
+;;
+
+(define (make-address actor-id hive-id)
+ (cons actor-id hive-id))
-(set-record-type-printer!
-
- (lambda (record port)
- (format port ""
- (address-actor-id record) (address-hive-id record))))
+(define (address-actor-id address)
+ (car address))
+
+(define (address-hive-id address)
+ (cdr address))
(define (address->string address)
(string-append (address-actor-id address) "@"
@@ -203,7 +349,7 @@ more compact following syntax:
(simple-dispatcher
(list (%expand-action-item action-item) ...)))))
-(define-syntax-rule (define-simple-actor class (actions ...))
+(define-syntax-rule (define-simple-actor class actions ...)
(define-class class ()
(message-handler
#:init-value (make-action-dispatch actions ...)
@@ -318,7 +464,14 @@ more compact following syntax:
(define (resume-waiting-coroutine)
(match (hash-remove! (hive-waiting-coroutines hive)
(message-in-reply-to message))
- ((_ . kont)
+ ((_ . (resume-actor-id . kont))
+ (if (not (equal? (message-to message)
+ resume-actor-id))
+ (throw 'resuming-to-wrong-actor
+ "Attempted to resume a coroutine to the wrong actor!"
+ #:expected-actor-id (message-to message)
+ #:got-actor-id resume-actor-id
+ #:message message))
(kont message))
(#f (throw 'no-waiting-coroutine
"message in-reply-to tries to resume nonexistent coroutine"
@@ -401,123 +554,6 @@ Instead, actors should call create-actor."
(hive #:init-keyword #:hive))
-
-;;; Messages
-;;; ========
-
-
-(define-record-type
- (make-message-intern id to from action
- body in-reply-to wants-reply ; do we need hive-proxy?
- ;; Are these still needed?
- replied deferred-reply)
- message?
- (id message-id)
- (to message-to)
- (from message-from)
- (action message-action)
- (body message-body)
- (in-reply-to message-in-reply-to)
- (wants-reply message-wants-reply)
-
- ;; See XUDD source for these. Not use yet, maybe eventually will be?
- ;; XUDD uses them for autoreply.
- ;; Requiring mutation on message objects is clearly not great,
- ;; but it may be worth it...? Investigate!
- (replied message-replied set-message-replied!)
- (deferred-reply message-deferred-reply set-message-deferred-reply!))
-
-
-(define* (make-message id to from action body
- #:key in-reply-to wants-reply
- replied deferred-reply)
- (make-message-intern id to from action body
- in-reply-to wants-reply replied
- deferred-reply))
-
-;; Note: the body of messages is currently an alist, but it's created
-;; from a keyword based property list (see the following two functions).
-;; But, that's an extra conversion step, and maybe totally unnecessary:
-;; we already have message-ref, and this could just pull a keyword
-;; from a property list.
-;; The main ways this might be useful are error checking,
-;; serialization across the wire (but even that might require some
-;; change), and using existing tooling (though adding new tooling
-;; would be negligible in implementation effort.)
-
-(define* (message-ref message key #:optional dflt)
- "Extract KEY from body of MESSAGE.
-
-Optionally set default with [DFLT]"
- (let ((result (assoc key (message-body message))))
- (if result (cdr result)
- dflt)))
-
-
-(define (kwarg-list-to-alist args)
- (let loop ((remaining args)
- (result '()))
- (match remaining
- (((? keyword? key) val rest ...)
- (loop rest
- (cons (cons (keyword->symbol key) val)
- result)))
- (() result)
- (_ (throw 'invalid-kwarg-list
- "Invalid keyword argument list"
- args)))))
-
-
-(define (send-message from-actor to-id action . message-body-args)
- "Send a message from an actor to another actor"
- (let* ((hive (actor-hive from-actor))
- (message (make-message (hive-gen-message-id hive) to-id
- (actor-id from-actor) action
- (kwarg-list-to-alist message-body-args))))
- (8sync (hive-process-message hive message))))
-
-(define (send-message-wait from-actor to-id action . message-body-args)
- "Send a message from an actor to another, but wait until we get a response"
- (let* ((hive (actor-hive from-actor))
- (agenda-prompt (hive-prompt (actor-hive from-actor)))
- (message (make-message (hive-gen-message-id hive) to-id
- (actor-id from-actor) action
- (kwarg-list-to-alist message-body-args)
- #:wants-reply #t)))
- (abort-to-prompt agenda-prompt from-actor message)))
-
-;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
-;; We might have `send-message-wait-brazen' to allow callers to
-;; not have an exception thrown and instead just have a message with
-;; the appropriate '*error* message returned.
-
-(define (reply-message from-actor original-message
- . message-body-args)
- "Reply to a message"
- (set-message-replied! original-message #t)
- (let* ((hive (actor-hive from-actor))
- (new-message (make-message (hive-gen-message-id hive)
- (message-from original-message)
- (actor-id from-actor) '*reply*
- (kwarg-list-to-alist message-body-args)
- #:in-reply-to (message-id original-message))))
- (8sync (hive-process-message hive new-message))))
-
-(define (reply-message-wait from-actor original-message
- . message-body-args)
- "Reply to a messsage, but wait until we get a response"
- (set-message-replied! original-message #t)
- (let* ((hive (actor-hive from-actor))
- (agenda-prompt (hive-prompt (actor-hive from-actor)))
- (new-message (make-message (hive-gen-message-id hive)
- (message-from original-message)
- (actor-id from-actor) '*reply*
- (kwarg-list-to-alist message-body-args)
- #:wants-reply #t
- #:in-reply-to (message-id original-message))))
- (abort-to-prompt agenda-prompt from-actor new-message)))
-
-
;;; 8sync bootstrap utilities
;;; =========================
@@ -545,22 +581,23 @@ an integer."
-;;; Convenience procedures
-;;; ======================
+;;; Basic readers / writers
+;;; =======================
(define (serialize-message message)
"Serialize a message for read/write"
(list
(message-id message)
- (address->string (message-to message))
- (address->string (message-from message))
+ (message-to message)
+ (message-from message)
(message-action message)
(message-body message)
(message-in-reply-to message)
+ (message-wants-reply message)
(message-replied message)
(message-deferred-reply message)))
-(define (write-message message port)
+(define* (write-message message #:optional (port (current-output-port)))
"Write out a message to a port for easy reading later.
Note that if a sub-value can't be easily written to something
@@ -577,9 +614,28 @@ to improve that. You'll need a better serializer for that.."
(action ,(message-action message))
(body ,(message-body message))
(in-reply-to ,(message-in-reply-to message))
+ (wants-reply ,(message-wants-reply message))
(replied ,(message-replied message))
(deferred-reply ,(message-deferred-reply message))))
(define (pprint-message message)
"Pretty print a message."
(pretty-print (serialize-message-pretty message)))
+
+(define* (read-message #:optional (port (current-input-port)))
+ "Read a message serialized via serialize-message from PORT"
+ (match (read port)
+ ((id to from action body in-reply-to wants-reply replied deferred-reply)
+ (make-message-intern
+ id to from action body
+ in-reply-to wants-reply replied deferred-reply))
+ (anything-else
+ (throw 'message-read-bad-structure
+ "Could not read message from structure"
+ anything-else))))
+
+(define (read-message-from-string message-str)
+ "Read message from MESSAGE-STR"
+ (with-input-from-string message-str
+ (lambda ()
+ (read-message (current-input-port)))))