big-random-number
big-random-number-string
simple-message-id-generator
- require-slot
<actor>
actor-id
actor-id-hive
actor-id-string
+ mlambda
make-action-dispatch
define-simple-actor
send-message send-message-wait
reply-message reply-message-wait
+ <- <-wait <-reply <-reply-wait
+
ez-run-hive
bootstrap-message
(set! counter (1+ counter))
(string-append prefix (number->string counter)))))
-(define (require-slot slot-name)
- "Generate something for #:init-thunk to complain about unfilled slot"
- (lambda ()
- (throw 'required-slot
- (format #f "Slot ~s not filled" slot-name)
- slot-name)))
-
\f
;;; Messages
(abort-to-prompt abort-to from-actor new-message)))
+;;; Aliases!
+;;; See: http://mumble.net/~jar/articles/oo-moon-weinreb.html
+;;; (also worth seeing: http://mumble.net/~jar/articles/oo.html )
+
+(define <- send-message)
+(define <-wait send-message-wait)
+(define <-reply reply-message)
+(define <-reply-wait reply-message-wait)
+
+
\f
;;; Main actor implementation
;;; =========================
(define-class <actor> ()
;; An address object
- (id #:init-thunk (require-slot "id")
- #:init-keyword #:id
+ (id #:init-keyword #:id
#:getter actor-id)
;; The hive we're connected to.
;; We need this to be able to send messages.
- (hive #:init-thunk (require-slot "hive")
- #:init-keyword #:hive
+ (hive #:init-keyword #:hive
#:accessor actor-hive)
;; How we receive and process new messages
- (message-handler #:init-thunk (require-slot "message-handler")
- #:allocation #:each-subclass))
+ (message-handler #:allocation #:each-subclass))
(define-method (actor-message-handler (actor <actor>))
(slot-ref actor 'message-handler))
;;; Actor utilities
;;; ===============
+(define-syntax mlambda
+ (syntax-rules ()
+ "A lambda for building message handlers.
+
+Use it like:
+ (mlambda (actor message foo)
+ ...)
+
+Which is like doing manually:
+ (lambda (actor message)
+ (let ((foo (message-ref message foo)))
+ ...))"
+ ((_ (actor message message-arg ...)
+ body body* ...)
+ (lambda (actor message)
+ (let ((message-arg (message-ref message (quote message-arg))) ...)
+ body body* ...)))))
+
(define (simple-dispatcher action-map)
(lambda (actor message)
(let* ((action (message-action message))
(syntax-rules ()
((_ ((action-name action-args ...) body ...))
(cons (quote action-name)
- (lambda (action-args ...)
+ (mlambda (action-args ...)
body ...)))
((_ (action-name handler))
(cons (quote action-name) handler))))
(define-generic hive-handle-failed-forward)
(define-class <hive> (<actor>)
- ;; This gets set to itself immediately after being created
- (hive #:init-value #f)
(actor-registry #:init-thunk make-hash-table
#:getter hive-actor-registry)
(msg-id-generator #:init-thunk simple-message-id-generator
'*forward*
`((original . ,message))))
+(define-method (hive-reply-with-error (hive <hive>) original-message
+ error-key error-args)
+ ;; We only supply the error-args if the original sender is on the same hive
+ (define (orig-actor-on-same-hive?)
+ (equal? (hive-id hive)
+ (address-hive-id (message-from original-message))))
+ (set-message-replied! original-message #t)
+ (let* ((new-message-body
+ (if (orig-actor-on-same-hive?)
+ `((original-message . ,original-message)
+ (error-key . ,error-key)
+ (error-args . ,error-args))
+ `((original-message . ,original-message)
+ (error-key . ,error-key))))
+ (new-message (make-message (hive-gen-message-id hive)
+ (message-from original-message)
+ (actor-id hive) '*error*
+ new-message-body
+ #:in-reply-to (message-id original-message))))
+ (8sync-nowait (hive-process-message hive new-message))))
+
(define-method (hive-process-message (hive <hive>) message)
"Handle one message, or forward it via an ambassador"
(define (maybe-autoreply actor)
actor))
(define (call-catching-coroutine thunk)
+ (define (call-catching-errors)
+ (with-throw-handler
+ #t thunk
+ (lambda (key . args)
+ (if (message-needs-reply message)
+ ;; If the message is waiting on a reply, let them know
+ ;; something went wrong.
+ (hive-reply-with-error hive message key args)))))
(call-with-prompt (hive-prompt hive)
- thunk
+ call-catching-errors
(lambda (kont actor message)
- (let ((hive (actor-hive actor)))
- ;; Register the coroutine
- (hash-set! (hive-waiting-coroutines hive)
- (message-id message)
- (cons (actor-id actor) kont))
- ;; Send off the message
- (8sync (hive-process-message hive message))))))
+ ;; Register the coroutine
+ (hash-set! (hive-waiting-coroutines hive)
+ (message-id message)
+ (cons (actor-id actor) kont))
+ ;; Send off the message
+ (8sync (hive-process-message hive message)))))
(define (process-local-message)
(let ((actor (resolve-actor-to)))
result)))))
(define (resume-waiting-coroutine)
- (call-catching-coroutine
- (lambda ()
- (match (hash-remove! (hive-waiting-coroutines hive)
- (message-in-reply-to message))
- ((_ . (resume-actor-id . kont))
- (if (not (equal? (message-to message)
- resume-actor-id))
- (throw 'resuming-to-wrong-actor
- "Attempted to resume a coroutine to the wrong actor!"
- #:expected-actor-id (message-to message)
- #:got-actor-id resume-actor-id
- #:message message))
- (let (;; @@: How should we resolve resuming coroutines to actors who are
- ;; now gone?
- (actor (resolve-actor-to))
- (result (kont message)))
- (maybe-autoreply actor)
- result))
- (#f (throw 'no-waiting-coroutine
- "message in-reply-to tries to resume nonexistent coroutine"
- message))))))
+ (cond
+ ((eq? (message-action message) '*reply*)
+ (call-catching-coroutine
+ (lambda ()
+ (match (hash-remove! (hive-waiting-coroutines hive)
+ (message-in-reply-to message))
+ ((_ . (resume-actor-id . kont))
+ (if (not (equal? (message-to message)
+ resume-actor-id))
+ (throw 'resuming-to-wrong-actor
+ "Attempted to resume a coroutine to the wrong actor!"
+ #:expected-actor-id (message-to message)
+ #:got-actor-id resume-actor-id
+ #:message message))
+ (let (;; @@: How should we resolve resuming coroutines to actors who are
+ ;; now gone?
+ (actor (resolve-actor-to))
+ (result (kont message)))
+ (maybe-autoreply actor)
+ result))
+ (#f (throw 'no-waiting-coroutine
+ "message in-reply-to tries to resume nonexistent coroutine"
+ message))))))
+ ;; Yikes, we must have gotten an error or something back
+ (else
+ ;; @@: Not what we want in the long run?
+ ;; What we'd *prefer* to do is to resume this message
+ ;; and throw an error inside the message handler
+ ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
+ (hash-remove! (hive-waiting-coroutines hive)
+ (message-in-reply-to message))
+ (let ((explaination
+ (if (eq? (message-action message) '*reply*)
+ "Won't resume coroutine; got an *error* as a reply"
+ "Won't resume coroutine because action is not *reply*")))
+ (throw 'hive-unresumable-coroutine
+ explaination
+ #:message message)))))
(define (process-remote-message)
;; Find the ambassador