X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=a069a7182a4ae74052a62874cf85266424eec654;hb=0f7daa4787860cbbc739a51140fb8257b7fc4fef;hp=d12427e5ef056aefa64f14d9415729b01f3c6f63;hpb=1e636b5da1e6c25ba366b475f939cd4899770fef;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index d12427e..a069a71 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -32,7 +32,6 @@ #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module (fibers internal) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) @@ -71,8 +70,6 @@ ;; ;; There are more methods for the hive, but there's ;; ;; no reason for the outside world to look at them maybe? ;; hive-id - bootstrap-actor bootstrap-actor* - create-actor create-actor* self-destruct @@ -84,7 +81,11 @@ <- <-wait - spawn-hive run-hive)) + spawn-hive run-hive + + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + live-wrap)) ;; For ids (set! *random-state* (random-state-from-platform)) @@ -222,18 +223,18 @@ #:message message)) (apply method actor message (message-body message))) -(define-syntax-rule (wrap-apply body) +(define-syntax-rule (live-wrap body) "Wrap possibly multi-value function in a procedure, applies all arguments" (lambda args (apply body args))) (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `live-wrap' to facilitate live hacking and allow the method definition to come after class definition." (build-rmeta-slot (list (cons (quote symbol) - (wrap-apply method)) ...))) + (live-wrap method)) ...))) (define-class () ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) @@ -242,9 +243,6 @@ to come after class definition." ;; kicks the bucket (id #:init-keyword #:address #:getter actor-id) - ;; The connection to the hive we're connected to. - (hive-channel #:init-keyword #:hive-channel - #:accessor actor-hive-channel) ;; Our queue to send/receive messages on (inbox-deq #:init-thunk make-channel @@ -402,6 +400,21 @@ and handling them." "Tried to resume nonexistant message: ~a\n" (message-id message))))) + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + (define halt-or-handle-message ;; It would be nice if we could give priorities to certain operations. ;; halt should always win over getting a message... @@ -410,29 +423,20 @@ and handling them." (const #f)) ; halt and return (wrap-operation (get-operation (actor-inbox-deq actor)) (lambda (message) - (call-with-prompt prompt - (lambda () - (if (message-in-reply-to message) - ;; resume a continuation which was waiting on a reply - (resume-handler message) - ;; start handling a new message - (handle-message message))) - ;; Here's where we abort to if we're doing <-wait - ;; @@: maybe use match-lambda if we're going to end up - ;; handling multiple ~commands - (match-lambda* - ((kont '<-wait to action message-args) - (define message-id - ((actor-msg-id-generator actor))) - (hash-set! waiting message-id kont) - (%<- #t actor to action message-args message-id #f)) - ((kont 'run-me proc) - (proc kont)))) + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) #t)) ; loop again (wrap-operation (get-operation resume-io-channel) (lambda (thunk) - (thunk - #t))))) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) ;; Mutate the parameter; this should be fine since each fiber ;; runs in its own dynamic state with with-dynamic-state. @@ -458,16 +462,14 @@ and handling them." ;; spawn a fiber that wakes up a thunk on the actor when its port is ;; available. Funky... -(define (%suspend-io-to-actor resume-method get-wait-fd-method) +(define (%suspend-io-to-actor wait-for-read/write) (lambda (port) (define prompt (*actor-prompt*)) (define resume-channel (*resume-io-channel*)) (define (run-at-prompt k) (spawn-fiber (lambda () - (suspend-current-fiber - (lambda (fiber) - (resume-on-readable-fd (port-read-wait-fd port) fiber))) + (wait-for-read/write port) ;; okay, we're awake again, tell the actor to resume this ;; continuation (put-message resume-channel k)) @@ -478,10 +480,10 @@ and handling them." 'run-me run-at-prompt))) (define suspend-read-to-actor - (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) (define suspend-write-to-actor - (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) (define (with-actor-nonblocking-ports thunk) "Runs THUNK in dynamic context in which attempting to read/write @@ -525,6 +527,7 @@ dynamic context." ;;; Every actor has a hive, which keeps track of other actors, manages ;;; cleanup, and performs inter-hive communication. +;; TODO: Make this a srfi-9 record type (define-class () (id #:init-keyword #:id #:getter hive-id) @@ -588,12 +591,17 @@ values, the first value being a symbol" (and (perform-operation halt-or-handle) (lp)))) -(define *current-hive* (make-parameter #f)) +(define *hive-id* (make-parameter #f)) +(define *hive-channel* (make-parameter #f)) +;; @@: Should we halt the hive either at the end of spawn-hive or run-hive? (define* (spawn-hive proc #:key (hive (make-hive))) - "Spawn a hive in a fiber running PROC, passing it the fresh hive" + "Spawn a hive and run PROC, passing it the fresh hive and establishing +a dynamic context surrounding the hive." (spawn-fiber (lambda () (hive-main-loop hive))) - (proc hive)) + (parameterize ((*hive-id* (hive-id hive)) + (*hive-channel* (hive-channel hive))) + (proc hive))) (define (run-hive proc . args) "Spawn a hive and run it in run-fibers. Takes a PROC as would be passed @@ -603,15 +611,15 @@ to spawn-hive... all remaining arguments passed to run-fibers." (spawn-hive proc)) args)) -(define (%create-actor hive-channel hive-id - actor-class init-args id-cookie send-init?) - (let* ((actor-id (gen-actor-id id-cookie)) +(define (%create-actor actor-class init-args id-cookie send-init?) + (let* ((hive-channel (*hive-channel*)) + (hive-id (*hive-id*)) + (actor-id (gen-actor-id id-cookie)) (dead? (make-condition)) (inbox-enq (make-channel)) (address (make-address actor-id hive-id inbox-enq dead?)) (actor (apply make actor-class - #:hive-channel hive-channel #:address address init-args)) (should-init (actor-should-init actor))) @@ -635,35 +643,20 @@ to spawn-hive... all remaining arguments passed to run-fibers." ;; return the address address)) -(define* (bootstrap-actor hive actor-class #:rest init-args) - "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args" - (%create-actor (hive-channel hive) (hive-id hive) actor-class - init-args (symbol->string (class-name actor-class)) - #f)) - -(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args) - "Create an actor, but also allow customizing a 'cookie' added to the id -for debugging" - (%create-actor (hive-channel hive) (hive-id hive) actor-class - init-args id-cookie - #f)) - -(define* (create-actor from-actor actor-class #:rest init-args) +(define* (create-actor actor-class #:rest init-args) "Create an instance of actor-class. Return the new actor's id. This is the method actors should call directly (unless they want to supply an id-cookie, in which case they should use create-actor*)." - (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor) - actor-class init-args #f #t)) + (%create-actor actor-class init-args #f #t)) -(define* (create-actor* from-actor actor-class id-cookie #:rest init-args) +(define* (create-actor* actor-class id-cookie #:rest init-args) "Create an instance of actor-class. Return the new actor's id. Like create-actor, but permits supplying an id-cookie." - (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor) - actor-class init-args id-cookie #t)) + (%create-actor actor-class init-args id-cookie #t)) (define* (self-destruct actor #:key (cleanup #t)) "Remove an actor from the hive. @@ -671,7 +664,7 @@ Like create-actor, but permits supplying an id-cookie." Unless #:cleanup is set to #f, this will first have the actor handle its '*cleanup* action handler." (signal-condition! (address-dead? (actor-id actor))) - (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor))) + (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor))) ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending ;; a message with <-wait (*actor-prompt* #f)