X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=bb4d342a8ff0f49ba5f78ff64c60c52076be767b;hb=c08fc1bf81e422ead4e3394e711ce8d0724559ae;hp=d12427e5ef056aefa64f14d9415729b01f3c6f63;hpb=1e636b5da1e6c25ba366b475f939cd4899770fef;p=8sync.git
diff --git a/8sync/actors.scm b/8sync/actors.scm
index d12427e..bb4d342 100644
--- a/8sync/actors.scm
+++ b/8sync/actors.scm
@@ -19,6 +19,7 @@
(define-module (8sync actors)
#:use-module (oop goops)
#:use-module (srfi srfi-9)
+ #:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 control)
#:use-module (ice-9 format)
#:use-module (ice-9 match)
@@ -32,7 +33,6 @@
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:use-module (fibers operations)
- #:use-module (fibers internal)
#:use-module (8sync inbox)
#:use-module (8sync rmeta-slot)
@@ -44,6 +44,8 @@
actor-id
actor-message-handler
+ *current-actor*
+
;;; Commenting out the
type for now;
;;; it may be back when we have better serializers
;;
@@ -57,8 +59,6 @@
actor-init! actor-cleanup!
- actor-alive?
-
build-actions
define-actor
@@ -71,8 +71,6 @@
;; ;; There are more methods for the hive, but there's
;; ;; no reason for the outside world to look at them maybe?
;; hive-id
- bootstrap-actor bootstrap-actor*
-
create-actor create-actor*
self-destruct
@@ -84,7 +82,11 @@
<- <-wait
- spawn-hive run-hive))
+ spawn-hive run-hive
+
+ ;; Maybe the wrong place for this, or for it to be exported.
+ ;; But it's used in websockets' server implementation at least...
+ live-wrap))
;; For ids
(set! *random-state* (random-state-from-platform))
@@ -155,7 +157,7 @@
;; This is the internal, generalized message sending method.
;; Users shouldn't use it! Use the <-foo forms instead.
-(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to)
+(define (%<- wants-reply from-actor to action args message-id in-reply-to)
;; Okay, we need to deal with message ids.
;; Could we get rid of them? :\
;; It seems if we can use eq? and have messages be immutable then
@@ -163,7 +165,7 @@
;; If we need to track replies across hive boundaries we could
;; register unique ids across the ambassador barrier.
(match to
- (#(_ _ (? channel? channel) dead?)
+ (($ _ _ (? channel? channel) dead?)
(let ((message (make-message message-id to
(and from-actor (actor-id from-actor))
action args
@@ -174,12 +176,18 @@
(put-operation channel message)
(wait-operation dead?)))))
;; TODO: put remote addresses here.
- (#(actor-id hive-id #f #f)
+ (($ actor-id hive-id #f #f)
;; Here we'd make a call to our hive...
'TODO)
;; A message sent to nobody goes nowhere.
;; TODO: Should we display a warning here, probably?
- (#f #f)))
+ (#f #f)
+ ;; We shouldn't technically be passing in actors but rather their
+ ;; addresses, but often actors want to message themselves and
+ ;; this makes that slightly easier.
+ ((? (lambda (x) (is-a? x )) actor)
+ (%<- wants-reply from-actor (actor-id actor) action
+ args message-id in-reply-to))))
(define (<- to action . args)
(define from-actor (*current-actor*))
@@ -189,16 +197,13 @@
(big-random-number-string))
#f))
-;; TODO: this should abort to the prompt, then check for errors
-;; when resuming.
-
(define (<-wait to action . args)
(define prompt (*actor-prompt*))
(when (not prompt)
(error "Tried to <-wait without being in an actor's context..."))
(let ((reply (abort-to-prompt prompt '<-wait to action args)))
- (cond ((eq? action '*error*)
+ (cond ((eq? (message-action reply) '*error*)
(throw 'hive-unresumable-coroutine
"Won't resume coroutine; got an *error* as a reply"
#:message reply))
@@ -222,29 +227,23 @@
#:message message))
(apply method actor message (message-body message)))
-(define-syntax-rule (wrap-apply body)
+(define-syntax-rule (live-wrap body)
"Wrap possibly multi-value function in a procedure, applies all arguments"
(lambda args
(apply body args)))
(define-syntax-rule (build-actions (symbol method) ...)
"Construct an alist of (symbol . method), where the method is wrapped
-with wrap-apply to facilitate live hacking and allow the method definition
+with `live-wrap' to facilitate live hacking and allow the method definition
to come after class definition."
(build-rmeta-slot
(list (cons (quote symbol)
- (wrap-apply method)) ...)))
+ (live-wrap method)) ...)))
(define-class ()
- ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
- ;; - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
- ;; - dead? is a fibers condition variable which is set once this actor
- ;; kicks the bucket
+ ;; An object
(id #:init-keyword #:address
#:getter actor-id)
- ;; The connection to the hive we're connected to.
- (hive-channel #:init-keyword #:hive-channel
- #:accessor actor-hive-channel)
;; Our queue to send/receive messages on
(inbox-deq #:init-thunk make-channel
@@ -283,24 +282,37 @@ to come after class definition."
(define-method (actor-cleanup! (actor ))
'no-op)
-;;; Addresses are vectors where the first part is the actor-id and
-;;; the second part is the hive-id. This works well enough... they
-;;; look decent being pretty-printed.
-
-(define (make-address actor-id hive-id channel dead?)
- (vector actor-id hive-id channel dead?))
-
-(define (address-actor-id address)
- (vector-ref address 0))
-
-(define (address-hive-id address)
- (vector-ref address 1))
-
-(define (address-channel address)
- (vector-ref address 2))
-
-(define (address-dead? address)
- (vector-ref address 3))
+;;; Every actor has an address, which is how its identified.
+;;; We also pack in some routing information.
+(define-record-type
+ (make-address actor-id hive-id channel dead?)
+ address?
+ ;; Unique-to-this-actor-on-this-hive part
+ (actor-id address-actor-id)
+ ;; Unique identifier for the hive we're connected to.
+ ;; If we don't have a "direct" link to the other actor through
+ ;; a channel, we'll have to look up if our hive has a way to route
+ ;; to the other hive.
+ (hive-id address-hive-id)
+ ;; The receiving channel (as opposed to actor-inbox-deq)
+ (channel address-channel)
+ ;; A fibers condition variable which is set once this actor kicks
+ ;; the bucket
+ (dead? address-dead?))
+
+(set-record-type-printer!
+
+ (lambda (address port)
+ (format port ""
+ (address->string address)
+ (if (address-channel address)
+ (string-append
+ ":local"
+ (if (atomic-box-ref
+ ((@@ (fibers conditions) condition-signalled?)
+ (address-dead? address)))
+ " :dead" ""))
+ ":remote"))))
(define (address->string address)
(string-append (address-actor-id address) "@"
@@ -311,14 +323,10 @@ to come after class definition."
This compares the actor-id and hive-id but ignores the channel and
dead? condition."
- (match address1
- (#(actor-id-1 hive-id-1 _ _)
- (match address2
- (#(actor-id-2 hive-id-2)
- (and (equal? actor-id-1 actor-id-2)
- (and (equal? hive-id-1 hive-id-2))))
- (_ #f)))
- (_ #f)))
+ (and (equal? (address-actor-id address1)
+ (address-actor-id address2))
+ (equal? (address-hive-id address1)
+ (address-hive-id address2))))
(define (actor-id-actor actor)
"Get the actor id component of the actor-id"
@@ -368,10 +376,9 @@ and handling them."
(lambda vals
;; Return reply if necessary
(when (message-wants-reply message)
- (when (message-wants-reply message)
- (%<- #f actor (message-from message) '*reply*
- vals ((actor-msg-id-generator actor))
- (message-id message)))))))
+ (%<- #f actor (message-from message) '*reply*
+ vals ((actor-msg-id-generator actor))
+ (message-id message))))))
(const #t)
(let ((err (current-error-port)))
(lambda (key . args)
@@ -402,6 +409,21 @@ and handling them."
"Tried to resume nonexistant message: ~a\n"
(message-id message)))))
+ (define (call-with-actor-prompt thunk)
+ (call-with-prompt prompt
+ thunk
+ ;; Here's where we abort to if we're doing <-wait
+ ;; @@: maybe use match-lambda if we're going to end up
+ ;; handling multiple ~commands
+ (match-lambda*
+ ((kont '<-wait to action message-args)
+ (define message-id
+ ((actor-msg-id-generator actor)))
+ (hash-set! waiting message-id kont)
+ (%<- #t actor to action message-args message-id #f))
+ ((kont 'run-me proc)
+ (proc kont)))))
+
(define halt-or-handle-message
;; It would be nice if we could give priorities to certain operations.
;; halt should always win over getting a message...
@@ -410,29 +432,20 @@ and handling them."
(const #f)) ; halt and return
(wrap-operation (get-operation (actor-inbox-deq actor))
(lambda (message)
- (call-with-prompt prompt
- (lambda ()
- (if (message-in-reply-to message)
- ;; resume a continuation which was waiting on a reply
- (resume-handler message)
- ;; start handling a new message
- (handle-message message)))
- ;; Here's where we abort to if we're doing <-wait
- ;; @@: maybe use match-lambda if we're going to end up
- ;; handling multiple ~commands
- (match-lambda*
- ((kont '<-wait to action message-args)
- (define message-id
- ((actor-msg-id-generator actor)))
- (hash-set! waiting message-id kont)
- (%<- #t actor to action message-args message-id #f))
- ((kont 'run-me proc)
- (proc kont))))
+ (call-with-actor-prompt
+ (lambda ()
+ (if (message-in-reply-to message)
+ ;; resume a continuation which was waiting on a reply
+ (resume-handler message)
+ ;; start handling a new message
+ (handle-message message))))
#t)) ; loop again
(wrap-operation (get-operation resume-io-channel)
(lambda (thunk)
- (thunk
- #t)))))
+ (call-with-actor-prompt
+ (lambda ()
+ (thunk)))
+ #t))))
;; Mutate the parameter; this should be fine since each fiber
;; runs in its own dynamic state with with-dynamic-state.
@@ -458,16 +471,14 @@ and handling them."
;; spawn a fiber that wakes up a thunk on the actor when its port is
;; available. Funky...
-(define (%suspend-io-to-actor resume-method get-wait-fd-method)
+(define (%suspend-io-to-actor wait-for-read/write)
(lambda (port)
(define prompt (*actor-prompt*))
(define resume-channel (*resume-io-channel*))
(define (run-at-prompt k)
(spawn-fiber
(lambda ()
- (suspend-current-fiber
- (lambda (fiber)
- (resume-on-readable-fd (port-read-wait-fd port) fiber)))
+ (wait-for-read/write port)
;; okay, we're awake again, tell the actor to resume this
;; continuation
(put-message resume-channel k))
@@ -478,10 +489,10 @@ and handling them."
'run-me run-at-prompt)))
(define suspend-read-to-actor
- (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd))
+ (%suspend-io-to-actor (@@ (fibers) wait-for-readable)))
(define suspend-write-to-actor
- (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd))
+ (%suspend-io-to-actor (@@ (fibers) wait-for-writable)))
(define (with-actor-nonblocking-ports thunk)
"Runs THUNK in dynamic context in which attempting to read/write
@@ -525,6 +536,7 @@ dynamic context."
;;; Every actor has a hive, which keeps track of other actors, manages
;;; cleanup, and performs inter-hive communication.
+;; TODO: Make this a srfi-9 record type
(define-class ()
(id #:init-keyword #:id
#:getter hive-id)
@@ -588,12 +600,17 @@ values, the first value being a symbol"
(and (perform-operation halt-or-handle)
(lp))))
-(define *current-hive* (make-parameter #f))
+(define *hive-id* (make-parameter #f))
+(define *hive-channel* (make-parameter #f))
+;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
(define* (spawn-hive proc #:key (hive (make-hive)))
- "Spawn a hive in a fiber running PROC, passing it the fresh hive"
+ "Spawn a hive and run PROC, passing it the fresh hive and establishing
+a dynamic context surrounding the hive."
(spawn-fiber (lambda () (hive-main-loop hive)))
- (proc hive))
+ (parameterize ((*hive-id* (hive-id hive))
+ (*hive-channel* (hive-channel hive)))
+ (proc hive)))
(define (run-hive proc . args)
"Spawn a hive and run it in run-fibers. Takes a PROC as would be passed
@@ -603,15 +620,15 @@ to spawn-hive... all remaining arguments passed to run-fibers."
(spawn-hive proc))
args))
-(define (%create-actor hive-channel hive-id
- actor-class init-args id-cookie send-init?)
- (let* ((actor-id (gen-actor-id id-cookie))
+(define (%create-actor actor-class init-args id-cookie send-init?)
+ (let* ((hive-channel (*hive-channel*))
+ (hive-id (*hive-id*))
+ (actor-id (gen-actor-id id-cookie))
(dead? (make-condition))
(inbox-enq (make-channel))
(address (make-address actor-id hive-id
inbox-enq dead?))
(actor (apply make actor-class
- #:hive-channel hive-channel
#:address address
init-args))
(should-init (actor-should-init actor)))
@@ -635,35 +652,20 @@ to spawn-hive... all remaining arguments passed to run-fibers."
;; return the address
address))
-(define* (bootstrap-actor hive actor-class #:rest init-args)
- "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args"
- (%create-actor (hive-channel hive) (hive-id hive) actor-class
- init-args (symbol->string (class-name actor-class))
- #f))
-
-(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args)
- "Create an actor, but also allow customizing a 'cookie' added to the id
-for debugging"
- (%create-actor (hive-channel hive) (hive-id hive) actor-class
- init-args id-cookie
- #f))
-
-(define* (create-actor from-actor actor-class #:rest init-args)
+(define* (create-actor actor-class #:rest init-args)
"Create an instance of actor-class. Return the new actor's id.
This is the method actors should call directly (unless they want
to supply an id-cookie, in which case they should use
create-actor*)."
- (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
- actor-class init-args #f #t))
+ (%create-actor actor-class init-args #f #t))
-(define* (create-actor* from-actor actor-class id-cookie #:rest init-args)
+(define* (create-actor* actor-class id-cookie #:rest init-args)
"Create an instance of actor-class. Return the new actor's id.
Like create-actor, but permits supplying an id-cookie."
- (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
- actor-class init-args id-cookie #t))
+ (%create-actor actor-class init-args id-cookie #t))
(define* (self-destruct actor #:key (cleanup #t))
"Remove an actor from the hive.
@@ -671,20 +673,9 @@ Like create-actor, but permits supplying an id-cookie."
Unless #:cleanup is set to #f, this will first have the actor handle
its '*cleanup* action handler."
(signal-condition! (address-dead? (actor-id actor)))
- (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor)))
+ (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
;; a message with <-wait
(*actor-prompt* #f)
(actor-cleanup! actor))
-;; From a patch I sent to Fibers...
-(define (condition-signalled? cvar)
- "Return @code{#t} if @var{cvar} has already been signalled.
-
-In general you will want to use @code{wait} or @code{wait-operation} to
-wait on a condition. However, sometimes it is useful to see whether or
-not a condition has already been signalled without blocking."
- (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar)))
-
-(define (actor-alive? actor)
- (condition-signalled? (address-dead? (actor-id actor))))