X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=ca8a55f86ec2a90649b246fd544b12b194f2492d;hb=1253ea73ba4020df78671955721721179b556bfc;hp=9e249cdd987db42fe3c721d9b886b516ab08c33f;hpb=57b7dfc7a351017e09bd158889ed45fa57e51289;p=8sync.git
diff --git a/8sync/actors.scm b/8sync/actors.scm
index 9e249cd..ca8a55f 100644
--- a/8sync/actors.scm
+++ b/8sync/actors.scm
@@ -23,15 +23,19 @@
#:use-module (ice-9 format)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
+ #:use-module ((ice-9 ports internal)
+ #:select (port-read-wait-fd port-write-wait-fd))
#:use-module (ice-9 pretty-print)
#:use-module (ice-9 receive)
+ #:use-module (ice-9 suspendable-ports)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:use-module (fibers operations)
- #:use-module (8sync agenda)
+ #:use-module (fibers internal)
#:use-module (8sync inbox)
#:use-module (8sync rmeta-slot)
+
#:export (;; utilities... ought to go in their own module
big-random-number
big-random-number-string
@@ -43,29 +47,30 @@
;;; Commenting out the
type for now;
;;; it may be back when we have better serializers
;;
- make-address address?
+ make-address
address-actor-id address-hive-id
- actor-init! actor-cleanup!
-
address->string
actor-id-actor
actor-id-hive
actor-id-string
+ actor-init! actor-cleanup!
+
actor-alive?
build-actions
define-actor
+ actor-spawn-fiber
+ with-actor-nonblocking-ports
+
;;
;; make-hive
;; ;; There are more methods for the hive, but there's
;; ;; no reason for the outside world to look at them maybe?
;; hive-id
- bootstrap-actor bootstrap-actor*
-
create-actor create-actor*
self-destruct
@@ -75,8 +80,6 @@
message-id message-body message-in-reply-to
message-wants-reply
- message-auto-reply?
-
<- <-wait
spawn-hive run-hive))
@@ -192,7 +195,7 @@
(when (not prompt)
(error "Tried to <-wait without being in an actor's context..."))
- (let ((reply (abort-to-prompt prompt to action args)))
+ (let ((reply (abort-to-prompt prompt '<-wait to action args)))
(cond ((eq? action '*error*)
(throw 'hive-unresumable-coroutine
"Won't resume coroutine; got an *error* as a reply"
@@ -217,6 +220,11 @@
#:message message))
(apply method actor message (message-body message)))
+(define-syntax-rule (wrap-apply body)
+ "Wrap possibly multi-value function in a procedure, applies all arguments"
+ (lambda args
+ (apply body args)))
+
(define-syntax-rule (build-actions (symbol method) ...)
"Construct an alist of (symbol . method), where the method is wrapped
with wrap-apply to facilitate live hacking and allow the method definition
@@ -232,9 +240,6 @@ to come after class definition."
;; kicks the bucket
(id #:init-keyword #:address
#:getter actor-id)
- ;; The connection to the hive we're connected to.
- (hive-channel #:init-keyword #:hive-channel
- #:accessor actor-hive-channel)
;; Our queue to send/receive messages on
(inbox-deq #:init-thunk make-channel
@@ -331,6 +336,9 @@ dead? condition."
(define *actor-prompt*
(make-parameter #f))
+(define *resume-io-channel*
+ (make-parameter #f))
+
(define (actor-main-loop actor)
"Main loop of the actor. Loops around, pulling messages off its queue
and handling them."
@@ -342,6 +350,9 @@ and handling them."
(define dead?
(address-dead? (actor-id actor)))
(define prompt (make-prompt-tag (actor-id-actor actor)))
+ ;; Not always used, only if with-actor-nonblocking-ports is used
+ (define resume-io-channel
+ (make-channel))
(define (handle-message message)
(catch #t
@@ -404,18 +415,27 @@ and handling them."
;; Here's where we abort to if we're doing <-wait
;; @@: maybe use match-lambda if we're going to end up
;; handling multiple ~commands
- (lambda (kont to action message-args)
- (define message-id
- ((actor-msg-id-generator actor)))
- (hash-set! waiting message-id kont)
- (%<- #t actor to action message-args message-id #f)))
- #t)))) ; loop again
+ (match-lambda*
+ ((kont '<-wait to action message-args)
+ (define message-id
+ ((actor-msg-id-generator actor)))
+ (hash-set! waiting message-id kont)
+ (%<- #t actor to action message-args message-id #f))
+ ((kont 'run-me proc)
+ (proc kont))))
+ #t)) ; loop again
+ (wrap-operation (get-operation resume-io-channel)
+ (lambda (thunk)
+ (thunk
+ #t)))))
;; Mutate the parameter; this should be fine since each fiber
;; runs in its own dynamic state with with-dynamic-state.
;; See with-dynamic-state discussion in
;; https://wingolog.org/archives/2017/06/27/growing-fibers
(*current-actor* actor)
+ (*resume-io-channel* resume-io-channel)
+
;; We temporarily set the *actor-prompt* to #f to make sure that
;; actor-init! doesn't try to do a <-wait message (and not accidentally use
;; the parent fiber's *actor-prompt* either.)
@@ -427,6 +447,61 @@ and handling them."
(and (perform-operation halt-or-handle-message)
(loop))))
+
+;; @@: So in order for this to work, we're going to have to add
+;; another channel to actors, which is resumable i/o. We'll have to
+;; spawn a fiber that wakes up a thunk on the actor when its port is
+;; available. Funky...
+
+(define (%suspend-io-to-actor resume-method get-wait-fd-method)
+ (lambda (port)
+ (define prompt (*actor-prompt*))
+ (define resume-channel (*resume-io-channel*))
+ (define (run-at-prompt k)
+ (spawn-fiber
+ (lambda ()
+ (suspend-current-fiber
+ (lambda (fiber)
+ (resume-on-readable-fd (port-read-wait-fd port) fiber)))
+ ;; okay, we're awake again, tell the actor to resume this
+ ;; continuation
+ (put-message resume-channel k))
+ #:parallel? #f))
+ (when (not prompt)
+ (error "Attempt to abort to actor prompt outside of actor"))
+ (abort-to-prompt (*actor-prompt*)
+ 'run-me run-at-prompt)))
+
+(define suspend-read-to-actor
+ (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd))
+
+(define suspend-write-to-actor
+ (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd))
+
+(define (with-actor-nonblocking-ports thunk)
+ "Runs THUNK in dynamic context in which attempting to read/write
+from a port that would otherwise block an actor's correspondence with
+other actors (note that reading from a nonblocking port should never
+block other fibers) will instead permit reading other messages while
+I/O is waiting to complete.
+
+Note that currently "
+ (parameterize ((current-read-waiter suspend-read-to-actor)
+ (current-write-waiter suspend-write-to-actor))
+ (thunk)))
+
+(define (actor-spawn-fiber thunk . args)
+ "Spawn a fiber from an actor but unset actor-machinery-specific
+dynamic context."
+ (apply spawn-fiber
+ (lambda ()
+ (*current-actor* #f)
+ (*resume-io-channel* #f)
+ (*actor-prompt* #f)
+ (thunk))
+ args))
+
+
;;; Actor utilities
;;; ===============
@@ -445,6 +520,7 @@ and handling them."
;;; Every actor has a hive, which keeps track of other actors, manages
;;; cleanup, and performs inter-hive communication.
+;; TODO: Make this a srfi-9 record type
(define-class ()
(id #:init-keyword #:id
#:getter hive-id)
@@ -508,12 +584,17 @@ values, the first value being a symbol"
(and (perform-operation halt-or-handle)
(lp))))
-(define *current-hive* (make-parameter #f))
+(define *hive-id* (make-parameter #f))
+(define *hive-channel* (make-parameter #f))
+;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
(define* (spawn-hive proc #:key (hive (make-hive)))
- "Spawn a hive in a fiber running PROC, passing it the fresh hive"
+ "Spawn a hive and run PROC, passing it the fresh hive and establishing
+a dynamic context surrounding the hive."
(spawn-fiber (lambda () (hive-main-loop hive)))
- (proc hive))
+ (parameterize ((*hive-id* (hive-id hive))
+ (*hive-channel* (hive-channel hive)))
+ (proc hive)))
(define (run-hive proc . args)
"Spawn a hive and run it in run-fibers. Takes a PROC as would be passed
@@ -523,15 +604,15 @@ to spawn-hive... all remaining arguments passed to run-fibers."
(spawn-hive proc))
args))
-(define (%create-actor hive-channel hive-id
- actor-class init-args id-cookie send-init?)
- (let* ((actor-id (gen-actor-id id-cookie))
+(define (%create-actor actor-class init-args id-cookie send-init?)
+ (let* ((hive-channel (*hive-channel*))
+ (hive-id (*hive-id*))
+ (actor-id (gen-actor-id id-cookie))
(dead? (make-condition))
(inbox-enq (make-channel))
(address (make-address actor-id hive-id
inbox-enq dead?))
(actor (apply make actor-class
- #:hive-channel hive-channel
#:address address
init-args))
(should-init (actor-should-init actor)))
@@ -555,35 +636,20 @@ to spawn-hive... all remaining arguments passed to run-fibers."
;; return the address
address))
-(define* (bootstrap-actor hive actor-class #:rest init-args)
- "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args"
- (%create-actor (hive-channel hive) (hive-id hive) actor-class
- init-args (symbol->string (class-name actor-class))
- #f))
-
-(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args)
- "Create an actor, but also allow customizing a 'cookie' added to the id
-for debugging"
- (%create-actor (hive-channel hive) (hive-id hive) actor-class
- init-args id-cookie
- #f))
-
-(define* (create-actor from-actor actor-class #:rest init-args)
+(define* (create-actor actor-class #:rest init-args)
"Create an instance of actor-class. Return the new actor's id.
This is the method actors should call directly (unless they want
to supply an id-cookie, in which case they should use
create-actor*)."
- (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
- actor-class init-args #f #t))
+ (%create-actor actor-class init-args #f #t))
-(define* (create-actor* from-actor actor-class id-cookie #:rest init-args)
+(define* (create-actor* actor-class id-cookie #:rest init-args)
"Create an instance of actor-class. Return the new actor's id.
Like create-actor, but permits supplying an id-cookie."
- (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
- actor-class init-args id-cookie #t))
+ (%create-actor actor-class init-args id-cookie #t))
(define* (self-destruct actor #:key (cleanup #t))
"Remove an actor from the hive.
@@ -591,7 +657,7 @@ Like create-actor, but permits supplying an id-cookie."
Unless #:cleanup is set to #f, this will first have the actor handle
its '*cleanup* action handler."
(signal-condition! (address-dead? (actor-id actor)))
- (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor)))
+ (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
;; a message with <-wait
(*actor-prompt* #f)