#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 format)
#:use-module (ice-9 match)
+ #:use-module (ice-9 pretty-print)
#:use-module (8sync agenda)
#:use-module (8sync repl)
#:export (;; utilities... ought to go in their own module
big-random-number
big-random-number-string
simple-message-id-generator
- require-slot
<actor>
actor-id
- actor-hive
actor-message-handler
- <address>
+ ;;; Commenting out the <address> type for now;
+ ;;; it may be back when we have better serializers
+ ;; <address>
make-address address?
address-actor-id address-hive-id
actor-id-hive
actor-id-string
+ mlambda define-mhandler
make-action-dispatch
define-simple-actor
hive-id
hive-create-actor hive-create-actor*
+ create-actor create-actor*
+ self-destruct
+
<message>
make-message message?
message-to message-action message-from
send-message send-message-wait
reply-message reply-message-wait
+ <- <-wait <-reply <-reply-wait
+
ez-run-hive
- hive-bootstrap-message))
+ bootstrap-message
+
+ serialize-message write-message
+ serialize-message-pretty pprint-message
+ read-message read-message-from-string))
;; For ids
(define %random-state
(make-parameter (random-state-from-platform)))
-;; Probably bigger than necessary
-(define random-number-size (expt 10 50))
+;; Same size as a uuid4 I think...
+(define random-number-size (expt 2 128))
(define (big-random-number)
(random random-number-size (%random-state)))
(set! counter (1+ counter))
(string-append prefix (number->string counter)))))
-(define (require-slot slot-name)
- "Generate something for #:init-thunk to complain about unfilled slot"
- (lambda ()
- (throw 'required-slot
- (format #f "Slot ~s not filled" slot-name)
- slot-name)))
+
+\f
+;;; Messages
+;;; ========
+
+
+(define-record-type <message>
+ (make-message-intern id to from action
+ body in-reply-to wants-reply
+ replied
+ ;; @@: Not used yet.
+ ;; Will we ever find a real use case?
+ deferred-reply)
+ message?
+ (id message-id)
+ (to message-to)
+ (from message-from)
+ (action message-action)
+ (body message-body)
+ (in-reply-to message-in-reply-to)
+ (wants-reply message-wants-reply)
+
+ ;; See XUDD source for these. Not use yet, maybe eventually will be?
+ ;; XUDD uses them for autoreply.
+ ;; Requiring mutation on message objects is clearly not great,
+ ;; but it may be worth it...? Investigate!
+ (replied message-replied set-message-replied!)
+ (deferred-reply message-deferred-reply set-message-deferred-reply!))
+
+
+(define* (make-message id to from action body
+ #:key in-reply-to wants-reply
+ replied deferred-reply)
+ (make-message-intern id to from action body
+ in-reply-to wants-reply replied
+ deferred-reply))
+
+;; Note: the body of messages is currently an alist, but it's created
+;; from a keyword based property list (see the following two functions).
+;; But, that's an extra conversion step, and maybe totally unnecessary:
+;; we already have message-ref, and this could just pull a keyword
+;; from a property list.
+;; The main ways this might be useful are error checking,
+;; serialization across the wire (but even that might require some
+;; change), and using existing tooling (though adding new tooling
+;; would be negligible in implementation effort.)
+
+;; This cons cell is immutable and unique (for eq? tests)
+(define %nothing-provided (cons 'nothing 'provided))
+
+(define* (message-ref message key #:optional (dflt %nothing-provided))
+ "Extract KEY from body of MESSAGE.
+
+Optionally set default with [DFLT]
+If key not found and DFLT not provided, throw an error."
+ (let ((result (assoc key (message-body message))))
+ (if result (cdr result)
+ (if (eq? dflt %nothing-provided)
+ (throw 'message-missing-key
+ "Message body does not contain key and no default provided"
+ #:key key
+ #:message message)
+ dflt))))
+
+
+(define (message-needs-reply message)
+ "See if this message needs a reply still"
+ (and (message-wants-reply message)
+ (not (or (message-replied message)
+ (message-deferred-reply message)))))
+
+
+(define (kwarg-list-to-alist args)
+ (let loop ((remaining args)
+ (result '()))
+ (match remaining
+ (((? keyword? key) val rest ...)
+ (loop rest
+ (cons (cons (keyword->symbol key) val)
+ result)))
+ (() result)
+ (_ (throw 'invalid-kwarg-list
+ "Invalid keyword argument list"
+ args)))))
+
+
+(define (send-message from-actor to-id action . message-body-args)
+ "Send a message from an actor to another actor"
+ (let* ((hive (actor-hive from-actor))
+ (message (make-message (hive-gen-message-id hive) to-id
+ (actor-id from-actor) action
+ (kwarg-list-to-alist message-body-args))))
+ (8sync-nowait (hive-process-message hive message))))
+
+(define (send-message-wait from-actor to-id action . message-body-args)
+ "Send a message from an actor to another, but wait until we get a response"
+ (let* ((hive (actor-hive from-actor))
+ (abort-to (hive-prompt (actor-hive from-actor)))
+ (message (make-message (hive-gen-message-id hive) to-id
+ (actor-id from-actor) action
+ (kwarg-list-to-alist message-body-args)
+ #:wants-reply #t)))
+ (abort-to-prompt abort-to from-actor message)))
+
+;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
+;; We might have `send-message-wait-brazen' to allow callers to
+;; not have an exception thrown and instead just have a message with
+;; the appropriate '*error* message returned.
+
+(define (reply-message from-actor original-message
+ . message-body-args)
+ "Reply to a message"
+ (set-message-replied! original-message #t)
+ (let* ((hive (actor-hive from-actor))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id from-actor) '*reply*
+ (kwarg-list-to-alist message-body-args)
+ #:in-reply-to (message-id original-message))))
+ (8sync-nowait (hive-process-message hive new-message))))
+
+(define (reply-message-wait from-actor original-message
+ . message-body-args)
+ "Reply to a messsage, but wait until we get a response"
+ (set-message-replied! original-message #t)
+ (let* ((hive (actor-hive from-actor))
+ (abort-to (hive-prompt (actor-hive from-actor)))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id from-actor) '*reply*
+ (kwarg-list-to-alist message-body-args)
+ #:wants-reply #t
+ #:in-reply-to (message-id original-message))))
+ (abort-to-prompt abort-to from-actor new-message)))
+
+
+;;; Aliases!
+;;; See: http://mumble.net/~jar/articles/oo-moon-weinreb.html
+;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html )
+
+(define <- send-message)
+(define <-wait send-message-wait)
+(define <-reply reply-message)
+(define <-reply-wait reply-message-wait)
\f
;;; =========================
(define-class <actor> ()
- ;; An <address> object
- (id #:init-thunk (require-slot "id")
- #:init-keyword #:id
+ ;; An address object
+ (id #:init-keyword #:id
#:getter actor-id)
;; The hive we're connected to.
;; We need this to be able to send messages.
- (hive #:init-thunk (require-slot "hive")
- #:init-keyword #:hive
+ (hive #:init-keyword #:hive
#:accessor actor-hive)
;; How we receive and process new messages
- (message-handler #:init-thunk (require-slot "message-handler")
- #:allocation #:each-subclass))
+ (message-handler #:allocation #:each-subclass))
(define-method (actor-message-handler (actor <actor>))
(slot-ref actor 'message-handler))
-(define-record-type <address>
- (make-address actor-id hive-id) ; @@: Do we want the trailing -id?
- address?
- (actor-id address-actor-id)
- (hive-id address-hive-id))
+;;; So these are the nicer representations of addresses.
+;;; However, they don't serialize so easily with scheme read/write, so we're
+;;; using the simpler cons cell version below for now.
+;; (define-record-type <address>
+;; (make-address actor-id hive-id) ; @@: Do we want the trailing -id?
+;; address?
+;; (actor-id address-actor-id)
+;; (hive-id address-hive-id))
+;;
;; (set-record-type-printer!
;; <address>
;; (lambda (record port)
-;; (format port "<<address> ~s>" (address->string record))))
+;; (format port "<address: ~s@~s>"
+;; (address-actor-id record) (address-hive-id record))))
+;;
+
+(define (make-address actor-id hive-id)
+ (cons actor-id hive-id))
+
+(define (address-actor-id address)
+ (car address))
+
+(define (address-hive-id address)
+ (cdr address))
(define (address->string address)
(string-append (address-actor-id address) "@"
;;; Actor utilities
;;; ===============
+
+(define-syntax-rule (with-message-args (message message-arg ...)
+ body body* ...)
+ (let ((message-arg (message-ref message (quote message-arg))) ...)
+ body body* ...))
+
+(define-syntax mlambda
+ (syntax-rules ()
+ "A lambda for building message handlers.
+
+Use it like:
+ (mlambda (actor message foo)
+ ...)
+
+Which is like doing manually:
+ (lambda (actor message)
+ (let ((foo (message-ref message foo)))
+ ...))"
+ ((_ (actor message message-arg ...)
+ body body* ...)
+ (lambda (actor message)
+ (with-message-args (message message-arg ...) body body* ...)))))
+
+;; @@: Sadly, docstrings won't work with this...
+;; I think we need to bust out syntax-case to make that happen...
+(define-syntax-rule (define-mhandler (name actor message message-arg ...)
+ body ...)
+ (define name
+ (mlambda (actor message message-arg ...)
+ body ...)))
+
(define (simple-dispatcher action-map)
(lambda (actor message)
(let* ((action (message-action message))
(syntax-rules ()
((_ ((action-name action-args ...) body ...))
(cons (quote action-name)
- (lambda (action-args ...)
+ (mlambda (action-args ...)
body ...)))
((_ (action-name handler))
(cons (quote action-name) handler))))
(simple-dispatcher
(list (%expand-action-item action-item) ...)))))
-(define-syntax-rule (define-simple-actor class (actions ...))
+(define-syntax-rule (define-simple-actor class actions ...)
(define-class class (<actor>)
(message-handler
#:init-value (make-action-dispatch actions ...)
(define-generic hive-handle-failed-forward)
(define-class <hive> (<actor>)
- ;; This gets set to itself immediately after being created
- (hive #:init-value #f)
(actor-registry #:init-thunk make-hash-table
#:getter hive-actor-registry)
(msg-id-generator #:init-thunk simple-message-id-generator
;; This is a map from cons cell of message-id
;; to a cons cell of (actor-id . coroutine)
;; @@: Should we have a <waiting-coroutine> record type?
+ ;; @@: Should there be any way to clear out "old" coroutines?
(waiting-coroutines #:init-thunk make-hash-table
#:getter hive-waiting-coroutines)
;; Message prompt
'*forward*
`((original . ,message))))
+(define-method (hive-reply-with-error (hive <hive>) original-message
+ error-key error-args)
+ ;; We only supply the error-args if the original sender is on the same hive
+ (define (orig-actor-on-same-hive?)
+ (equal? (hive-id hive)
+ (address-hive-id (message-from original-message))))
+ (set-message-replied! original-message #t)
+ (let* ((new-message-body
+ (if (orig-actor-on-same-hive?)
+ `((original-message . ,original-message)
+ (error-key . ,error-key)
+ (error-args . ,error-args))
+ `((original-message . ,original-message)
+ (error-key . ,error-key))))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id hive) '*error*
+ new-message-body
+ #:in-reply-to (message-id original-message))))
+ (8sync-nowait (hive-process-message hive new-message))))
+
(define-method (hive-process-message (hive <hive>) message)
"Handle one message, or forward it via an ambassador"
- (define (process-local-message)
+ (define (maybe-autoreply actor)
+ ;; Possibly autoreply
+ (if (message-needs-reply message)
+ ;; @@: Should we give *autoreply* as the action instead of *reply*?
+ (reply-message actor message
+ #:*auto-reply* #t)))
+
+ (define (resolve-actor-to)
+ "Get the actor the message was aimed at"
(let ((actor (hive-resolve-local-actor hive (message-to message))))
(if (not actor)
(throw 'actor-not-found
(address->string (message-from message))
(address->string (message-to message)))
message))
- (call-with-prompt (hive-prompt hive)
- (lambda ()
- (define message-handler (actor-message-handler actor))
- ;; @@: Should a more general error handling happen here?
- (message-handler actor message))
-
- (lambda (kont actor message)
- (let ((hive (actor-hive actor)))
- ;; Register the coroutine
- (hash-set! (hive-waiting-coroutines hive)
- (message-id message)
- (cons (actor-id actor) kont))
- ;; Send off the message
- (8sync (hive-process-message hive message)))))))
+ actor))
+
+ (define (call-catching-coroutine thunk)
+ (define (call-catching-errors)
+ (with-throw-handler
+ #t thunk
+ (lambda (key . args)
+ (if (message-needs-reply message)
+ ;; If the message is waiting on a reply, let them know
+ ;; something went wrong.
+ (hive-reply-with-error hive message key args)))))
+ (call-with-prompt (hive-prompt hive)
+ call-catching-errors
+ (lambda (kont actor message)
+ ;; Register the coroutine
+ (hash-set! (hive-waiting-coroutines hive)
+ (message-id message)
+ (cons (actor-id actor) kont))
+ ;; Send off the message
+ (8sync (hive-process-message hive message)))))
+
+ (define (process-local-message)
+ (let ((actor (resolve-actor-to)))
+ (call-catching-coroutine
+ (lambda ()
+ (define message-handler (actor-message-handler actor))
+ ;; @@: Should a more general error handling happen here?
+ (let ((result
+ (message-handler actor message)))
+ (maybe-autoreply actor)
+ ;; Returning result allows actors to possibly make a run-request
+ ;; at the end of handling a message.
+ ;; ... We do want that, right?
+ result)))))
(define (resume-waiting-coroutine)
- (match (hash-remove! (hive-waiting-coroutines hive)
- (message-in-reply-to message))
- ((_ . kont)
- (kont message))
- (#f (throw 'no-waiting-coroutine
- "message in-reply-to tries to resume nonexistent coroutine"
- message))))
+ (cond
+ ((eq? (message-action message) '*reply*)
+ (call-catching-coroutine
+ (lambda ()
+ (match (hash-remove! (hive-waiting-coroutines hive)
+ (message-in-reply-to message))
+ ((_ . (resume-actor-id . kont))
+ (if (not (equal? (message-to message)
+ resume-actor-id))
+ (throw 'resuming-to-wrong-actor
+ "Attempted to resume a coroutine to the wrong actor!"
+ #:expected-actor-id (message-to message)
+ #:got-actor-id resume-actor-id
+ #:message message))
+ (let (;; @@: How should we resolve resuming coroutines to actors who are
+ ;; now gone?
+ (actor (resolve-actor-to))
+ (result (kont message)))
+ (maybe-autoreply actor)
+ result))
+ (#f (throw 'no-waiting-coroutine
+ "message in-reply-to tries to resume nonexistent coroutine"
+ message))))))
+ ;; Yikes, we must have gotten an error or something back
+ (else
+ ;; @@: Not what we want in the long run?
+ ;; What we'd *prefer* to do is to resume this message
+ ;; and throw an error inside the message handler
+ ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
+ (hash-remove! (hive-waiting-coroutines hive)
+ (message-in-reply-to message))
+ (let ((explaination
+ (if (eq? (message-action message) '*reply*)
+ "Won't resume coroutine; got an *error* as a reply"
+ "Won't resume coroutine because action is not *reply*")))
+ (throw 'hive-unresumable-coroutine
+ explaination
+ #:message message)))))
(define (process-remote-message)
;; Find the ambassador
that method for documentation."
(let* ((actor-id (hive-gen-actor-id hive id-cookie))
(actor (apply make actor-class
- ;; @@: If we switch to a hive-proxy, do it here
#:hive hive
#:id actor-id
init)))
;; return the actor id
actor-id))
-(define* (hive-create-actor hive actor-class
- #:key
- (init '())
- id-cookie)
+(define* (hive-create-actor hive actor-class #:rest init)
(%hive-create-actor hive actor-class
- init id-cookie))
-
-(define-syntax hive-create-actor*
- (syntax-rules ()
- "Create an instance of actor-class attached to this hive.
-Return the new actor's id.
+ init #f))
-Used internally, and used for bootstrapping a fresh hive.
-
-Note that actors should generally not call this method directly.
-Instead, actors should call create-actor."
- ((_ args ... (init-args ...))
- (hive-create-actor args ...
- #:init (list init-args ...)))))
-
-
-;; TODO: Give actors this instead of the actual hive reference
-(define-class <hive-proxy> ()
- (send-message #:getter proxy-send-message
- #:init-keyword #:send-message)
- (create-actor #:getter proxy-create-actor
- #:init-keyword #:create-actor))
-
-;; Live the hive proxy, but has access to the hive itself...
-(define-class <debug-hive-proxy> (<hive-proxy>)
- (hive #:init-keyword #:hive))
+(define* (hive-create-actor* hive actor-class id-cookie #:rest init)
+ (%hive-create-actor hive actor-class
+ init id-cookie))
\f
-;;; Messages
-;;; ========
-
-
-(define-record-type <message>
- (make-message-intern id to from action
- body in-reply-to wants-reply ; do we need hive-proxy?
- ;; Are these still needed?
- replied deferred-reply)
- message?
- (id message-id)
- (to message-to)
- (from message-from)
- (action message-action)
- (body message-body)
- (in-reply-to message-in-reply-to)
- (wants-reply message-wants-reply)
-
- ;; See XUDD source for these. Not use yet, maybe eventually will be?
- ;; XUDD uses them for autoreply.
- ;; Requiring mutation on message objects is clearly not great,
- ;; but it may be worth it...? Investigate!
- (replied message-replied set-message-replied!)
- (deferred-reply message-deferred-reply set-message-deferred-reply!))
-
-
-(define* (make-message id to from action body
- #:key in-reply-to wants-reply
- replied deferred-reply)
- (make-message-intern id to from action body
- in-reply-to wants-reply replied
- deferred-reply))
-
-;; Note: the body of messages is currently an alist, but it's created
-;; from a keyword based property list (see the following two functions).
-;; But, that's an extra conversion step, and maybe totally unnecessary:
-;; we already have message-ref, and this could just pull a keyword
-;; from a property list.
-;; The main ways this might be useful are error checking,
-;; serialization across the wire (but even that might require some
-;; change), and using existing tooling (though adding new tooling
-;; would be negligible in implementation effort.)
-
-(define* (message-ref message key #:optional dflt)
- "Extract KEY from body of MESSAGE.
-
-Optionally set default with [DFLT]"
- (let ((result (assoc key (message-body message))))
- (if result (cdr result)
- dflt)))
+;;; Various API methods for actors to interact with the system
+;;; ==========================================================
+;; TODO: move send-message and friends here...?
-(define (kwarg-list-to-alist args)
- (let loop ((remaining args)
- (result '()))
- (match remaining
- (((? keyword? key) val rest ...)
- (loop rest
- (cons (cons (keyword->symbol key) val)
- result)))
- (() result)
- (_ (throw 'invalid-kwarg-list
- "Invalid keyword argument list"
- args)))))
+(define* (create-actor from-actor actor-class #:rest init)
+ "Create an instance of actor-class. Return the new actor's id.
+This is the method actors should call directly (unless they want
+to supply an id-cookie, in which case they should use
+create-actor*)."
+ (8sync (%hive-create-actor (actor-hive from-actor) actor-class
+ init #f)))
-(define (send-message from-actor to-id action . message-body-args)
- "Send a message from an actor to another actor"
- (let* ((hive (actor-hive from-actor))
- (message (make-message (hive-gen-message-id hive) to-id
- (actor-id from-actor) action
- (kwarg-list-to-alist message-body-args))))
- (8sync (hive-process-message hive message))))
-(define (send-message-wait from-actor to-id action . message-body-args)
- "Send a message from an actor to another, but wait until we get a response"
- (let* ((hive (actor-hive from-actor))
- (agenda-prompt (hive-prompt (actor-hive from-actor)))
- (message (make-message (hive-gen-message-id hive) to-id
- (actor-id from-actor) action
- (kwarg-list-to-alist message-body-args)
- #:wants-reply #t)))
- (abort-to-prompt agenda-prompt from-actor message)))
+(define* (create-actor* from-actor actor-class id-cookie #:rest init)
+ "Create an instance of actor-class. Return the new actor's id.
-;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
-;; We might have `send-message-wait-brazen' to allow callers to
-;; not have an exception thrown and instead just have a message with
-;; the appropriate '*error* message returned.
+Like create-actor, but permits supplying an id-cookie."
+ (8sync (%hive-create-actor (actor-hive from-actor) actor-class
+ init id-cookie)))
-(define (reply-message from-actor original-message
- . message-body-args)
- "Reply to a message"
- (set-message-replied! original-message #t)
- (let* ((hive (actor-hive from-actor))
- (new-message (make-message (hive-gen-message-id hive)
- (message-from original-message)
- (actor-id from-actor) '*reply*
- (kwarg-list-to-alist message-body-args)
- #:in-reply-to (message-id original-message))))
- (8sync (hive-process-message hive new-message))))
-(define (reply-message-wait from-actor original-message
- . message-body-args)
- "Reply to a messsage, but wait until we get a response"
- (set-message-replied! original-message #t)
- (let* ((hive (actor-hive from-actor))
- (agenda-prompt (hive-prompt (actor-hive from-actor)))
- (new-message (make-message (hive-gen-message-id hive)
- (message-from original-message)
- (actor-id from-actor) '*reply*
- (kwarg-list-to-alist message-body-args)
- #:wants-reply #t
- #:in-reply-to (message-id original-message))))
- (abort-to-prompt agenda-prompt from-actor new-message)))
+(define (self-destruct actor)
+ "Remove an actor from the hive."
+ (hash-remove! (hive-actor-registry (actor-hive actor))
+ (actor-id actor)))
\f
(spawn-and-queue-repl-server! agenda)))
(start-agenda agenda)))
-(define (hive-bootstrap-message hive to-id action . message-body-args)
+(define (bootstrap-message hive to-id action . message-body-args)
(wrap
(apply send-message hive to-id action message-body-args)))
+
+
+\f
+;;; Basic readers / writers
+;;; =======================
+
+(define (serialize-message message)
+ "Serialize a message for read/write"
+ (list
+ (message-id message)
+ (message-to message)
+ (message-from message)
+ (message-action message)
+ (message-body message)
+ (message-in-reply-to message)
+ (message-wants-reply message)
+ (message-replied message)
+ (message-deferred-reply message)))
+
+(define* (write-message message #:optional (port (current-output-port)))
+ "Write out a message to a port for easy reading later.
+
+Note that if a sub-value can't be easily written to something
+Guile's `read' procedure knows how to read, this doesn't do anything
+to improve that. You'll need a better serializer for that.."
+ (write (serialize-message message) port))
+
+(define (serialize-message-pretty message)
+ "Serialize a message in a way that's easy for humans to read."
+ `(*message*
+ (id ,(message-id message))
+ (to ,(message-to message))
+ (from ,(message-from message))
+ (action ,(message-action message))
+ (body ,(message-body message))
+ (in-reply-to ,(message-in-reply-to message))
+ (wants-reply ,(message-wants-reply message))
+ (replied ,(message-replied message))
+ (deferred-reply ,(message-deferred-reply message))))
+
+(define (pprint-message message)
+ "Pretty print a message."
+ (pretty-print (serialize-message-pretty message)))
+
+(define* (read-message #:optional (port (current-input-port)))
+ "Read a message serialized via serialize-message from PORT"
+ (match (read port)
+ ((id to from action body in-reply-to wants-reply replied deferred-reply)
+ (make-message-intern
+ id to from action body
+ in-reply-to wants-reply replied deferred-reply))
+ (anything-else
+ (throw 'message-read-bad-structure
+ "Could not read message from structure"
+ anything-else))))
+
+(define (read-message-from-string message-str)
+ "Read message from MESSAGE-STR"
+ (with-input-from-string message-str
+ (lambda ()
+ (read-message (current-input-port)))))