X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=a069a7182a4ae74052a62874cf85266424eec654;hb=0f7daa4787860cbbc739a51140fb8257b7fc4fef;hp=ca8a55f86ec2a90649b246fd544b12b194f2492d;hpb=1253ea73ba4020df78671955721721179b556bfc;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index ca8a55f..a069a71 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -32,7 +32,6 @@ #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module (fibers internal) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) @@ -82,7 +81,11 @@ <- <-wait - spawn-hive run-hive)) + spawn-hive run-hive + + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + live-wrap)) ;; For ids (set! *random-state* (random-state-from-platform)) @@ -220,18 +223,18 @@ #:message message)) (apply method actor message (message-body message))) -(define-syntax-rule (wrap-apply body) +(define-syntax-rule (live-wrap body) "Wrap possibly multi-value function in a procedure, applies all arguments" (lambda args (apply body args))) (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `live-wrap' to facilitate live hacking and allow the method definition to come after class definition." (build-rmeta-slot (list (cons (quote symbol) - (wrap-apply method)) ...))) + (live-wrap method)) ...))) (define-class () ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) @@ -397,6 +400,21 @@ and handling them." "Tried to resume nonexistant message: ~a\n" (message-id message))))) + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + (define halt-or-handle-message ;; It would be nice if we could give priorities to certain operations. ;; halt should always win over getting a message... @@ -405,29 +423,20 @@ and handling them." (const #f)) ; halt and return (wrap-operation (get-operation (actor-inbox-deq actor)) (lambda (message) - (call-with-prompt prompt - (lambda () - (if (message-in-reply-to message) - ;; resume a continuation which was waiting on a reply - (resume-handler message) - ;; start handling a new message - (handle-message message))) - ;; Here's where we abort to if we're doing <-wait - ;; @@: maybe use match-lambda if we're going to end up - ;; handling multiple ~commands - (match-lambda* - ((kont '<-wait to action message-args) - (define message-id - ((actor-msg-id-generator actor))) - (hash-set! waiting message-id kont) - (%<- #t actor to action message-args message-id #f)) - ((kont 'run-me proc) - (proc kont)))) + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) #t)) ; loop again (wrap-operation (get-operation resume-io-channel) (lambda (thunk) - (thunk - #t))))) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) ;; Mutate the parameter; this should be fine since each fiber ;; runs in its own dynamic state with with-dynamic-state. @@ -453,16 +462,14 @@ and handling them." ;; spawn a fiber that wakes up a thunk on the actor when its port is ;; available. Funky... -(define (%suspend-io-to-actor resume-method get-wait-fd-method) +(define (%suspend-io-to-actor wait-for-read/write) (lambda (port) (define prompt (*actor-prompt*)) (define resume-channel (*resume-io-channel*)) (define (run-at-prompt k) (spawn-fiber (lambda () - (suspend-current-fiber - (lambda (fiber) - (resume-on-readable-fd (port-read-wait-fd port) fiber))) + (wait-for-read/write port) ;; okay, we're awake again, tell the actor to resume this ;; continuation (put-message resume-channel k)) @@ -473,10 +480,10 @@ and handling them." 'run-me run-at-prompt))) (define suspend-read-to-actor - (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) (define suspend-write-to-actor - (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) (define (with-actor-nonblocking-ports thunk) "Runs THUNK in dynamic context in which attempting to read/write