Add with-actor-nonblocking-ports and related tooling.
authorChristopher Allan Webber <cwebber@dustycloud.org>
Mon, 31 Jul 2017 21:14:16 +0000 (16:14 -0500)
committerChristopher Allan Webber <cwebber@dustycloud.org>
Thu, 3 Aug 2017 20:50:18 +0000 (15:50 -0500)
This permits actors to not block their handling of correspondence on
I/O by handling the suspendable-ports functionality themselves rather
than through the normal fibers route.  This is useful for actors that
might be needing to exclusively control a port

* 8sync/actors.scm (<-wait): Add '<-wait symbol to arguments of
aborting to the prompt to distinguish it from the 'run-me action.
(actor-main-loop): Support handling suspendable ports on actor level
to support correspondence during I/O.
(%suspend-io-to-actor, suspend-read-to-actor, suspend-write-to-actor)
(with-actor-nonblocking-ports, actor-spawn-fiber)
(*resume-io-channel*): New variables.

8sync/actors.scm

index 48070c43f10deaeebea1db5c18c23d470c969a2a..d12427e5ef056aefa64f14d9415729b01f3c6f63 100644 (file)
   #:use-module (ice-9 format)
   #:use-module (ice-9 match)
   #:use-module (ice-9 atomic)
+  #:use-module ((ice-9 ports internal)
+                #:select (port-read-wait-fd port-write-wait-fd))
   #:use-module (ice-9 pretty-print)
   #:use-module (ice-9 receive)
+  #:use-module (ice-9 suspendable-ports)
   #:use-module (fibers)
   #:use-module (fibers channels)
   #:use-module (fibers conditions)
   #:use-module (fibers operations)
+  #:use-module (fibers internal)
   #:use-module (8sync inbox)
   #:use-module (8sync rmeta-slot)
+
   #:export (;; utilities... ought to go in their own module
             big-random-number
             big-random-number-string
@@ -58,6 +63,9 @@
 
             define-actor
 
+            actor-spawn-fiber
+            with-actor-nonblocking-ports
+
             ;; <hive>
             ;; make-hive
             ;; ;; There are more methods for the hive, but there's
   (when (not prompt)
     (error "Tried to <-wait without being in an actor's context..."))
 
-  (let ((reply (abort-to-prompt prompt to action args)))
+  (let ((reply (abort-to-prompt prompt '<-wait to action args)))
     (cond ((eq? action '*error*)
            (throw 'hive-unresumable-coroutine
                   "Won't resume coroutine; got an *error* as a reply"
@@ -333,6 +341,9 @@ dead? condition."
 (define *actor-prompt*
   (make-parameter #f))
 
+(define *resume-io-channel*
+  (make-parameter #f))
+
 (define (actor-main-loop actor)
   "Main loop of the actor.  Loops around, pulling messages off its queue
 and handling them."
@@ -344,6 +355,9 @@ and handling them."
   (define dead?
     (address-dead? (actor-id actor)))
   (define prompt (make-prompt-tag (actor-id-actor actor)))
+  ;; Not always used, only if with-actor-nonblocking-ports is used
+  (define resume-io-channel
+    (make-channel))
 
   (define (handle-message message)
     (catch #t
@@ -406,18 +420,27 @@ and handling them."
                          ;; Here's where we abort to if we're doing <-wait
                          ;; @@: maybe use match-lambda if we're going to end up
                          ;;   handling multiple ~commands
-                         (lambda (kont to action message-args)
-                           (define message-id
-                             ((actor-msg-id-generator actor)))
-                           (hash-set! waiting message-id kont)
-                           (%<- #t actor to action message-args message-id #f)))
-                       #t))))   ; loop again
+                         (match-lambda*
+                           ((kont '<-wait to action message-args)
+                            (define message-id
+                              ((actor-msg-id-generator actor)))
+                            (hash-set! waiting message-id kont)
+                            (%<- #t actor to action message-args message-id #f))
+                           ((kont 'run-me proc)
+                            (proc kont))))
+                       #t))   ; loop again
+     (wrap-operation (get-operation resume-io-channel)
+                     (lambda (thunk)
+                       (thunk
+                        #t)))))
 
   ;; Mutate the parameter; this should be fine since each fiber
   ;; runs in its own dynamic state with with-dynamic-state.
   ;; See with-dynamic-state discussion in
   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
   (*current-actor* actor)
+  (*resume-io-channel* resume-io-channel)
+
   ;; We temporarily set the *actor-prompt* to #f to make sure that
   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
   ;; the parent fiber's *actor-prompt* either.)
@@ -429,6 +452,61 @@ and handling them."
     (and (perform-operation halt-or-handle-message)
          (loop))))
 
+
+;; @@: So in order for this to work, we're going to have to add
+;; another channel to actors, which is resumable i/o.  We'll have to
+;; spawn a fiber that wakes up a thunk on the actor when its port is
+;; available.  Funky...
+
+(define (%suspend-io-to-actor resume-method get-wait-fd-method)
+  (lambda (port)
+    (define prompt (*actor-prompt*))
+    (define resume-channel (*resume-io-channel*))
+    (define (run-at-prompt k)
+      (spawn-fiber
+       (lambda ()
+         (suspend-current-fiber
+          (lambda (fiber)
+            (resume-on-readable-fd (port-read-wait-fd port) fiber)))
+         ;; okay, we're awake again, tell the actor to resume this
+         ;; continuation
+         (put-message resume-channel k))
+       #:parallel? #f))
+    (when (not prompt)
+      (error "Attempt to abort to actor prompt outside of actor"))
+    (abort-to-prompt (*actor-prompt*)
+                     'run-me run-at-prompt)))
+
+(define suspend-read-to-actor
+  (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd))
+
+(define suspend-write-to-actor
+  (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd))
+
+(define (with-actor-nonblocking-ports thunk)
+  "Runs THUNK in dynamic context in which attempting to read/write
+from a port that would otherwise block an actor's correspondence with
+other actors (note that reading from a nonblocking port should never
+block other fibers) will instead permit reading other messages while
+I/O is waiting to complete.
+
+Note that currently "
+  (parameterize ((current-read-waiter suspend-read-to-actor)
+                 (current-write-waiter suspend-write-to-actor))
+    (thunk)))
+
+(define (actor-spawn-fiber thunk . args)
+  "Spawn a fiber from an actor but unset actor-machinery-specific
+dynamic context."
+  (apply spawn-fiber
+         (lambda ()
+           (*current-actor* #f)
+           (*resume-io-channel* #f)
+           (*actor-prompt* #f)
+           (thunk))
+         args))
+
+
 \f
 ;;; Actor utilities
 ;;; ===============