From 1e636b5da1e6c25ba366b475f939cd4899770fef Mon Sep 17 00:00:00 2001 From: Christopher Allan Webber Date: Mon, 31 Jul 2017 16:14:16 -0500 Subject: [PATCH] Add with-actor-nonblocking-ports and related tooling. This permits actors to not block their handling of correspondence on I/O by handling the suspendable-ports functionality themselves rather than through the normal fibers route. This is useful for actors that might be needing to exclusively control a port * 8sync/actors.scm (<-wait): Add '<-wait symbol to arguments of aborting to the prompt to distinguish it from the 'run-me action. (actor-main-loop): Support handling suspendable ports on actor level to support correspondence during I/O. (%suspend-io-to-actor, suspend-read-to-actor, suspend-write-to-actor) (with-actor-nonblocking-ports, actor-spawn-fiber) (*resume-io-channel*): New variables. --- 8sync/actors.scm | 92 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/8sync/actors.scm b/8sync/actors.scm index 48070c4..d12427e 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -23,14 +23,19 @@ #:use-module (ice-9 format) #:use-module (ice-9 match) #:use-module (ice-9 atomic) + #:use-module ((ice-9 ports internal) + #:select (port-read-wait-fd port-write-wait-fd)) #:use-module (ice-9 pretty-print) #:use-module (ice-9 receive) + #:use-module (ice-9 suspendable-ports) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) + #:use-module (fibers internal) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) + #:export (;; utilities... ought to go in their own module big-random-number big-random-number-string @@ -58,6 +63,9 @@ define-actor + actor-spawn-fiber + with-actor-nonblocking-ports + ;; ;; make-hive ;; ;; There are more methods for the hive, but there's @@ -189,7 +197,7 @@ (when (not prompt) (error "Tried to <-wait without being in an actor's context...")) - (let ((reply (abort-to-prompt prompt to action args))) + (let ((reply (abort-to-prompt prompt '<-wait to action args))) (cond ((eq? action '*error*) (throw 'hive-unresumable-coroutine "Won't resume coroutine; got an *error* as a reply" @@ -333,6 +341,9 @@ dead? condition." (define *actor-prompt* (make-parameter #f)) +(define *resume-io-channel* + (make-parameter #f)) + (define (actor-main-loop actor) "Main loop of the actor. Loops around, pulling messages off its queue and handling them." @@ -344,6 +355,9 @@ and handling them." (define dead? (address-dead? (actor-id actor))) (define prompt (make-prompt-tag (actor-id-actor actor))) + ;; Not always used, only if with-actor-nonblocking-ports is used + (define resume-io-channel + (make-channel)) (define (handle-message message) (catch #t @@ -406,18 +420,27 @@ and handling them." ;; Here's where we abort to if we're doing <-wait ;; @@: maybe use match-lambda if we're going to end up ;; handling multiple ~commands - (lambda (kont to action message-args) - (define message-id - ((actor-msg-id-generator actor))) - (hash-set! waiting message-id kont) - (%<- #t actor to action message-args message-id #f))) - #t)))) ; loop again + (match-lambda* + ((kont '<-wait to action message-args) + (define message-id + ((actor-msg-id-generator actor))) + (hash-set! waiting message-id kont) + (%<- #t actor to action message-args message-id #f)) + ((kont 'run-me proc) + (proc kont)))) + #t)) ; loop again + (wrap-operation (get-operation resume-io-channel) + (lambda (thunk) + (thunk + #t))))) ;; Mutate the parameter; this should be fine since each fiber ;; runs in its own dynamic state with with-dynamic-state. ;; See with-dynamic-state discussion in ;; https://wingolog.org/archives/2017/06/27/growing-fibers (*current-actor* actor) + (*resume-io-channel* resume-io-channel) + ;; We temporarily set the *actor-prompt* to #f to make sure that ;; actor-init! doesn't try to do a <-wait message (and not accidentally use ;; the parent fiber's *actor-prompt* either.) @@ -429,6 +452,61 @@ and handling them." (and (perform-operation halt-or-handle-message) (loop)))) + +;; @@: So in order for this to work, we're going to have to add +;; another channel to actors, which is resumable i/o. We'll have to +;; spawn a fiber that wakes up a thunk on the actor when its port is +;; available. Funky... + +(define (%suspend-io-to-actor resume-method get-wait-fd-method) + (lambda (port) + (define prompt (*actor-prompt*)) + (define resume-channel (*resume-io-channel*)) + (define (run-at-prompt k) + (spawn-fiber + (lambda () + (suspend-current-fiber + (lambda (fiber) + (resume-on-readable-fd (port-read-wait-fd port) fiber))) + ;; okay, we're awake again, tell the actor to resume this + ;; continuation + (put-message resume-channel k)) + #:parallel? #f)) + (when (not prompt) + (error "Attempt to abort to actor prompt outside of actor")) + (abort-to-prompt (*actor-prompt*) + 'run-me run-at-prompt))) + +(define suspend-read-to-actor + (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd)) + +(define suspend-write-to-actor + (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd)) + +(define (with-actor-nonblocking-ports thunk) + "Runs THUNK in dynamic context in which attempting to read/write +from a port that would otherwise block an actor's correspondence with +other actors (note that reading from a nonblocking port should never +block other fibers) will instead permit reading other messages while +I/O is waiting to complete. + +Note that currently " + (parameterize ((current-read-waiter suspend-read-to-actor) + (current-write-waiter suspend-write-to-actor)) + (thunk))) + +(define (actor-spawn-fiber thunk . args) + "Spawn a fiber from an actor but unset actor-machinery-specific +dynamic context." + (apply spawn-fiber + (lambda () + (*current-actor* #f) + (*resume-io-channel* #f) + (*actor-prompt* #f) + (thunk)) + args)) + + ;;; Actor utilities ;;; =============== -- 2.31.1