actors: Reflect removal of choice of whether to cleanup in self-destruct
[8sync.git] / 8sync / actors.scm
index 9e249cdd987db42fe3c721d9b886b516ab08c33f..8f1538cd5c6b48c7374ec89a36b050079f038a72 100644 (file)
 (define-module (8sync actors)
   #:use-module (oop goops)
   #:use-module (srfi srfi-9)
+  #:use-module (srfi srfi-9 gnu)
   #:use-module (ice-9 control)
   #:use-module (ice-9 format)
   #:use-module (ice-9 match)
   #:use-module (ice-9 atomic)
+  #:use-module ((ice-9 ports internal)
+                #:select (port-read-wait-fd port-write-wait-fd))
   #:use-module (ice-9 pretty-print)
   #:use-module (ice-9 receive)
+  #:use-module (ice-9 suspendable-ports)
   #:use-module (fibers)
   #:use-module (fibers channels)
   #:use-module (fibers conditions)
   #:use-module (fibers operations)
-  #:use-module (8sync agenda)
   #:use-module (8sync inbox)
   #:use-module (8sync rmeta-slot)
+
   #:export (;; utilities... ought to go in their own module
             big-random-number
             big-random-number-string
             actor-id
             actor-message-handler
 
+            *current-actor*
+
             ;;; Commenting out the <address> type for now;
             ;;; it may be back when we have better serializers
             ;; <address>
-            make-address address?
+            make-address
             address-actor-id address-hive-id
 
-            actor-init! actor-cleanup!
-
             address->string
             actor-id-actor
             actor-id-hive
             actor-id-string
 
-            actor-alive?
+            actor-init! actor-cleanup!
 
             build-actions
 
             define-actor
 
+            actor-spawn-fiber
+            with-actor-nonblocking-ports
+
             ;; <hive>
             ;; make-hive
             ;; ;; There are more methods for the hive, but there's
             ;; ;; no reason for the outside world to look at them maybe?
             ;; hive-id
-            bootstrap-actor bootstrap-actor*
-
             create-actor create-actor*
             self-destruct
 
             message-id message-body message-in-reply-to
             message-wants-reply
 
-            message-auto-reply?
-
             <- <-wait
 
-            spawn-hive run-hive))
+            spawn-hive run-hive
+
+            ;; Maybe the wrong place for this, or for it to be exported.
+            ;; But it's used in websockets' server implementation at least...
+            live-wrap
+
+            *debug-actor-ids*))
 
 ;; For ids
 (set! *random-state* (random-state-from-platform))
 ;; This is the internal, generalized message sending method.
 ;; Users shouldn't use it!  Use the <-foo forms instead.
 
