X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;ds=sidebyside;f=8sync%2Factors.scm;h=8abe41a1133b20c65dcaf8a3ff3ca1c5f40a8c3e;hb=e1cc354c7073230398634b1638ea288ce3e27852;hp=ca8a55f86ec2a90649b246fd544b12b194f2492d;hpb=1253ea73ba4020df78671955721721179b556bfc;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index ca8a55f..8abe41a 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -32,7 +32,6 @@ #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module (fibers internal) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) @@ -44,6 +43,8 @@ actor-id actor-message-handler + *current-actor* + ;;; Commenting out the
type for now; ;;; it may be back when we have better serializers ;;
@@ -57,8 +58,6 @@ actor-init! actor-cleanup! - actor-alive? - build-actions define-actor @@ -82,7 +81,11 @@ <- <-wait - spawn-hive run-hive)) + spawn-hive run-hive + + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + live-wrap)) ;; For ids (set! *random-state* (random-state-from-platform)) @@ -153,7 +156,7 @@ ;; This is the internal, generalized message sending method. ;; Users shouldn't use it! Use the <-foo forms instead. -(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to) +(define (%<- wants-reply from-actor to action args message-id in-reply-to) ;; Okay, we need to deal with message ids. ;; Could we get rid of them? :\ ;; It seems if we can use eq? and have messages be immutable then @@ -177,7 +180,13 @@ 'TODO) ;; A message sent to nobody goes nowhere. ;; TODO: Should we display a warning here, probably? - (#f #f))) + (#f #f) + ;; We shouldn't technically be passing in actors but rather their + ;; addresses, but often actors want to message themselves and + ;; this makes that slightly easier. + ((? (lambda (x) (is-a? x )) actor) + (%<- wants-reply from-actor (actor-id actor) action + args message-id in-reply-to)))) (define (<- to action . args) (define from-actor (*current-actor*)) @@ -220,18 +229,18 @@ #:message message)) (apply method actor message (message-body message))) -(define-syntax-rule (wrap-apply body) +(define-syntax-rule (live-wrap body) "Wrap possibly multi-value function in a procedure, applies all arguments" (lambda args (apply body args))) (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `live-wrap' to facilitate live hacking and allow the method definition to come after class definition." (build-rmeta-slot (list (cons (quote symbol) - (wrap-apply method)) ...))) + (live-wrap method)) ...))) (define-class () ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) @@ -363,10 +372,9 @@ and handling them." (lambda vals ;; Return reply if necessary (when (message-wants-reply message) - (when (message-wants-reply message) - (%<- #f actor (message-from message) '*reply* - vals ((actor-msg-id-generator actor)) - (message-id message))))))) + (%<- #f actor (message-from message) '*reply* + vals ((actor-msg-id-generator actor)) + (message-id message)))))) (const #t) (let ((err (current-error-port))) (lambda (key . args) @@ -397,6 +405,21 @@ and handling them." "Tried to resume nonexistant message: ~a\n" (message-id message))))) + (define (call-with-actor-prompt thunk) + (call-with-prompt prompt + thunk + ;; Here's where we abort to if we're doing <-wait + ;; @@: maybe use match-lambda if we're going to end up + ;; handling multiple ~commands + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont))))) + (define halt-or-handle-message ;; It would be nice if we could give priorities to certain operations. ;; halt should always win over getting a message... @@ -405,29 +428,20 @@ and handling them." (const #f)) ; halt and return (wrap-operation (get-operation (actor-inbox-deq actor)) (lambda (message) - (call-with-prompt prompt - (lambda () - (if (message-in-reply-to message) - ;; resume a continuation which was waiting on a reply - (resume-handler message) - ;; start handling a new message - (handle-message message))) - ;; Here's where we abort to if we're doing <-wait - ;; @@: maybe use match-lambda if we're going to end up - ;; handling multiple ~commands - (match-lambda* - ((kont '<-wait to action message-args) - (define message-id - ((actor-msg-id-generator actor))) - (hash-set! waiting message-id kont) - (%<- #t actor to action message-args message-id #f)) - ((kont 'run-me proc) - (proc kont)))) + (call-with-actor-prompt + (lambda () + (if (message-in-reply-to message) + ;; resume a continuation which was waiting on a reply + (resume-handler message) + ;; start handling a new message + (handle-message message)))) #t)) ; loop again (wrap-operation (get-operation resume-io-channel) (lambda (thunk) - (thunk - #t))))) + (call-with-actor-prompt + (lambda () + (thunk))) + #t)))) ;; Mutate the parameter; this should be fine since each fiber ;; runs in its own dynamic state with with-dynamic-state. @@ -453,16 +467,14 @@ and handling them." ;; spawn a fiber that wakes up a thunk on the actor when its port is ;; available. Funky... -(define (%suspend-io-to-actor resume-method get-wait-fd-method) +(define (%suspend-io-to-actor wait-for-read/write) (lambda (port) (define prompt (*actor-prompt*)) (define resume-channel (*resume-io-channel*)) (define (run-at-prompt k) (spawn-fiber (lambda () - (suspend-current-fiber - (lambda (fiber) - (resume-on-readable-fd (port-read-wait-fd port) fiber))) + (wait-for-read/write port) ;; okay, we're awake again, tell the actor to resume this ;; continuation (put-message resume-channel k)) @@ -473,10 +485,10 @@ and handling them." 'run-me run-at-prompt))) (define suspend-read-to-actor - (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) (define suspend-write-to-actor - (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) (define (with-actor-nonblocking-ports thunk) "Runs THUNK in dynamic context in which attempting to read/write @@ -663,14 +675,3 @@ its '*cleanup* action handler." (*actor-prompt* #f) (actor-cleanup! actor)) -;; From a patch I sent to Fibers... -(define (condition-signalled? cvar) - "Return @code{#t} if @var{cvar} has already been signalled. - -In general you will want to use @code{wait} or @code{wait-operation} to -wait on a condition. However, sometimes it is useful to see whether or -not a condition has already been signalled without blocking." - (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar))) - -(define (actor-alive? actor) - (condition-signalled? (address-dead? (actor-id actor))))