X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=a069a7182a4ae74052a62874cf85266424eec654;hb=0f7daa4787860cbbc739a51140fb8257b7fc4fef;hp=48070c43f10deaeebea1db5c18c23d470c969a2a;hpb=95d529296ce772eeb1beb269e42d5fc63ba0bb50;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index 48070c4..a069a71 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -23,14 +23,18 @@ #:use-module (ice-9 format) #:use-module (ice-9 match) #:use-module (ice-9 atomic) + #:use-module ((ice-9 ports internal) + #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 pretty-print) #:use-module (ice-9 receive) + #:use-module (ice-9 suspendable-ports) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) + #:export (;; utilities... ought to go in their own module big-random-number big-random-number-string @@ -58,13 +62,14 @@ define-actor + actor-spawn-fiber + with-actor-nonblocking-ports + ;; ;; make-hive ;; ;; There are more methods for the hive, but there's ;; ;; no reason for the outside world to look at them maybe? ;; hive-id - bootstrap-actor bootstrap-actor* - create-actor create-actor* self-destruct @@ -76,7 +81,11 @@ <- <-wait - spawn-hive run-hive)) + spawn-hive run-hive + + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + live-wrap)) ;; For ids (set! *random-state* (random-state-from-platform)) @@ -189,7 +198,7 @@ (when (not prompt) (error "Tried to <-wait without being in an actor's context...")) - (let ((reply (abort-to-prompt prompt to action args))) + (let ((reply (abort-to-prompt prompt '<-wait to action args))) (cond ((eq? action '*error*) (throw 'hive-unresumable-coroutine "Won't resume coroutine; got an *error* as a reply" @@ -214,18 +223,18 @@ #:message message)) (apply method actor message (message-body message))) -(define-syntax-rule (wrap-apply body) +(define-syntax-rule (live-wrap body) "Wrap possibly multi-value function in a procedure, applies all arguments" (lambda args (apply body args))) (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `live-wrap' to facilitate live hacking and allow the method definition to come after class definition." (build-rmeta-slot (list (cons (quote symbol) - (wrap-apply method)) ...))) + (live-wrap method)) ...))) (define-class () ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) @@ -234,9 +243,6 @@ to come after class definition." ;; kicks the bucket (id #:init-keyword #:address #:getter actor-id) - ;; The connection to the hive we're connected to. - (hive-channel #:init-keyword #:hive-channel - #:accessor actor-hive-channel) ;; Our queue to send/receive messages on (inbox-deq #:init-thunk make-channel @@ -333,6 +339,9 @@ dead? condition." (define *actor-prompt* (make-parameter #f)) +(define *resume-io-channel* + (make-parameter #f)) + (define (actor-main-loop actor) "Main loop of the actor. Loops around, pulling messages off its queue and handling them." @@ -344,6 +353,9 @@ and handling them." (define dead? (address-dead? (actor-id actor))) (define prompt (make-prompt-tag (actor-id-actor actor))) + ;; Not always used, only if with-actor-nonblocking-ports is used + (define resume-io-channel + (make-channel)) (define (handle-message message) (catch #t @@ -388,6 +400,21 @@ and handling them." "Tried to resume nonexistant message: ~a\n" (message-id message))))) + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + (define halt-or-handle-message ;; It would be nice if we could give priorities to certain operations. ;; halt should always win over getting a message... @@ -396,28 +423,28 @@ and handling them." (const #f)) ; halt and return (wrap-operation (get-operation (actor-inbox-deq actor)) (lambda (message) - (call-with-prompt prompt - (lambda () - (if (message-in-reply-to message) - ;; resume a continuation which was waiting on a reply - (resume-handler message) - ;; start handling a new message - (handle-message message))) - ;; Here's where we abort to if we're doing <-wait - ;; @@: maybe use match-lambda if we're going to end up - ;; handling multiple ~commands - (lambda (kont to action message-args) - (define message-id - ((actor-msg-id-generator actor))) - (hash-set! waiting message-id kont) - (%<- #t actor to action message-args message-id #f))) - #t)))) ; loop again + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) + #t)) ; loop again + (wrap-operation (get-operation resume-io-channel) + (lambda (thunk) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) ;; Mutate the parameter; this should be fine since each fiber ;; runs in its own dynamic state with with-dynamic-state. ;; See with-dynamic-state discussion in ;; https://wingolog.org/archives/2017/06/27/growing-fibers (*current-actor* actor) + (*resume-io-channel* resume-io-channel) + ;; We temporarily set the *actor-prompt* to #f to make sure that ;; actor-init! doesn't try to do a <-wait message (and not accidentally use ;; the parent fiber's *actor-prompt* either.) @@ -429,6 +456,59 @@ and handling them." (and (perform-operation halt-or-handle-message) (loop)))) + +;; @@: So in order for this to work, we're going to have to add +;; another channel to actors, which is resumable i/o. We'll have to +;; spawn a fiber that wakes up a thunk on the actor when its port is +;; available. Funky... + +(define (%suspend-io-to-actor wait-for-read/write) + (lambda (port) + (define prompt (*actor-prompt*)) + (define resume-channel (*resume-io-channel*)) + (define (run-at-prompt k) + (spawn-fiber + (lambda () + (wait-for-read/write port) + ;; okay, we're awake again, tell the actor to resume this + ;; continuation + (put-message resume-channel k)) + #:parallel? #f)) + (when (not prompt) + (error "Attempt to abort to actor prompt outside of actor")) + (abort-to-prompt (*actor-prompt*) + 'run-me run-at-prompt))) + +(define suspend-read-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) + +(define suspend-write-to-actor + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) + +(define (with-actor-nonblocking-ports thunk) + "Runs THUNK in dynamic context in which attempting to read/write +from a port that would otherwise block an actor's correspondence with +other actors (note that reading from a nonblocking port should never +block other fibers) will instead permit reading other messages while +I/O is waiting to complete. + +Note that currently " + (parameterize ((current-read-waiter suspend-read-to-actor) + (current-write-waiter suspend-write-to-actor)) + (thunk))) + +(define (actor-spawn-fiber thunk . args) + "Spawn a fiber from an actor but unset actor-machinery-specific +dynamic context." + (apply spawn-fiber + (lambda () + (*current-actor* #f) + (*resume-io-channel* #f) + (*actor-prompt* #f) + (thunk)) + args)) + + ;;; Actor utilities ;;; =============== @@ -447,6 +527,7 @@ and handling them." ;;; Every actor has a hive, which keeps track of other actors, manages ;;; cleanup, and performs inter-hive communication. +;; TODO: Make this a srfi-9 record type (define-class () (id #:init-keyword #:id #:getter hive-id) @@ -510,12 +591,17 @@ values, the first value being a symbol" (and (perform-operation halt-or-handle) (lp)))) -(define *current-hive* (make-parameter #f)) +(define *hive-id* (make-parameter #f)) +(define *hive-channel* (make-parameter #f)) +;; @@: Should we halt the hive either at the end of spawn-hive or run-hive? (define* (spawn-hive proc #:key (hive (make-hive))) - "Spawn a hive in a fiber running PROC, passing it the fresh hive" + "Spawn a hive and run PROC, passing it the fresh hive and establishing +a dynamic context surrounding the hive." (spawn-fiber (lambda () (hive-main-loop hive))) - (proc hive)) + (parameterize ((*hive-id* (hive-id hive)) + (*hive-channel* (hive-channel hive))) + (proc hive))) (define (run-hive proc . args) "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed @@ -525,15 +611,15 @@ to spawn-hive... all remaining arguments passed to run-fibers." (spawn-hive proc)) args)) -(define (%create-actor hive-channel hive-id - actor-class init-args id-cookie send-init?) - (let* ((actor-id (gen-actor-id id-cookie)) +(define (%create-actor actor-class init-args id-cookie send-init?) + (let* ((hive-channel (*hive-channel*)) + (hive-id (*hive-id*)) + (actor-id (gen-actor-id id-cookie)) (dead? (make-condition)) (inbox-enq (make-channel)) (address (make-address actor-id hive-id inbox-enq dead?)) (actor (apply make actor-class - #:hive-channel hive-channel #:address address init-args)) (should-init (actor-should-init actor))) @@ -557,35 +643,20 @@ to spawn-hive... all remaining arguments passed to run-fibers." ;; return the address address)) -(define* (bootstrap-actor hive actor-class #:rest init-args) - "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args" - (%create-actor (hive-channel hive) (hive-id hive) actor-class - init-args (symbol->string (class-name actor-class)) - #f)) - -(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args) - "Create an actor, but also allow customizing a 'cookie' added to the id -for debugging" - (%create-actor (hive-channel hive) (hive-id hive) actor-class - init-args id-cookie - #f)) - -(define* (create-actor from-actor actor-class #:rest init-args) +(define* (create-actor actor-class #:rest init-args) "Create an instance of actor-class. Return the new actor's id. This is the method actors should call directly (unless they want to supply an id-cookie, in which case they should use create-actor*)." - (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor) - actor-class init-args #f #t)) + (%create-actor actor-class init-args #f #t)) -(define* (create-actor* from-actor actor-class id-cookie #:rest init-args) +(define* (create-actor* actor-class id-cookie #:rest init-args) "Create an instance of actor-class. Return the new actor's id. Like create-actor, but permits supplying an id-cookie." - (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor) - actor-class init-args id-cookie #t)) + (%create-actor actor-class init-args id-cookie #t)) (define* (self-destruct actor #:key (cleanup #t)) "Remove an actor from the hive. @@ -593,7 +664,7 @@ Like create-actor, but permits supplying an id-cookie." Unless #:cleanup is set to #f, this will first have the actor handle its '*cleanup* action handler." (signal-condition! (address-dead? (actor-id actor))) - (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor))) + (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor))) ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending ;; a message with <-wait (*actor-prompt* #f)