-(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to)
+(define (%<- wants-reply from-actor to action args message-id in-reply-to)
   ;; Okay, we need to deal with message ids.
   ;; Could we get rid of them? :\
   ;; It seems if we can use eq? and have messages be immutable then
   ;; If we need to track replies across hive boundaries we could
   ;; register unique ids across the ambassador barrier.
   (match to
-    (#(_ _ (? channel? channel) dead?)
+    (($ <address> _ _ (? channel? channel) dead?)
      (let ((message (make-message message-id to
                                   (and from-actor (actor-id from-actor))
                                   action args
          (put-operation channel message)
          (wait-operation dead?)))))
     ;; TODO: put remote addresses here.
-    (#(actor-id hive-id #f #f)
+    (($ <address> actor-id hive-id #f #f)
      ;; Here we'd make a call to our hive...
      'TODO)
     ;; A message sent to nobody goes nowhere.
     ;; TODO: Should we display a warning here, probably?
-    (#f #f)))
+    (#f #f)
+    ;; We shouldn't technically be passing in actors but rather their
+    ;; addresses, but often actors want to message themselves and
+    ;; this makes that slightly easier.
+    ((? (lambda (x) (is-a? x <actor>)) actor)
+     (%<- wants-reply from-actor (actor-id actor) action
+          args message-id in-reply-to))))
 
 (define (<- to action . args)
   (define from-actor (*current-actor*))
            (big-random-number-string))
        #f))
 
-;; TODO: this should abort to the prompt, then check for errors
-;;   when resuming.
-
 (define (<-wait to action . args)
   (define prompt (*actor-prompt*))
   (when (not prompt)
     (error "Tried to <-wait without being in an actor's context..."))
 
-  (let ((reply (abort-to-prompt prompt to action args)))
-    (cond ((eq? action '*error*)
+  (let ((reply (abort-to-prompt prompt '<-wait to action args)))
+    (cond ((eq? (message-action reply) '*error*)
            (throw 'hive-unresumable-coroutine
                   "Won't resume coroutine; got an *error* as a reply"
                   #:message reply))
            #:message message))
   (apply method actor message (message-body message)))
 
+(define-syntax-rule (live-wrap body)
+  "Wrap possibly multi-value function in a procedure, applies all arguments"
+  (lambda args
+    (apply body args)))
+
 (define-syntax-rule (build-actions (symbol method) ...)
   "Construct an alist of (symbol . method), where the method is wrapped
-with wrap-apply to facilitate live hacking and allow the method definition
+with `live-wrap' to facilitate live hacking and allow the method definition
 to come after class definition."
   (build-rmeta-slot
    (list (cons (quote symbol)
-               (wrap-apply method)) ...)))
+               (live-wrap method)) ...)))
 
 (define-class <actor> ()
-  ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
-  ;;  - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
-  ;;  - dead? is a fibers condition variable which is set once this actor
-  ;;    kicks the bucket
+  ;; An <address> object
   (id #:init-keyword #:address
       #:getter actor-id)
-  ;; The connection to the hive we're connected to.
-  (hive-channel #:init-keyword #:hive-channel
-                #:accessor actor-hive-channel)
 
   ;; Our queue to send/receive messages on
   (inbox-deq #:init-thunk make-channel
@@ -273,24 +284,37 @@ to come after class definition."
 (define-method (actor-cleanup! (actor <actor>))
   'no-op)
 
-;;; Addresses are vectors where the first part is the actor-id and
-;;; the second part is the hive-id.  This works well enough... they
-;;; look decent being pretty-printed.
-
-(define (make-address actor-id hive-id channel dead?)
-  (vector actor-id hive-id channel dead?))
-
-(define (address-actor-id address)
-  (vector-ref address 0))
-
-(define (address-hive-id address)
-  (vector-ref address 1))
-
-(define (address-channel address)
-  (vector-ref address 2))
-
-(define (address-dead? address)
-  (vector-ref address 3))
+;;; Every actor has an address, which is how its identified.
+;;; We also pack in some routing information.
+(define-record-type <address>
+  (make-address actor-id hive-id channel dead?)
+  address?
+  ;; Unique-to-this-actor-on-this-hive part
+  (actor-id address-actor-id)
+  ;; Unique identifier for the hive we're connected to.
+  ;; If we don't have a "direct" link to the other actor through
+  ;; a channel, we'll have to look up if our hive has a way to route
+  ;; to the other hive.
+  (hive-id address-hive-id)
+  ;; The receiving channel (as opposed to actor-inbox-deq)
+  (channel address-channel)
+  ;; A fibers condition variable which is set once this actor kicks
+  ;; the bucket
+  (dead? address-dead?))
+
+(set-record-type-printer!
+ <address>
+ (lambda (address port)
+   (format port "<address ~a ~a>"
+           (address->string address)
+           (if (address-channel address)
+               (string-append
+                ":local"
+                (if (atomic-box-ref
+                     ((@@ (fibers conditions) condition-signalled?)
+                      (address-dead? address)))
+                    " :dead" ""))
+               ":remote"))))
 
 (define (address->string address)
   (string-append (address-actor-id address) "@"
@@ -301,14 +325,10 @@ to come after class definition."
 
 This compares the actor-id and hive-id but ignores the channel and
 dead? condition."
-  (match address1
-    (#(actor-id-1 hive-id-1 _ _)
-     (match address2
-       (#(actor-id-2 hive-id-2)
-        (and (equal? actor-id-1 actor-id-2)
-             (and (equal? hive-id-1 hive-id-2))))
-       (_ #f)))
-    (_ #f)))
+  (and (equal? (address-actor-id address1)
+               (address-actor-id address2))
+       (equal? (address-hive-id address1)
+               (address-hive-id address2))))
 
 (define (actor-id-actor actor)
   "Get the actor id component of the actor-id"
@@ -331,6 +351,9 @@ dead? condition."
 (define *actor-prompt*
   (make-parameter #f))
 
+(define *resume-io-channel*
+  (make-parameter #f))
+
 (define (actor-main-loop actor)
   "Main loop of the actor.  Loops around, pulling messages off its queue
 and handling them."
@@ -342,6 +365,9 @@ and handling them."
   (define dead?
     (address-dead? (actor-id actor)))
   (define prompt (make-prompt-tag (actor-id-actor actor)))
+  ;; Not always used, only if with-actor-nonblocking-ports is used
+  (define resume-io-channel
+    (make-channel))
 
   (define (handle-message message)
     (catch #t
@@ -352,10 +378,9 @@ and handling them."
           (lambda vals
             ;; Return reply if necessary
             (when (message-wants-reply message)
-              (when (message-wants-reply message)
-                (%<- #f actor (message-from message) '*reply*
-                     vals ((actor-msg-id-generator actor))
-                     (message-id message)))))))
+              (%<- #f actor (message-from message) '*reply*
+                   vals ((actor-msg-id-generator actor))
+                   (message-id message))))))
       (const #t)
       (let ((err (current-error-port)))
         (lambda (key . args)
@@ -386,6 +411,21 @@ and handling them."
               "Tried to resume nonexistant message: ~a\n"
               (message-id message)))))
 
+  (define (call-with-actor-prompt thunk)
+    (call-with-prompt prompt
+      thunk
+      ;; Here's where we abort to if we're doing <-wait
+      ;; @@: maybe use match-lambda if we're going to end up
+      ;;   handling multiple ~commands
+      (match-lambda*
+        ((kont '<-wait to action message-args)
+         (define message-id
+           ((actor-msg-id-generator actor)))
+         (hash-set! waiting message-id kont)
+         (%<- #t actor to action message-args message-id #f))
+        ((kont 'run-me proc)
+         (proc kont)))))
+
   (define halt-or-handle-message
     ;; It would be nice if we could give priorities to certain operations.
     ;; halt should always win over getting a message...
@@ -394,28 +434,28 @@ and handling them."
                      (const #f))  ; halt and return
      (wrap-operation (get-operation (actor-inbox-deq actor))
                      (lambda (message)
-                       (call-with-prompt prompt
-                         (lambda ()
-                           (if (message-in-reply-to message)
-                               ;; resume a continuation which was waiting on a reply
-                               (resume-handler message)
-                               ;; start handling a new message
-                               (handle-message message)))
-                         ;; Here's where we abort to if we're doing <-wait
-                         ;; @@: maybe use match-lambda if we're going to end up
-                         ;;   handling multiple ~commands
-                         (lambda (kont to action message-args)
-                           (define message-id
-                             ((actor-msg-id-generator actor)))
-                           (hash-set! waiting message-id kont)
-                           (%<- #t actor to action message-args message-id #f)))
-                       #t))))   ; loop again
+                       (call-with-actor-prompt
+                        (lambda ()
+                          (if (message-in-reply-to message)
+                              ;; resume a continuation which was waiting on a reply
+                              (resume-handler message)
+                              ;; start handling a new message
+                              (handle-message message))))
+                       #t))   ; loop again
+     (wrap-operation (get-operation resume-io-channel)
+                     (lambda (thunk)
+                       (call-with-actor-prompt
+                        (lambda ()
+                          (thunk)))
+                       #t))))
 
   ;; Mutate the parameter; this should be fine since each fiber
   ;; runs in its own dynamic state with with-dynamic-state.
   ;; See with-dynamic-state discussion in
   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
   (*current-actor* actor)
+  (*resume-io-channel* resume-io-channel)
+
   ;; We temporarily set the *actor-prompt* to #f to make sure that
   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
   ;; the parent fiber's *actor-prompt* either.)
@@ -427,6 +467,59 @@ and handling them."
     (and (perform-operation halt-or-handle-message)
          (loop))))
 
+
+;; @@: So in order for this to work, we're going to have to add
+;; another channel to actors, which is resumable i/o.  We'll have to
+;; spawn a fiber that wakes up a thunk on the actor when its port is
+;; available.  Funky...
+
+(define (%suspend-io-to-actor wait-for-read/write)
+  (lambda (port)
+    (define prompt (*actor-prompt*))
+    (define resume-channel (*resume-io-channel*))
+    (define (run-at-prompt k)
+      (spawn-fiber
+       (lambda ()
+         (wait-for-read/write port)
+         ;; okay, we're awake again, tell the actor to resume this
+         ;; continuation
+         (put-message resume-channel k))
+       #:parallel? #f))
+    (when (not prompt)
+      (error "Attempt to abort to actor prompt outside of actor"))
+    (abort-to-prompt (*actor-prompt*)
+                     'run-me run-at-prompt)))
+
+(define suspend-read-to-actor
+  (%suspend-io-to-actor (@@ (fibers) wait-for-readable)))
+
+(define suspend-write-to-actor
+  (%suspend-io-to-actor (@@ (fibers) wait-for-writable)))
+
+(define (with-actor-nonblocking-ports thunk)
+  "Runs THUNK in dynamic context in which attempting to read/write
+from a port that would otherwise block an actor's correspondence with
+other actors (note that reading from a nonblocking port should never
+block other fibers) will instead permit reading other messages while
+I/O is waiting to complete.
+
+Note that currently "
+  (parameterize ((current-read-waiter suspend-read-to-actor)
+                 (current-write-waiter suspend-write-to-actor))
+    (thunk)))
+
+(define (actor-spawn-fiber thunk . args)
+  "Spawn a fiber from an actor but unset actor-machinery-specific
+dynamic context."
+  (apply spawn-fiber
+         (lambda ()
+           (*current-actor* #f)
+           (*resume-io-channel* #f)
+           (*actor-prompt* #f)
+           (thunk))
+         args))
+
+
 \f
 ;;; Actor utilities
 ;;; ===============
@@ -445,6 +538,7 @@ and handling them."
 ;;;   Every actor has a hive, which keeps track of other actors, manages
 ;;;   cleanup, and performs inter-hive communication.
 
+;; TODO: Make this a srfi-9 record type
 (define-class <hive> ()
   (id #:init-keyword #:id
       #:getter hive-id)
@@ -508,12 +602,17 @@ values, the first value being a symbol"
     (and (perform-operation halt-or-handle)
          (lp))))
 
-(define *current-hive* (make-parameter #f))
+(define *hive-id* (make-parameter #f))
+(define *hive-channel* (make-parameter #f))
 
+;; @@: Should we halt the hive either at the end of spawn-hive or run-hive?
 (define* (spawn-hive proc #:key (hive (make-hive)))
-  "Spawn a hive in a fiber running PROC, passing it the fresh hive"
+  "Spawn a hive and run PROC, passing it the fresh hive and establishing
+a dynamic context surrounding the hive."
   (spawn-fiber (lambda () (hive-main-loop hive)))
-  (proc hive))
+  (parameterize ((*hive-id* (hive-id hive))
+                 (*hive-channel* (hive-channel hive)))
+    (proc hive)))
 
 (define (run-hive proc . args)
   "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
@@ -523,15 +622,15 @@ to spawn-hive... all remaining arguments passed to run-fibers."
            (spawn-hive proc))
          args))
 
-(define (%create-actor hive-channel hive-id
-                       actor-class init-args id-cookie send-init?)
-  (let* ((actor-id (gen-actor-id id-cookie))
+(define (%create-actor actor-class init-args id-cookie send-init?)
+  (let* ((hive-channel (*hive-channel*))
+         (hive-id (*hive-id*))
+         (actor-id (gen-actor-id id-cookie))
          (dead? (make-condition))
          (inbox-enq (make-channel))
          (address (make-address actor-id hive-id
                                 inbox-enq dead?))
          (actor (apply make actor-class
-                       #:hive-channel hive-channel
                        #:address address
                        init-args))
          (should-init (actor-should-init actor)))
@@ -555,56 +654,37 @@ to spawn-hive... all remaining arguments passed to run-fibers."
     ;; return the address
     address))
 
-(define* (bootstrap-actor hive actor-class #:rest init-args)
-  "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args"
-  (%create-actor (hive-channel hive) (hive-id hive) actor-class
-                 init-args (symbol->string (class-name actor-class))
-                 #f))
-
-(define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args)
-  "Create an actor, but also allow customizing a 'cookie' added to the id
-for debugging"
-  (%create-actor (hive-channel hive) (hive-id hive) actor-class
-                 init-args id-cookie
-                 #f))
+;;; Whether or not to attach the class' name as a cookie by default in
+;;; create-actor
+(define *debug-actor-ids*
+  (make-parameter #t))
 
-(define* (create-actor from-actor actor-class #:rest init-args)
+(define* (create-actor actor-class #:rest init-args)
   "Create an instance of actor-class.  Return the new actor's id.
 
 This is the method actors should call directly (unless they want
 to supply an id-cookie, in which case they should use
 create-actor*)."
-  (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
-                 actor-class init-args #f #t))
+  (%create-actor actor-class init-args
+                 (if (*debug-actor-ids*)
+                     (symbol->string (class-name actor-class))
+                     #f)
+                 #t))
 
-
-(define* (create-actor* from-actor actor-class id-cookie #:rest init-args)
+(define* (create-actor* actor-class id-cookie #:rest init-args)
   "Create an instance of actor-class.  Return the new actor's id.
 
 Like create-actor, but permits supplying an id-cookie."
-  (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
-                 actor-class init-args id-cookie #t))
+  (%create-actor actor-class init-args id-cookie #t))
 
-(define* (self-destruct actor #:key (cleanup #t))
+(define (self-destruct actor)
   "Remove an actor from the hive.
 
-Unless #:cleanup is set to #f, this will first have the actor handle
-its '*cleanup* action handler."
+The actor will also call its `actor-cleanup!' method."
   (signal-condition! (address-dead? (actor-id actor)))
-  (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor)))
+  (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor)))
   ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
   ;; a message with <-wait
   (*actor-prompt* #f)
   (actor-cleanup! actor))
 
-;; From a patch I sent to Fibers...
-(define (condition-signalled? cvar)
-  "Return @code{#t} if @var{cvar} has already been signalled.
-
-In general you will want to use @code{wait} or @code{wait-operation} to
-wait on a condition.  However, sometimes it is useful to see whether or
-not a condition has already been signalled without blocking."
-  (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar)))
-
-(define (actor-alive? actor)
-  (condition-signalled? (address-dead? (actor-id actor))))