- (let ((hive (make <hive>
- #:id (make-address
- "hive" (or hive-id
- (big-random-number-string))))))
- ;; Set the hive's actor reference to itself
- (set! (actor-hive hive) hive)
- hive))
-
-(define-method (hive-id (hive <hive>))
- (actor-id-hive hive))
-
-(define-method (hive-gen-actor-id (hive <hive>) cookie)
- (make-address (if cookie
- (string-append cookie "-" (big-random-number-string))
- (big-random-number-string))
- (hive-id hive)))
-
-(define-method (hive-gen-message-id (hive <hive>))
- "Generate a message id using HIVE's message id generator"
- ((hive-msg-id-generator hive)))
-
-(define-method (hive-resolve-local-actor (hive <hive>) actor-address)
- (hash-ref (hive-actor-registry hive) actor-address))
-
-(define-method (hive-resolve-ambassador (hive <hive>) ambassador-address)
- (hash-ref (hive-ambassadors hive) ambassador-address))
-
-(define-method (make-forward-request (hive <hive>) (ambassador <actor>) message)
- (make-message (hive-gen-message-id hive) (actor-id ambassador)
- ;; If we make the hive not an actor, we could either switch this
- ;; to #f or to the original actor...?
- ;; Maybe some more thinking should be done on what should
- ;; happen in case of failure to forward? Handling ambassador failures
- ;; seems like the primary motivation for the hive remaining an actor.
- (actor-id hive)
- '*forward*
- `((original . ,message))))
-
-(define-method (hive-reply-with-error (hive <hive>) original-message
- error-key error-args)
- ;; We only supply the error-args if the original sender is on the same hive
- (define (orig-actor-on-same-hive?)
- (equal? (hive-id hive)
- (address-hive-id (message-from original-message))))
- (set-message-replied! original-message #t)
- (let* ((new-message-body
- (if (orig-actor-on-same-hive?)
- `(#:original-message ,original-message
- #:error-key ,error-key
- #:error-args ,error-args)
- `(#:original-message ,original-message
- #:error-key ,error-key)))
- (new-message (make-message (hive-gen-message-id hive)
- (message-from original-message)
- (actor-id hive) '*error*
- new-message-body
- #:in-reply-to (message-id original-message))))
- ;; We only return a thunk, rather than run 8sync here, because if
- ;; we ran 8sync in the middle of a catch we'd end up with an
- ;; unresumable continuation.
- (lambda () (hive-process-message hive new-message))))
-
-(define-method (hive-process-message (hive <hive>) message)
- "Handle one message, or forward it via an ambassador"
- (define (maybe-autoreply actor)
- ;; Possibly autoreply
- (if (message-needs-reply? message)
- (<-auto-reply actor message)))
-
- (define (resolve-actor-to)
- "Get the actor the message was aimed at"
- (let ((actor (hive-resolve-local-actor hive (message-to message))))
- (if (not actor)
- (throw 'actor-not-found
- (format #f "Message ~a from ~a directed to nonexistant actor ~a"
- (message-id message)
- (address->string (message-from message))
- (address->string (message-to message)))
- message))
- actor))
-
- (define (call-catching-coroutine thunk)
- (define queued-error-handling-thunk #f)
- (define (call-catching-errors)
- ;; TODO: maybe parameterize (or attach to hive) and use
- ;; maybe-catch-all from agenda.scm
- ;; @@: Why not just use with-throw-handler and let the catch
- ;; happen at the agenda? That's what we used to do, but
- ;; it ended up with a SIGABRT. See:
- ;; http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html
- (catch #t
- thunk
- ;; In the actor model, we don't totally crash on errors.
- (lambda _ #f)
- ;; If an error happens, we raise it
- (lambda (key . args)
- (if (message-needs-reply? message)
- ;; If the message is waiting on a reply, let them know
- ;; something went wrong.
- ;; However, we have to do it outside of this catch
- ;; routine, or we'll end up in an unrewindable continuation
- ;; situation.
- (set! queued-error-handling-thunk
- (hive-reply-with-error hive message key args)))
- ;; print error message
- (apply print-error-and-continue key args)))
- ;; @@: This is a kludge. See above for why.
- (if queued-error-handling-thunk
- (8sync (queued-error-handling-thunk))))
- (call-with-prompt (hive-prompt hive)
- call-catching-errors
- (lambda (kont actor message)
- ;; Register the coroutine
- (hash-set! (hive-waiting-coroutines hive)
- (message-id message)
- (cons (actor-id actor) kont))
- ;; Send off the message
- (8sync (hive-process-message hive message)))))
-
- (define (process-local-message)
- (let ((actor (resolve-actor-to)))
- (call-catching-coroutine
- (lambda ()
- (define message-handler (actor-message-handler actor))
- ;; @@: Should a more general error handling happen here?
- (parameterize ((%current-actor actor))
- (let ((result
- (message-handler actor message)))
- (maybe-autoreply actor)
- ;; Returning result allows actors to possibly make a run-request
- ;; at the end of handling a message.
- ;; ... We do want that, right?
- result))))))
-
- (define (resume-waiting-coroutine)
- (cond
- ((or (eq? (message-action message) '*reply*)
- (eq? (message-action message) '*auto-reply*))
- (call-catching-coroutine
- (lambda ()
- (match (hash-remove! (hive-waiting-coroutines hive)
- (message-in-reply-to message))
- ((_ . (resume-actor-id . kont))
- (if (not (equal? (message-to message)
- resume-actor-id))
- (throw 'resuming-to-wrong-actor
- "Attempted to resume a coroutine to the wrong actor!"
- #:expected-actor-id (message-to message)
- #:got-actor-id resume-actor-id
- #:message message))
- (let (;; @@: How should we resolve resuming coroutines to actors who are
- ;; now gone?
- (actor (resolve-actor-to))
- (result (kont message)))
- (maybe-autoreply actor)
- result))
- (#f (throw 'no-waiting-coroutine
- "message in-reply-to tries to resume nonexistent coroutine"
- message))))))
- ;; Yikes, we must have gotten an error or something back
- (else
- ;; @@: Not what we want in the long run?
- ;; What we'd *prefer* to do is to resume this message
- ;; and throw an error inside the message handler
- ;; (say, from send-mesage-wait), but that causes a SIGABRT (??!!)
- (hash-remove! (hive-waiting-coroutines hive)
- (message-in-reply-to message))
- (let ((explaination
- (if (eq? (message-action message) '*reply*)
- "Won't resume coroutine; got an *error* as a reply"
- "Won't resume coroutine because action is not *reply*")))
- (throw 'hive-unresumable-coroutine
- explaination
- #:message message)))))
-
- (define (process-remote-message)
- ;; Find the ambassador
- (let* ((remote-hive-id (hive-id (message-to message)))
- (ambassador (hive-resolve-ambassador remote-hive-id))
- (message-handler (actor-message-handler ambassador))
- (forward-request (make-forward-request hive ambassador message)))
- (message-handler ambassador forward-request)))
-
- (let ((to (message-to message)))
- ;; This seems to be an easy mistake to make, so check that addressing
- ;; is correct here
- (if (not to)
- (throw 'missing-addressee
- "`to' field is missing on message"
- #:message message))
- (if (hive-actor-local? hive to)
- (if (message-in-reply-to message)
- (resume-waiting-coroutine)
- (process-local-message))
- (process-remote-message))))
-
-(define-method (hive-actor-local? (hive <hive>) address)
- (equal? (hive-id hive) (address-hive-id address)))
-
-(define-method (hive-register-actor! (hive <hive>) (actor <actor>))
- (hash-set! (hive-actor-registry hive) (actor-id actor) actor))
-
-(define-method (%hive-create-actor (hive <hive>) actor-class
- init id-cookie)
- "Actual method called by hive-create-actor.
-
-Since this is a define-method it can't accept fancy define* arguments,
-so this gets called from the nicer hive-create-actor interface. See
-that method for documentation."
- (let* ((actor-id (hive-gen-actor-id hive id-cookie))
+ (make <hive> #:id (or hive-id
+ (big-random-number-string))))
+
+(define (gen-actor-id cookie)
+ (if cookie
+ (string-append cookie ":" (big-random-number-string))
+ (big-random-number-string)))
+
+(define (hive-main-loop hive)
+ "The main loop of the hive. This listens for messages on the hive-channel
+for certain actions to perform.
+
+`messages' here is not the same as a <message> object; these are a list of
+values, the first value being a symbol"
+ (define channel (hive-channel hive))
+ (define halt? (hive-halt? hive))
+ (define registry (hive-actor-registry hive))
+
+ ;; not the same as a <message> ;P
+ (define handle-message
+ (match-lambda
+ (('register-actor actor-id address actor)
+ (hash-set! registry actor-id (vector address actor)))
+ ;; Remove the actor from hive
+ (('remove-actor actor-id)
+ (hash-remove! (hive-actor-registry hive) actor-id))
+ (('register-ambassador hive-id ambassador-actor-id)
+ 'TODO)
+ (('unregister-ambassador hive-id ambassador-actor-id)
+ 'TODO)
+ (('forward-message from-actor-id message)
+ 'TODO)))
+
+ (define halt-or-handle
+ (choice-operation
+ (wrap-operation (get-operation channel)
+ (lambda (msg)
+ (handle-message msg)
+ #t))
+ (wrap-operation (wait-operation halt?)
+ (const #f))))
+
+ (let lp ()
+ (and (perform-operation halt-or-handle)
+ (lp))))
+
+(define *current-hive* (make-parameter #f))
+
+(define* (spawn-hive proc #:key (hive (make-hive)))
+ "Spawn a hive in a fiber running PROC, passing it the fresh hive"
+ (spawn-fiber (lambda () (hive-main-loop hive)))
+ (proc hive))
+
+(define (run-hive proc . args)
+ "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed
+to spawn-hive... all remaining arguments passed to run-fibers."
+ (apply run-fibers
+ (lambda ()
+ (spawn-hive proc))
+ args))
+
+(define (%create-actor hive-channel hive-id
+ actor-class init-args id-cookie send-init?)
+ (let* ((actor-id (gen-actor-id id-cookie))
+ (dead? (make-condition))
+ (inbox-enq (make-channel))
+ (address (make-address actor-id hive-id
+ inbox-enq dead?))