Big refactor for 8sync on fibers in progress.
authorChristopher Allan Webber <cwebber@dustycloud.org>
Fri, 14 Jul 2017 19:36:16 +0000 (14:36 -0500)
committerChristopher Allan Webber <cwebber@dustycloud.org>
Thu, 3 Aug 2017 20:42:43 +0000 (15:42 -0500)
* 8sync/actors.scm (%random-state, *random-state*, big-random-number):
Removed %random-state in favor of mutating *random-state* variable.
(<message>, make-message): Remove replied slot from <message>.
(message-auto-reply?, message-needs-reply?): Removed.
(%<-, <-, <-wait): Update to make use of new inbox channels.  These
are now the only two message-sending operators.
(send-message, <-*, <-wait*, <-reply, <-auto-reply, <-reply-wait)
(wait-maybe-handle-errors): Removed.  The <-reply mechanisms in
particular are no longer necessary since the returned values of the
called procedure are automatically the return value's reply.
(<actor>): Actors no longer have access to the hive itself, and
instead have access to the hive's channel through the new
`hive-channel' slot.
Actors also generate their own message ids through their
`msg-id-generator' slot, since the hive is no longer responsible
for generating these messages.
Likewise, actors no longer communicate to other actors through the
hive (though relays/ambassadors will be coordinated through there).
Instead, addresses now contain the actor's channel and stop-condition,
unless the actor is remote, in which case a relay will be looked up
via the hive (well, eventually).
New slot named `inbox-deq' which is what the actor's main loop
actually reads from to get messages from the delivery-agent.
The `*init*' and `*cleanup*' actions have been removed.
(actor-init!, actor-cleanup!): New methods; these are how actors may
initialize and clean themselves up, taking the place of the previous
special-case `*init*' and `*cleanup*' actions.
(make-address): Update to support `channel' and `dead?' slots.
(address-channel, address-dead?): New getters.
(address-equal?): New procedure for comparing two addresses.
(actor-id-actor, actor-id-hive, actor-id-string): Move from being
methods to normal procedures.
(*current-actor*): Renamed from %current-actor.
(actor-inbox-enq, *actor-prompt*, actor-main-loop): New variables.
(hive-handle-failed-forward): Removed.
(<hive>): No longer a subtype of <actor> since this runs in its
own loop.  Removed several slots and added the `id', `channel', and
`halt?' slots.
(make-hive): Updated to reflect new design.
(hive-handle-init-all, hive-handle-failed-forward, hive-handle-cleanup-all)
(hive-id, hive-gen-actor-id, hive-gen-message-id, hive-resolve-local-actor)
(hive-resolve-ambassador, make-forward-request, hive-reply-with-error)
(<waiting-on-reply>, hive-process-message, hive-actor-local?)
(hive-register-actor!, %hive-create-actor): Removed.
(gen-actor-id, hive-main-loop, *current-hive*, spawn-hive, run-hive)
(%create-actor, condition-signalled?, actor-alive?): New variables.
(bootstrap-actor, bootstrap-actor*, create-actor, create-actor*):
Updated to use %create-actor.
(call-with-message, mbody-receive, mbody-val): Removed.
(self-destruct): Updated behavior to signal `address-dead?' condition
and message the hive via the hive-channel, along with running
`actor-cleanup!'.
(run-hive, run-hive-cleanup, bootstrap-message, serialize-message)
(write-message, serialize-message-pretty, pprint-message, read-message):
(read-message-from-string): Removed.

* 8sync/inbox.scm: New file.  Provides the delivery-agent procedure
which is run in its own fiber as a buffered queue.

* 8sync/systems/irc.scm (<irc-bot>): Remove `*init*' and `*cleanup*'
action handlers.
(actor-init!, actor-cleanup!): Add method handlers for `<irc-bot>'
based off of former `irc-bot-init' and `irc-bot-cleanup' respectively.

* demos/actors/botherbotherbother.scm:
* demos/actors/robotscanner.scm:
* demos/actors/simplest-possible.scm: Update to use new semantics.

* Makefile.am (SOURCES): Comment out a bunch of modules which aren't
ready to be built with WIP 8sync-fibers branch.  Also add inbox.scm.

* guix.scm: Add `guile-fibers' to `propagated-inputs' and switch
guile reference from `guile-next' to `guile-2.2' package variable.

8sync/actors.scm
8sync/inbox.scm [new file with mode: 0644]
8sync/systems/irc.scm
Makefile.am
demos/actors/botherbotherbother.scm
demos/actors/robotscanner.scm
demos/actors/simplest-possible.scm
guix.scm

index ceb2980f533e93eab250c85e908dd05e6f885edf..9e249cdd987db42fe3c721d9b886b516ab08c33f 100644 (file)
 (define-module (8sync actors)
   #:use-module (oop goops)
   #:use-module (srfi srfi-9)
-  #:use-module (srfi srfi-9 gnu)
   #:use-module (ice-9 control)
   #:use-module (ice-9 format)
   #:use-module (ice-9 match)
+  #:use-module (ice-9 atomic)
   #:use-module (ice-9 pretty-print)
+  #:use-module (ice-9 receive)
+  #:use-module (fibers)
+  #:use-module (fibers channels)
+  #:use-module (fibers conditions)
+  #:use-module (fibers operations)
   #:use-module (8sync agenda)
+  #:use-module (8sync inbox)
   #:use-module (8sync rmeta-slot)
   #:export (;; utilities... ought to go in their own module
             big-random-number
             big-random-number-string
-            simple-message-id-generator
 
             <actor>
             actor-id
             actor-message-handler
 
-            %current-actor
-
             ;;; Commenting out the <address> type for now;
             ;;; it may be back when we have better serializers
             ;; <address>
             make-address address?
             address-actor-id address-hive-id
 
+            actor-init! actor-cleanup!
+
             address->string
             actor-id-actor
             actor-id-hive
 
             define-actor
 
-            <hive>
-            make-hive
-            ;; There are more methods for the hive, but there's
-            ;; no reason for the outside world to look at them maybe?
-            hive-id
+            ;; <hive>
+            ;; make-hive
+            ;; ;; There are more methods for the hive, but there's
+            ;; ;; no reason for the outside world to look at them maybe?
+            ;; hive-id
             bootstrap-actor bootstrap-actor*
 
             create-actor create-actor*
 
             message-auto-reply?
 
-            <- <-* <-wait <-wait* <-reply <-reply* <-reply-wait <-reply-wait*
-
-            call-with-message mbody-receive mbody-val
+            <- <-wait
 
-            run-hive
-            bootstrap-message
-
-            serialize-message write-message
-            serialize-message-pretty pprint-message
-            read-message read-message-from-string))
+            spawn-hive run-hive))
 
 ;; For ids
-(define %random-state
-  (make-parameter (random-state-from-platform)))
+(set! *random-state* (random-state-from-platform))
 
 ;; Same size as a uuid4 I think...
 (define random-number-size (expt 2 128))
 
 (define (big-random-number)
-  (random random-number-size (%random-state)))
+  (random random-number-size))
 
 ;; Would be great to get this base64 encoded instead.
 (define (big-random-number-string)
   ;; @@: This is slow.  Using format here is wasteful.
   (format #f "~x" (big-random-number)))
 
-;; @@: This is slow.  A mere ~275k / second on my (old) machine.
+;; @@: This is slow-ish.  A mere ~275k / second on my (old) machine.
 ;;   The main cost seems to be in number->string.
 (define (simple-message-id-generator)
   ;; Prepending this cookie makes message ids unique per hive
 ;;; Messages
 ;;; ========
 
-
-;; @@: We may want to add a deferred-reply to the below, similar to
-;;   what we had in XUDD, for actors which do their own response
-;;   queueing.... ie, that might receive messages but need to shelve
-;;   them to be acted upon after something else is taken care of.
-
 (define-record-type <message>
   (make-message-intern id to from action
-                       body in-reply-to wants-reply
-                       replied)
+                       body in-reply-to wants-reply)
   message?
+  ;; @@: message-ids are removed.  They could be re-enabled
+  ;;   if we had thread-safe promises...
   (id message-id)                    ; id of this message
   (to message-to)                    ; actor id this is going to
   (from message-from)                ; actor id of sender
   (action message-action)            ; action (a symbol) to be handled
   (body message-body)                ; argument list "body" of message
   (in-reply-to message-in-reply-to)  ; message id this is in reply to, if any
-  (wants-reply message-wants-reply)  ; whether caller is waiting for reply
-  (replied message-replied           ; was this message replied to?
-           set-message-replied!))
+  (wants-reply message-wants-reply)) ; whether caller is waiting for reply
 
 
 (define* (make-message id to from action body
-                       #:key in-reply-to wants-reply
-                       replied)
+                       #:key in-reply-to wants-reply)
   (make-message-intern id to from action body
-                       in-reply-to wants-reply replied))
-
-(define (message-auto-reply? message)
-  (eq? (message-action message) '*auto-reply*))
-
-(define (message-needs-reply? message)
-  "See if this message needs a reply still"
-  (and (message-wants-reply message)
-       (not (message-replied message))))
-
+                       in-reply-to wants-reply))
 
 (define (kwarg-list-to-alist args)
   (let loop ((remaining args)
 ;; This is the internal, generalized message sending method.
 ;; Users shouldn't use it!  Use the <-foo forms instead.
 
-;; @@: Could we get rid of some of the conditional checks through
-;;   some macro-foo?
-
-(define-inlinable (send-message send-options from-actor to-id action
-                                replying-to-message wants-reply?
-                                message-body-args)
-  (if replying-to-message
-      (set-message-replied! replying-to-message #t))
-  (let* ((hive (actor-hive from-actor))
-         (new-message
-          (make-message (hive-gen-message-id hive) to-id
-                        (actor-id from-actor) action
-                        message-body-args
-                        #:wants-reply wants-reply?
-                        #:in-reply-to
-                        (if replying-to-message
-                            (message-id replying-to-message)
-                            #f))))
-    (if wants-reply?
-        (abort-to-prompt (hive-prompt (actor-hive from-actor))
-                         from-actor new-message send-options)
-        ;; @@: It might be that eventually we pass in send-options
-        ;;   here too.  Since <-wait and <-reply-wait are the only ones
-        ;;   that use it yet, for now it kind of just makes things
-        ;;   confusing.
-        (8sync (hive-process-message hive new-message)))))
-
-(define (<- to-id action . message-body-args)
-  "Send a message from an actor to another actor"
-  (send-message '() (%current-actor) to-id action
-                #f #f message-body-args))
-
-(define (<-* send-options to-id action . message-body-args)
-  "Like <-*, but allows extra parameters via send-options"
-  (define* (really-send #:key (actor (%current-actor))
-                        #:allow-other-keys)
-    (send-message send-options actor to-id action
-                  #f #f message-body-args))
-  (apply really-send send-options))
-
-(define (<-wait to-id action . message-body-args)
-  "Send a message from an actor to another, but wait until we get a response"
-  (wait-maybe-handle-errors
-   (send-message '() (%current-actor) to-id action
-                 #f #t message-body-args)))
-
-(define (<-wait* send-options to-id action . message-body-args)
-  "Like <-wait, but allows extra parameters, for example whether to
-#:accept-errors"
-  (define* (really-send #:key (actor (%current-actor))
-                        #:allow-other-keys)
-    (apply wait-maybe-handle-errors
-           (send-message send-options actor to-id action
-                         #f #t message-body-args)
-           send-options))
-  (apply really-send send-options))
-
-;; TODO: Intelligently ~propagate(ish) errors on -wait functions.
-;;   We might have `send-message-wait-brazen' to allow callers to
-;;   not have an exception thrown and instead just have a message with
-;;   the appropriate '*error* message returned.
-
-(define (<-reply original-message . message-body-args)
-  "Reply to a message"
-  (when (message-needs-reply? original-message)
-    (send-message '() (%current-actor) (message-from original-message) '*reply*
-                  original-message #f message-body-args)))
-
-(define (<-reply* send-options original-message . message-body-args)
-  "Like <-reply, but allows extra parameters via send-options"
-  (define* (really-send #:key (actor (%current-actor))
-                        #:allow-other-keys)
-    (send-message send-options actor
-                  (message-from original-message) '*reply*
-                  original-message #f message-body-args))
-  (when (message-needs-reply? original-message)
-    (apply really-send send-options)))
-
-(define (<-auto-reply actor original-message)
-  "Auto-reply to a message.  Internal use only!"
-  (send-message '() actor (message-from original-message) '*auto-reply*
-                original-message #f '()))
-
-(define (<-reply-wait original-message . message-body-args)
-  "Reply to a messsage, but wait until we get a response"
-  (if (message-needs-reply? original-message)
-      (wait-maybe-handle-errors
-       (send-message '() (%current-actor)
-                     (message-from original-message) '*reply*
-                     original-message #t message-body-args))
-      #f))
-
-(define (<-reply-wait* send-options original-message
-                       . message-body-args)
-  "Like <-reply-wait, but allows extra parameters via send-options"
-  (define* (really-send #:key (actor (%current-actor))
-                        #:allow-other-keys)
-    (apply wait-maybe-handle-errors
-           (send-message send-options actor
-                         (message-from original-message) '*reply*
-                         original-message #t message-body-args)
-           send-options))
-  (when (message-needs-reply? original-message)
-    (apply really-send send-options)))
-
-(define* (wait-maybe-handle-errors message
-                                   #:key accept-errors
-                                   #:allow-other-keys)
-  "Before returning a message to a waiting caller, see if we need to
-raise an exception if an error."
-  (define action (message-action message))
-  (cond ((and (eq? action '*error*)
-              (not accept-errors))
-         (throw 'hive-unresumable-coroutine
-                "Won't resume coroutine; got an *error* as a reply"
-                #:message message))
-        (else message)))
-
+(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to)
+  ;; Okay, we need to deal with message ids.
+  ;; Could we get rid of them? :\
+  ;; It seems if we can use eq? and have messages be immutable then
+  ;; it should be possible to identify follow-up replies.
+  ;; If we need to track replies across hive boundaries we could
+  ;; register unique ids across the ambassador barrier.
+  (match to
+    (#(_ _ (? channel? channel) dead?)
+     (let ((message (make-message message-id to
+                                  (and from-actor (actor-id from-actor))
+                                  action args
+                                  #:wants-reply wants-reply
+                                  #:in-reply-to in-reply-to)))
+       (perform-operation
+        (choice-operation
+         (put-operation channel message)
+         (wait-operation dead?)))))
+    ;; TODO: put remote addresses here.
+    (#(actor-id hive-id #f #f)
+     ;; Here we'd make a call to our hive...
+     'TODO)
+    ;; A message sent to nobody goes nowhere.
+    ;; TODO: Should we display a warning here, probably?
+    (#f #f)))
+
+(define (<- to action . args)
+  (define from-actor (*current-actor*))
+  (%<- #f from-actor to action args
+       (or (and from-actor
+                ((actor-msg-id-generator from-actor)))
+           (big-random-number-string))
+       #f))
+
+;; TODO: this should abort to the prompt, then check for errors
+;;   when resuming.
+
+(define (<-wait to action . args)
+  (define prompt (*actor-prompt*))
+  (when (not prompt)
+    (error "Tried to <-wait without being in an actor's context..."))
+
+  (let ((reply (abort-to-prompt prompt to action args)))
+    (cond ((eq? action '*error*)
+           (throw 'hive-unresumable-coroutine
+                  "Won't resume coroutine; got an *error* as a reply"
+                  #:message reply))
+          (else (apply values (message-body reply))))))
 
 \f
 ;;; Main actor implementation
@@ -316,13 +226,23 @@ to come after class definition."
                (wrap-apply method)) ...)))
 
 (define-class <actor> ()
-  ;; An address object
-  (id #:init-keyword #:id
+  ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
+  ;;  - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
+  ;;  - dead? is a fibers condition variable which is set once this actor
+  ;;    kicks the bucket
+  (id #:init-keyword #:address
       #:getter actor-id)
-  ;; The hive we're connected to.
-  ;; We need this to be able to send messages.
-  (hive #:init-keyword #:hive
-        #:accessor actor-hive)
+  ;; The connection to the hive we're connected to.
+  (hive-channel #:init-keyword #:hive-channel
+                #:accessor actor-hive-channel)
+
+  ;; Our queue to send/receive messages on
+  (inbox-deq #:init-thunk make-channel
+             #:accessor actor-inbox-deq)
+
+  (msg-id-generator #:init-thunk simple-message-id-generator
+                    #:getter actor-msg-id-generator)
+
   ;; How we receive and process new messages
   (message-handler #:init-value actor-inheritable-message-handler
                    ;; @@: There's no reason not to use #:class instead of
@@ -336,22 +256,29 @@ to come after class definition."
   ;;  - 'wait, as in wait on the init message
   ;;  - #f as in don't bother to init
   (should-init #:init-value #t
+               #:getter actor-should-init
                #:allocation #:each-subclass)
 
   ;; This is the default, "simple" way to inherit and process messages.
-  (actions #:init-thunk (build-actions
-                         ;; Default init method is to do nothing.
-                         (*init* (const #f))
-                         ;; Default cleanup method is to do nothing.
-                         (*cleanup* (const #f)))
+  (actions #:init-thunk (build-actions)
            #:allocation #:each-subclass))
 
+;;; Actors may specify an "init" action that occurs before the actor
+;;; actually begins to run.
+;;; During actor-init!, an actor may send a message to itself or others
+;;; via <- but *may not* use <-wait.
+(define-method (actor-init! (actor <actor>))
+  'no-op)
+
+(define-method (actor-cleanup! (actor <actor>))
+  'no-op)
+
 ;;; Addresses are vectors where the first part is the actor-id and
 ;;; the second part is the hive-id.  This works well enough... they
 ;;; look decent being pretty-printed.
 
-(define (make-address actor-id hive-id)
-  (vector actor-id hive-id))
+(define (make-address actor-id hive-id channel dead?)
+  (vector actor-id hive-id channel dead?))
 
 (define (address-actor-id address)
   (vector-ref address 0))
@@ -359,28 +286,146 @@ to come after class definition."
 (define (address-hive-id address)
   (vector-ref address 1))
 
+(define (address-channel address)
+  (vector-ref address 2))
+
+(define (address-dead? address)
+  (vector-ref address 3))
+
 (define (address->string address)
   (string-append (address-actor-id address) "@"
                  (address-hive-id address)))
 
-(define-method (actor-id-actor (actor <actor>))
+(define (address-equal? address1 address2)
+  "Check whether or not the two addresses are equal.
+
+This compares the actor-id and hive-id but ignores the channel and
+dead? condition."
+  (match address1
+    (#(actor-id-1 hive-id-1 _ _)
+     (match address2
+       (#(actor-id-2 hive-id-2)
+        (and (equal? actor-id-1 actor-id-2)
+             (and (equal? hive-id-1 hive-id-2))))
+       (_ #f)))
+    (_ #f)))
+
+(define (actor-id-actor actor)
   "Get the actor id component of the actor-id"
   (address-actor-id (actor-id actor)))
 
-(define-method (actor-id-hive (actor <actor>))
+(define (actor-id-hive actor)
   "Get the hive id component of the actor-id"
   (address-hive-id (actor-id actor)))
 
-(define-method (actor-id-string (actor <actor>))
+(define (actor-id-string actor)
   "Render the full actor id as a human-readable string"
   (address->string (actor-id actor)))
 
-(define %current-actor
+(define (actor-inbox-enq actor)
+  (address-channel (actor-id actor)))
+
+(define *current-actor*
   (make-parameter #f))
 
-(define (actor-alive? actor)
-  (hive-resolve-local-actor (actor-hive actor) (actor-id actor)))
+(define *actor-prompt*
+  (make-parameter #f))
 
+(define (actor-main-loop actor)
+  "Main loop of the actor.  Loops around, pulling messages off its queue
+and handling them."
+  ;; @@: Maybe establish some sort of garbage collection routine for these...
+  (define waiting
+    (make-hash-table))
+  (define message-handler
+    (actor-message-handler actor))
+  (define dead?
+    (address-dead? (actor-id actor)))
+  (define prompt (make-prompt-tag (actor-id-actor actor)))
+
+  (define (handle-message message)
+    (catch #t
+      (lambda ()
+        (call-with-values
+            (lambda ()
+              (message-handler actor message))
+          (lambda vals
+            ;; Return reply if necessary
+            (when (message-wants-reply message)
+              (when (message-wants-reply message)
+                (%<- #f actor (message-from message) '*reply*
+                     vals ((actor-msg-id-generator actor))
+                     (message-id message)))))))
+      (const #t)
+      (let ((err (current-error-port)))
+        (lambda (key . args)
+          (false-if-exception
+           (let ((stack (make-stack #t 4)))
+             (format err "Uncaught exception when handling message ~a:\n"
+                     message)
+             (display-backtrace stack err)
+             (print-exception err (stack-ref stack 0)
+                              key args)
+             (newline err)
+             ;; If the other actor is waiting on a reply, let's let them
+             ;; know there was an error...
+             (when (message-wants-reply message)
+               (%<- #f actor (message-from message) '*error*
+                    (list key) ((actor-msg-id-generator actor))
+                    (message-id message)))))))))
+  
+  (define (resume-handler message)
+    (define in-reply-to (message-in-reply-to message))
+    (cond
+     ((hash-ref waiting in-reply-to) =>
+      (lambda (kont)
+        (hash-remove! waiting in-reply-to)
+        (kont message)))
+     (else
+      (format (current-error-port)
+              "Tried to resume nonexistant message: ~a\n"
+              (message-id message)))))
+
+  (define halt-or-handle-message
+    ;; It would be nice if we could give priorities to certain operations.
+    ;; halt should always win over getting a message...
+    (choice-operation
+     (wrap-operation (wait-operation dead?)
+                     (const #f))  ; halt and return
+     (wrap-operation (get-operation (actor-inbox-deq actor))
+                     (lambda (message)
+                       (call-with-prompt prompt
+                         (lambda ()
+                           (if (message-in-reply-to message)
+                               ;; resume a continuation which was waiting on a reply
+                               (resume-handler message)
+                               ;; start handling a new message
+                               (handle-message message)))
+                         ;; Here's where we abort to if we're doing <-wait
+                         ;; @@: maybe use match-lambda if we're going to end up
+                         ;;   handling multiple ~commands
+                         (lambda (kont to action message-args)
+                           (define message-id
+                             ((actor-msg-id-generator actor)))
+                           (hash-set! waiting message-id kont)
+                           (%<- #t actor to action message-args message-id #f)))
+                       #t))))   ; loop again
+
+  ;; Mutate the parameter; this should be fine since each fiber
+  ;; runs in its own dynamic state with with-dynamic-state.
+  ;; See with-dynamic-state discussion in
+  ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
+  (*current-actor* actor)
+  ;; We temporarily set the *actor-prompt* to #f to make sure that
+  ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
+  ;; the parent fiber's *actor-prompt* either.)
+  (*actor-prompt* #f)
+  (actor-init! actor)
+  (*actor-prompt* prompt)
+
+  (let loop ()
+    (and (perform-operation halt-or-handle-message)
+         (loop))))
 
 \f
 ;;; Actor utilities
@@ -397,358 +442,131 @@ to come after class definition."
 \f
 ;;; The Hive
 ;;; ========
-;;;   Every actor has a hive.  The hive is a kind of "meta-actor"
-;;;   which routes all the rest of the actors in a system.
-
-(define-generic hive-handle-failed-forward)
+;;;   Every actor has a hive, which keeps track of other actors, manages
+;;;   cleanup, and performs inter-hive communication.
 
-(define-class <hive> (<actor>)
+(define-class <hive> ()
+  (id #:init-keyword #:id
+      #:getter hive-id)
   (actor-registry #:init-thunk make-hash-table
                   #:getter hive-actor-registry)
-  (msg-id-generator #:init-thunk simple-message-id-generator
-                    #:getter hive-msg-id-generator)
+  ;; TODO: Rename "ambassadors" to "relays"
   ;; Ambassadors are used (or will be) for inter-hive communication.
-  ;; These are special actors that know how to route messages to other hives.
+  ;; These are special actors that know how to route messages to other
+  ;; hives.
   (ambassadors #:init-thunk make-weak-key-hash-table
                #:getter hive-ambassadors)
-  ;; Waiting coroutines
-  ;; This is a map from cons cell of message-id
-  ;;   to a cons cell of (actor-id . coroutine)
-  ;; @@: Should we have a <waiting-coroutine> record type?
-  ;; @@: Should there be any way to clear out "old" coroutines?
-  (waiting-coroutines #:init-thunk make-hash-table
-                      #:getter hive-waiting-coroutines)
-  ;; Message prompt
-  ;; When actors send messages to each other they abort to this prompt
-  ;; to send the message, then carry on their way
-  (prompt #:init-thunk make-prompt-tag
-          #:getter hive-prompt)
-  (actions #:allocation #:each-subclass
-           #:init-thunk
-           (build-actions
-            ;; This is in the case of an ambassador failing to forward a
-            ;; message... it reports it back to the hive
-            (*failed-forward* hive-handle-failed-forward)
-            ;; These are called at start and end of run-hive
-            (*init-all* hive-handle-init-all)
-            (*cleanup-all* hive-handle-cleanup-all))))
-
-(define-method (hive-handle-init-all (hive <hive>) message)
-  "Run *init* method on all actors in registry"
-  ;; We have to do this hack and run over the list
-  ;; twice, because hash-for-each would result in an unrewindable
-  ;; continuation, and to avoid the hash-map changing during the
-  ;; middle of this.
-  (define actor-ids
-    (hash-map->list (lambda (actor-id actor) actor-id)
-                    (hive-actor-registry hive)))
-  (for-each (lambda (actor-id)
-              (let* ((actor (hash-ref (hive-actor-registry hive)
-                                      actor-id)))
-                (match (slot-ref actor 'should-init)
-                  (#f #f)
-                  ('wait
-                   (<-wait actor-id '*init*))
-                  (_
-                   (<- actor-id '*init*)))))
-            actor-ids))
-
-(define-method (hive-handle-failed-forward (hive <hive>) message)
-  "Handle an ambassador failing to forward a message"
-  'TODO)
-
-(define-method (hive-handle-cleanup-all (hive <hive>) message)
-  "Send a message to all actors in our registry to clean themselves up."
-  ;; We have to do this hack and run over the list
-  ;; twice, because hash-for-each would result in an unrewindable
-  ;; continuation, and to avoid the hash-map changing during the
-  ;; middle of this.
-  (define actor-ids
-    (hash-map->list (lambda (actor-id actor) actor-id)
-                    (hive-actor-registry hive)))
-  (for-each (lambda (actor-id)
-              (<- actor-id '*cleanup*))
-            actor-ids))
+  (channel #:init-thunk make-channel
+           #:getter hive-channel)
+  (halt? #:init-thunk make-condition
+         #:getter hive-halt?))
 
 (define* (make-hive #:key hive-id)
-  (let ((hive (make <hive>
-                #:id (make-address
-                      "hive" (or hive-id
-                                 (big-random-number-string))))))
-    ;; Set the hive's actor reference to itself
-    (set! (actor-hive hive) hive)
-    ;; Register the actor with itself
-    (hive-register-actor! hive hive)
-    hive))
-
-(define-method (hive-id (hive <hive>))
-  (actor-id-hive hive))
-
-(define-method (hive-gen-actor-id (hive <hive>) cookie)
-  (make-address (if cookie
-                    (string-append cookie ":" (big-random-number-string))
-                    (big-random-number-string))
-                (hive-id hive)))
-
-(define-method (hive-gen-message-id (hive <hive>))
-  "Generate a message id using HIVE's message id generator"
-  ((hive-msg-id-generator hive)))
-
-(define-method (hive-resolve-local-actor (hive <hive>) actor-address)
-  (hash-ref (hive-actor-registry hive) actor-address))
-
-(define-method (hive-resolve-ambassador (hive <hive>) ambassador-address)
-  (hash-ref (hive-ambassadors hive) ambassador-address))
-
-(define-method (make-forward-request (hive <hive>) (ambassador <actor>) message)
-  (make-message (hive-gen-message-id hive) (actor-id ambassador)
-                ;; If we make the hive not an actor, we could either switch this
-                ;; to #f or to the original actor...?
-                ;; Maybe some more thinking should be done on what should
-                ;; happen in case of failure to forward?  Handling ambassador failures
-                ;; seems like the primary motivation for the hive remaining an actor.
-                (actor-id hive)
-                '*forward*
-                `((original . ,message))))
-
-(define-method (hive-reply-with-error (hive <hive>) original-message
-                                      error-key error-args)
-  ;; We only supply the error-args if the original sender is on the same hive
-  (define (orig-actor-on-same-hive?)
-    (equal? (hive-id hive)
-            (address-hive-id (message-from original-message))))
-  (set-message-replied! original-message #t)
-  (let* ((new-message-body
-          (if (orig-actor-on-same-hive?)
-              `(#:original-message ,original-message
-                #:error-key ,error-key
-                #:error-args ,error-args)
-              `(#:original-message ,original-message
-                #:error-key ,error-key)))
-         (new-message (make-message (hive-gen-message-id hive)
-                                    (message-from original-message)
-                                    (actor-id hive) '*error*
-                                    new-message-body
-                                    #:in-reply-to (message-id original-message))))
-    ;; We only return a thunk, rather than run 8sync here, because if
-    ;; we ran 8sync in the middle of a catch we'd end up with an
-    ;; unresumable continuation.
-    (lambda () (hive-process-message hive new-message))))
-
-(define-record-type <waiting-on-reply>
-  (make-waiting-on-reply actor-id kont send-options)
-  waiting-on-reply?
-  (actor-id waiting-on-reply-actor-id)
-  (kont waiting-on-reply-kont)
-  (send-options waiting-on-reply-send-options))
-
-
-(define-method (hive-process-message (hive <hive>) message)
-  "Handle one message, or forward it via an ambassador"
-  (define (maybe-autoreply actor)
-    ;; Possibly autoreply
-    (if (message-needs-reply? message)
-        (<-auto-reply actor message)))
-
-  (define (resolve-actor-to)
-    "Get the actor the message was aimed at"
-    (let ((actor (hive-resolve-local-actor hive (message-to message))))
-      (if (not actor)
-          (throw 'actor-not-found
-                 (format #f "Message ~a from ~a directed to nonexistant actor ~a"
-                         (message-id message)
-                         (address->string (message-from message))
-                         (address->string (message-to message)))
-                 message))
-      actor))
-
-  ;; TODO: I'm pretty sure we're building up another stack of prompts here
-  ;;   with the way we're doing this.  That's a real problem.
-  (define* (call-catching-coroutine thunk #:optional (catch-errors #t))
-    (define queued-error-handling-thunk #f)
-    (define (call-catching-errors)
-      ;; TODO: maybe parameterize (or attach to hive) and use
-      ;;   maybe-catch-all from agenda.scm
-      ;; @@: Why not just use with-throw-handler and let the catch
-      ;;   happen at the agenda?  That's what we used to do, but
-      ;;   it ended up with a SIGABRT.  See:
-      ;;     http://lists.gnu.org/archive/html/bug-guile/2016-05/msg00003.html
-      (catch #t
-        thunk
-        ;; In the actor model, we don't totally crash on errors.
-        (lambda _ #f)
-        ;; If an error happens, we raise it
-        (lambda (key . args)
-          (if (message-needs-reply? message)
-              ;; If the message is waiting on a reply, let them know
-              ;; something went wrong.
-              ;; However, we have to do it outside of this catch
-              ;; routine, or we'll end up in an unrewindable continuation
-              ;; situation.
-              (set! queued-error-handling-thunk
-                    (hive-reply-with-error hive message key args)))
-          ;; print error message
-          (apply print-error-and-continue key args)))
-      ;; @@: This is a kludge.  See above for why.
-      (if queued-error-handling-thunk
-          (8sync (queued-error-handling-thunk))))
-    (call-with-prompt (hive-prompt hive)
-      (if catch-errors
-          call-catching-errors
-          thunk)
-      (lambda (kont actor message send-options)
-        ;; Register the coroutine
-        (hash-set! (hive-waiting-coroutines hive)
-                   (message-id message)
-                   (make-waiting-on-reply
-                    (actor-id actor) kont send-options))
-        ;; Send off the message
-        (8sync (hive-process-message hive message)))))
-
-  (define (process-local-message)
-    (let ((actor (resolve-actor-to)))
-      (call-catching-coroutine
-       (lambda ()
-         (define message-handler (actor-message-handler actor))
-         ;; @@: Should a more general error handling happen here?
-         (parameterize ((%current-actor actor))
-           (let ((result
-                  (message-handler actor message)))
-             (maybe-autoreply actor)
-             ;; Returning result allows actors to possibly make a run-request
-             ;; at the end of handling a message.
-             ;; ... We do want that, right?
-             result))))))
-
-  (define (resume-waiting-coroutine)
-    (case (message-action message)
-      ;; standard reply / auto-reply
-      ((*reply* *auto-reply* *error*)
-       (call-catching-coroutine
-        (lambda ()
-          (match (hash-remove! (hive-waiting-coroutines hive)
-                               (message-in-reply-to message))
-            ((_ . waiting)
-             (if (not (equal? (message-to message)
-                              (waiting-on-reply-actor-id waiting)))
-                 (throw 'resuming-to-wrong-actor
-                        "Attempted to resume a coroutine to the wrong actor!"
-                        #:expected-actor-id (message-to message)
-                        #:got-actor-id (waiting-on-reply-actor-id waiting)
-                        #:message message))
-             (let* (;; @@: How should we resolve resuming coroutines to actors who are
-                    ;;   now gone?
-                    (actor (resolve-actor-to))
-                    (kont (waiting-on-reply-kont waiting))
-                    (result (kont message)))
-               (maybe-autoreply actor)
-               result))
-            (#f (throw 'no-waiting-coroutine
-                       "message in-reply-to tries to resume nonexistent coroutine"
-                       message))))
-        ;; no need to catch errors here, there's already an error handler
-        #f))
-      ;; Unhandled action for a reply!
-      (else
-       (throw 'hive-unresumable-coroutine
-              "Won't resume coroutine, nonsense action on reply message"
-              #:action (message-action message)
-              #:message message))))
-
-  (define (process-remote-message)
-    ;; Find the ambassador
-    (let* ((remote-hive-id (hive-id (message-to message)))
-           (ambassador (hive-resolve-ambassador remote-hive-id))
-           (message-handler (actor-message-handler ambassador))
-           (forward-request (make-forward-request hive ambassador message)))
-      (message-handler ambassador forward-request)))
-
-  (let ((to (message-to message)))
-    ;; This seems to be an easy mistake to make, so check that addressing
-    ;; is correct here
-    (if (not to)
-        (throw 'missing-addressee
-               "`to' field is missing on message"
-               #:message message))
-    (if (hive-actor-local? hive to)
-        (if (message-in-reply-to message)
-            (resume-waiting-coroutine)
-            (process-local-message))
-        (process-remote-message))))
-
-(define-method (hive-actor-local? (hive <hive>) address)
-  (equal? (hive-id hive) (address-hive-id address)))
-
-(define-method (hive-register-actor! (hive <hive>) (actor <actor>))
-  (hash-set! (hive-actor-registry hive) (actor-id actor) actor))
-
-(define-method (%hive-create-actor (hive <hive>) actor-class
-                                   init-args id-cookie send-init?)
-  "Actual method called by bootstrap-actor / create-actor.
-
-Since this is a define-method it can't accept fancy define* arguments,
-so this gets called from the nicer bootstrap-actor interface.  See
-that method for documentation."
-  (let* ((actor-id (hive-gen-actor-id hive id-cookie))
+  (make <hive> #:id (or hive-id
+                        (big-random-number-string))))
+
+(define (gen-actor-id cookie)
+  (if cookie
+      (string-append cookie ":" (big-random-number-string))
+      (big-random-number-string)))
+
+(define (hive-main-loop hive)
+  "The main loop of the hive.  This listens for messages on the hive-channel
+for certain actions to perform.
+
+`messages' here is not the same as a <message> object; these are a list of
+values, the first value being a symbol"
+  (define channel (hive-channel hive))
+  (define halt? (hive-halt? hive))
+  (define registry (hive-actor-registry hive))
+
+  ;; not the same as a <message> ;P
+  (define handle-message
+    (match-lambda
+      (('register-actor actor-id address actor)
+       (hash-set! registry actor-id (vector address actor)))
+      ;; Remove the actor from hive
+      (('remove-actor actor-id)
+       (hash-remove! (hive-actor-registry hive) actor-id))
+      (('register-ambassador hive-id ambassador-actor-id)
+       'TODO)
+      (('unregister-ambassador hive-id ambassador-actor-id)
+       'TODO)
+      (('forward-message from-actor-id message)
+       'TODO)))
+
+  (define halt-or-handle
+    (choice-operation
+     (wrap-operation (get-operation channel)
+                     (lambda (msg)
+                       (handle-message msg)
+                       #t))
+     (wrap-operation (wait-operation halt?)
+                     (const #f))))
+
+  (let lp ()
+    (and (perform-operation halt-or-handle)
+         (lp))))
+
+(define *current-hive* (make-parameter #f))
+
+(define* (spawn-hive proc #:key (hive (make-hive)))
+  "Spawn a hive in a fiber running PROC, passing it the fresh hive"
+  (spawn-fiber (lambda () (hive-main-loop hive)))
+  (proc hive))
+
+(define (run-hive proc . args)
+  "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
+to spawn-hive... all remaining arguments passed to run-fibers."
+  (apply run-fibers
+         (lambda ()
+           (spawn-hive proc))
+         args))
+
+(define (%create-actor hive-channel hive-id
+                       actor-class init-args id-cookie send-init?)
+  (let* ((actor-id (gen-actor-id id-cookie))
+         (dead? (make-condition))
+         (inbox-enq (make-channel))
+         (address (make-address actor-id hive-id
+                                inbox-enq dead?))
          (actor (apply make actor-class
-                       #:hive hive
-                       #:id actor-id
+                       #:hive-channel hive-channel
+                       #:address address
                        init-args))
-         (actor-should-init (slot-ref actor 'should-init)))
-    (hive-register-actor! hive actor)
-    ;; Maybe run actor init method
-    (when (and send-init? actor-should-init)
-      (let ((send-method
-             (if (eq? actor-should-init 'wait)
-                 <-wait <-)))
-        (send-method actor-id '*init*)))
-    ;; return the actor id
-    actor-id))
+         (should-init (actor-should-init actor)))
+
+    ;; start the main loop
+    (spawn-fiber (lambda ()
+                   ;; start the inbox loop
+                   (spawn-fiber
+                    (lambda ()
+                      (delivery-agent inbox-enq (actor-inbox-deq actor)
+                                      dead?))
+                    ;; this one is decidedly non-parallel, because we want
+                    ;; the delivery agent to be in the same thread as its actor
+                    #:parallel? #f)
+
+                   (actor-main-loop actor))
+                 #:parallel? #t)
+
+    (put-message hive-channel (list 'register-actor actor-id address actor))
+    
+    ;; return the address
+    address))
 
 (define* (bootstrap-actor hive actor-class #:rest init-args)
   "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args"
-  (%hive-create-actor hive actor-class
-                    init-args (symbol->string (class-name actor-class))
-                    #f))
+  (%create-actor (hive-channel hive) (hive-id hive) actor-class
+                 init-args (symbol->string (class-name actor-class))
+                 #f))
 
 (define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args)
   "Create an actor, but also allow customizing a 'cookie' added to the id
 for debugging"
-  (%hive-create-actor hive actor-class
-                    init-args id-cookie
-                    #f))
-
-(define (call-with-message message proc)
-  "Applies message body arguments into procedure, with message as first
-argument.  Similar to call-with-values in concept."
-  (apply proc message (message-body message)))
-
-;; (mbody-receive (<- bar baz)
-;;     (baz)
-;;   basil)
-
-;; Emacs: (put 'mbody-receive 'scheme-indent-function 2)
-
-;; @@: Or receive-msg or receieve-message or??
-(define-syntax-rule (mbody-receive arglist message body ...)
-  "Call body with arglist (which can accept arguments like lambda*)
-applied from the message-body of message."
-  (call-with-message message
-                     (lambda* arglist
-                       body ...)))
-
-(define (mbody-val message)
-  "Retrieve the first value from the message-body of message.
-Like single value return from a procedure call.  Probably the most
-common case when waiting on a reply from some action invocation."
-  (call-with-message message
-                     (lambda (_ val) val)))
-
-\f
-;;; Various API methods for actors to interact with the system
-;;; ==========================================================
-
-;; TODO: move send-message and friends here...?
+  (%create-actor (hive-channel hive) (hive-id hive) actor-class
+                 init-args id-cookie
+                 #f))
 
 (define* (create-actor from-actor actor-class #:rest init-args)
   "Create an instance of actor-class.  Return the new actor's id.
@@ -756,130 +574,37 @@ common case when waiting on a reply from some action invocation."
 This is the method actors should call directly (unless they want
 to supply an id-cookie, in which case they should use
 create-actor*)."
-  (%hive-create-actor (actor-hive from-actor) actor-class
-                      init-args #f #t))
+  (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
+                 actor-class init-args #f #t))
 
 
 (define* (create-actor* from-actor actor-class id-cookie #:rest init-args)
   "Create an instance of actor-class.  Return the new actor's id.
 
 Like create-actor, but permits supplying an id-cookie."
-  (%hive-create-actor (actor-hive from-actor) actor-class
-                      init-args id-cookie #t))
-
+  (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
+                 actor-class init-args id-cookie #t))
 
 (define* (self-destruct actor #:key (cleanup #t))
   "Remove an actor from the hive.
 
 Unless #:cleanup is set to #f, this will first have the actor handle
 its '*cleanup* action handler."
-  (when cleanup
-    (<-wait (actor-id actor) '*cleanup*))
-  (hash-remove! (hive-actor-registry (actor-hive actor))
-                (actor-id actor)))
-
-
-\f
-;;; 8sync bootstrap utilities
-;;; =========================
-
-(define* (run-hive hive initial-tasks
-                   #:key (cleanup #t)
-                   (handle-signals (list SIGINT SIGTERM)))
-  "Start up an agenda and run HIVE in it with INITIAL-TASKS.
-
-Keyword arguments:
- - #:cleanup: Whether to run *cleanup* on all actors.
- - #:handle-sigactions: a list of signals to set up interrupt
-   handlers for, so cleanup sill still happen as expected.
-   Defaults to a list of SIGINT and SIGTERM."
-  (dynamic-wind
-    (const #f)
-    (lambda ()
-      (define (run-it escape)
-        (define (handle-signal signum)
-          (restore-signals)
-          (escape signum))
-        (for-each (lambda (signum)
-                    (sigaction signum handle-signal))
-                  handle-signals)
-        (let* ((queue (list->q
-                       (cons (bootstrap-message hive (actor-id hive) '*init-all*)
-                             initial-tasks)))
-               (agenda (make-agenda #:pre-unwind-handler print-error-and-continue
-                                    #:queue queue)))
-          (run-agenda agenda)))
-      (call/ec run-it))
-    ;; Run cleanup
-    (lambda ()
-      (when cleanup
-        (run-hive-cleanup hive)))))
-
-(define (run-hive-cleanup hive)
-  (let ((queue (list->q (list (bootstrap-message hive (actor-id hive)
-                                                 '*cleanup-all*)))))
-    (run-agenda
-     (make-agenda #:queue queue))))
+  (signal-condition! (address-dead? (actor-id actor)))
+  (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor)))
+  ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
+  ;; a message with <-wait
+  (*actor-prompt* #f)
+  (actor-cleanup! actor))
+
+;; From a patch I sent to Fibers...
+(define (condition-signalled? cvar)
+  "Return @code{#t} if @var{cvar} has already been signalled.
+
+In general you will want to use @code{wait} or @code{wait-operation} to
+wait on a condition.  However, sometimes it is useful to see whether or
+not a condition has already been signalled without blocking."
+  (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar)))
 
-(define (bootstrap-message hive to-id action . message-body-args)
-  (wrap
-   (apply <-* `(#:actor ,hive) to-id action message-body-args)))
-
-
-\f
-;;; Basic readers / writers
-;;; =======================
-
-(define (serialize-message message)
-  "Serialize a message for read/write"
-  (list
-   (message-id message)
-   (message-to message)
-   (message-from message)
-   (message-action message)
-   (message-body message)
-   (message-in-reply-to message)
-   (message-wants-reply message)
-   (message-replied message)))
-
-(define* (write-message message #:optional (port (current-output-port)))
-  "Write out a message to a port for easy reading later.
-
-Note that if a sub-value can't be easily written to something
-Guile's `read' procedure knows how to read, this doesn't do anything
-to improve that.  You'll need a better serializer for that.."
-  (write (serialize-message message) port))
-
-(define (serialize-message-pretty message)
-  "Serialize a message in a way that's easy for humans to read."
-  `(*message*
-    (id ,(message-id message))
-    (to ,(message-to message))
-    (from ,(message-from message))
-    (action ,(message-action message))
-    (body ,(message-body message))
-    (in-reply-to ,(message-in-reply-to message))
-    (wants-reply ,(message-wants-reply message))
-    (replied ,(message-replied message))))
-
-(define (pprint-message message)
-  "Pretty print a message."
-  (pretty-print (serialize-message-pretty message)))
-
-(define* (read-message #:optional (port (current-input-port)))
-  "Read a message serialized via serialize-message from PORT"
-  (match (read port)
-    ((id to from action body in-reply-to wants-reply replied)
-     (make-message-intern
-      id to from action body
-      in-reply-to wants-reply replied))
-    (anything-else
-     (throw 'message-read-bad-structure
-            "Could not read message from structure"
-            anything-else))))
-
-(define (read-message-from-string message-str)
-  "Read message from MESSAGE-STR"
-  (with-input-from-string message-str
-    (lambda ()
-      (read-message (current-input-port)))))
+(define (actor-alive? actor)
+  (condition-signalled? (address-dead? (actor-id actor))))
diff --git a/8sync/inbox.scm b/8sync/inbox.scm
new file mode 100644 (file)
index 0000000..ad3bdfb
--- /dev/null
@@ -0,0 +1,90 @@
+;;; 8sync --- Asynchronous programming for Guile
+;;; Copyright Â© 2017 Christopher Allan Webber <cwebber@dustycloud.org>
+;;;
+;;; This file is part of 8sync.
+;;;
+;;; 8sync is free software: you can redistribute it and/or modify it
+;;; under the terms of the GNU Lesser General Public License as
+;;; published by the Free Software Foundation, either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; 8sync is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;;; GNU Lesser General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Lesser General Public
+;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (8sync inbox)
+  #:use-module (fibers)
+  #:use-module (fibers channels)
+  #:use-module (fibers conditions)
+  #:use-module (fibers operations)
+  #:use-module (ice-9 match)
+  #:use-module (ice-9 q)
+  #:use-module (srfi srfi-9)
+  #:use-module (ice-9 atomic)
+  #:export (spawn-inbox
+            delivery-agent))
+
+(define* (spawn-inbox)
+  "Spawn an inbox fiber which manages a a buffered queue.
+
+Returns three values to its continuation: a INBOX-ENQ channel to send
+messages to, an INBOX-DEQ channel which is what the actor doing the
+reading should read from, and a STOP? atomic box which can be set to #t
+to stop delivery."
+  (let ((inbox-enq (make-channel))
+        (inbox-deq (make-channel))
+        (stop? (make-atomic-box #f)))
+    (spawn-fiber (lambda ()
+                   ;; From the perspective of the delivery-agent,
+                   ;; deliver-to
+                   (delivery-agent inbox-enq inbox-deq stop?)))
+    (values inbox-enq inbox-deq stop?)))
+
+;; @@: Do we want to add a stop condition?
+(define (delivery-agent inbox-enq inbox-deq stop?)
+  "This starts up a loop doing delivery receiving from INBOX-ENQ and
+delivering to INBOX-DEQ, actually managing an (ice-9 q) object QUEUE.
+Atomic box STOP? can be set to indicate that this "
+  (define queue
+    (make-q))
+  (define get-or-stop
+    (choice-operation
+     (wrap-operation (get-operation inbox-enq)
+                     (lambda (message)
+                       (enq! queue message)
+                       'got-one))
+     (wrap-operation (wait-operation stop?)
+                     (const 'stop))))
+  (let main-lp ()
+    (cond
+     ;; No items to deliver?  We need to get one first...
+     ((q-empty? queue)
+      (match (perform-operation get-or-stop)
+        ;; keep looping
+        ('got-one (main-lp))
+        ;; halt!
+        ('stop 'done)))
+     (else
+      ;; Pull an item off the queue for delivery...
+      (let ((this-one (deq! queue)))
+        ;; But we need to start looping!  
+        (let deliver-this-one ()
+          (match (perform-operation
+                  (choice-operation
+                   ;; get a new message and keep trying to deliver
+                   ;; this one, or stop
+                   get-or-stop
+                   ;; deliver this one and get the next one to deliver
+                   (wrap-operation (put-operation inbox-deq this-one)
+                                   (const 'delivered))))
+            ;; We're dispatching based on which one succeeds.
+            ;; Maybe this isn't necessary, but I'm not convinced
+            ;; that looping within the choice-operation would be
+            ;; properly tail recursive.
+            ('got-one (deliver-this-one))
+            ('delivered (main-lp))
+            ('stop 'done))))))))
index fff346177859e9d0125b53d4336f0dd5cc319aae..030f2d3b8b68b4e7c19f778d529478d4357bdef9 100755 (executable)
   (socket #:accessor irc-bot-socket)
   (actions #:allocation #:each-subclass
            #:init-thunk (build-actions
-                         (*init* irc-bot-init)
-                         (*cleanup* irc-bot-cleanup)
                          (main-loop irc-bot-main-loop)
                          (handle-line handle-line)
                          (send-line irc-bot-send-line-action))))
   (or (slot-ref irc-bot 'realname)
       (irc-bot-username irc-bot)))
 
-(define (irc-bot-init irc-bot message)
+(define-method (actor-init! (irc-bot <irc-bot>))
   "Initialize the IRC bot"
   (define socket
     (irc-socket-setup (irc-bot-server irc-bot)
                       (irc-bot-port irc-bot)))
+  (pk 'initing-irc)
   (set! (irc-bot-socket irc-bot) socket)
   (format socket "USER ~a ~a ~a :~a~a"
           (irc-bot-username irc-bot)
 
   (<- (actor-id irc-bot) 'main-loop))
 
-(define (irc-bot-cleanup irc-bot message)
+(define-method (actor-cleanup! (irc-bot <irc-bot>))
   (close (irc-bot-socket irc-bot)))
 
 (define (irc-bot-main-loop irc-bot message)
    ((eof-object? (peek-char socket))
     (close socket)
     'done)
-   ;; ;; Looks like we've been killed somehow... well, stop running
-   ;; ;; then!
-   ;; ((actor-am-i-dead? irc-bot)
-   ;;  (if (not (port-closed? socket))
-   ;;      (close socket))
-   ;;  'done)
-   ;; Otherwise, let's read till the next line!
    (else
     (<- (actor-id irc-bot) 'main-loop))))
 
index 926d7552f88716fe40e36920a570387406592629..2cbd69572ef55aa0620cbcc801ec479c9fb2d7f9 100644 (file)
@@ -46,21 +46,22 @@ godir=$(libdir)/guile/2.2/ccache
 
 SOURCES =  \
        8sync.scm                                       \
-       8sync/agenda.scm                                \
-       8sync/repl.scm                                  \
        8sync/actors.scm                                \
-       8sync/debug.scm                                 \
-       8sync/ports.scm                                 \
-       8sync/rmeta-slot.scm                            \
-       8sync/contrib/base64.scm                        \
-       8sync/contrib/sha-1.scm                         \
-       8sync/systems/irc.scm                           \
-       8sync/systems/web.scm                           \
-       8sync/systems/websocket.scm                     \
-       8sync/systems/websocket/client.scm              \
-       8sync/systems/websocket/frame.scm               \
-       8sync/systems/websocket/server.scm              \
-       8sync/systems/websocket/utils.scm
+       8sync/inbox.scm
+#      8sync/repl.scm                                  \
+#      8sync/agenda.scm                                \
+#      8sync/debug.scm                                 \
+#      8sync/ports.scm                                 \
+#      8sync/rmeta-slot.scm                            \
+#      8sync/contrib/base64.scm                        \
+#      8sync/contrib/sha-1.scm                         \
+#      8sync/systems/irc.scm                           \
+#      8sync/systems/web.scm                           \
+#      8sync/systems/websocket.scm                     \
+#      8sync/systems/websocket/client.scm              \
+#      8sync/systems/websocket/frame.scm               \
+#      8sync/systems/websocket/server.scm              \
+#      8sync/systems/websocket/utils.scm
 
 TESTS =                                                        \
        tests/test-agenda.scm                           \
index 3bd30a4cfd9f2cc0d141de5ffc96a1fc441ac71a..69f3190faa291ccf43837b624d473ebc98bacb1a 100755 (executable)
@@ -22,8 +22,7 @@
 
 ;; Puppet show simulator.
 
-(use-modules (8sync agenda)
-             (8sync actors)
+(use-modules (8sync actors)
              (oop goops)
              (ice-9 hash-table)
              (ice-9 format))
 (define num-students 10)
 
 (define (main . args)
-  (define agenda (make-agenda))
-  (define hive (make-hive))
-  (define professor (bootstrap-actor* hive <professor> "prof"))
-  (define namegen (student-name-generator))
-  (define students
-    (map
-     (lambda _
-       (let ((name (namegen)))
-         (bootstrap-actor* hive <student> name
-                           #:name name)))
-     (iota num-students)))
-
-  ;; Bootstrap each student into bothering-professor mode.
-  (define start-bothering-tasks
-    (map
-     (lambda (student)
-       (bootstrap-message hive student 'bother-professor
-                               #:target professor))
-     students))
-
-  (run-hive hive start-bothering-tasks))
+  (run-hive
+   (lambda (hive)
+     (define professor (bootstrap-actor* hive <professor> "prof"))
+     (define namegen (student-name-generator))
+     (define students
+       (map
+        (lambda _
+          (let ((name (namegen)))
+            (bootstrap-actor* hive <student> name
+                              #:name name)))
+        (iota num-students)))
+
+     ;; Bootstrap each student into bothering-professor mode.
+     (define start-bothering-tasks
+       (map
+        (lambda (student)
+          (<- student 'bother-professor
+              #:target professor))
+        students))
+
+     (run-hive hive start-bothering-tasks)
+     ;; in other words, this program doesn't really halt
+     (wait (make-condition)))))
index feb8f14bc0a0454ebb48024e98020978abde18c6..52f96e0bf20f6ca72df9cb1679cf3544737c9555 100644 (file)
@@ -35,7 +35,8 @@
 
 (use-modules (8sync actors)
              (oop goops)
-             (ice-9 match))
+             (ice-9 match)
+             (fibers conditions))
 
 (set! *random-state* (random-state-from-platform))
 
    (transmission
     (lambda* (actor message #:key text)
       (display text)
-      (newline)))))
+      (newline)))
+   (done!
+    (lambda* (actor message)
+      (signal-condition! (.done? actor)))))
+  (done? #:init-keyword #:done?
+         #:accessor .done?))
 
 
 ;;; A room full of robots.
     (get-next-room
      (lambda (actor message)
        "Return a reference to the link following this"
-       (<-reply message (slot-ref actor 'next-room))))
+       (slot-ref actor 'next-room)))
 
     (get-previous-room
      (lambda (actor message)
        "Return a reference to the link preceding this"
-       (<-reply message (slot-ref actor 'previous-room))))
+       (slot-ref actor 'previous-room)))
 
     (list-droids
      (lambda (actor message)
        "Return a list of all the droid ids we know of in this room"
-       (<-reply message
-                #:droid-ids (slot-ref actor 'droids))))
+       (slot-ref actor 'droids)))
 
     (register-droid
      (lambda* (actor message #:key droid-id)
     (infection-expose
      (lambda (actor message)
        "Leak whether or not we're infected to a security droid"
-       (<-reply message (slot-ref actor 'infected))))
+       (slot-ref actor 'infected)))
 
     (get-shot
      (lambda (actor message)
               (alive (> new-hp 0)))
          ;; Set our health to the new value
          (slot-set! actor 'hp new-hp)
-         (<-reply message
-                  #:hp-left new-hp
-                  #:damage-taken damage
-                  #:alive alive)
          (when (not alive)
            (format #t "~a: *Kaboom!*\n" (actor-id-actor actor))
-           (self-destruct actor))))))))
+           (self-destruct actor))
+         (values #:hp-left new-hp
+                 #:damage-taken damage
+                 #:alive alive)))))))
 
 
-(define (droid-status-format shot-response)
-  (call-with-message
-   shot-response
-   (lambda* (_ #:key alive damage-taken hp-left)
-     (if alive
-         (format #f "Droid ~a shot; taken ~a damage. Still alive... ~a hp left."
-                 (address-actor-id (message-from shot-response))
-                 damage-taken hp-left)
-         (format #f "Droid ~a shot; taken ~a damage. Terminated."
-                 (address-actor-id (message-from shot-response))
-                 damage-taken)))))
+(define* (droid-status-format droid-id alive damage-taken hp-left)
+  (if alive
+      (format #f "Droid ~a shot; taken ~a damage. Still alive... ~a hp left."
+              (address-actor-id droid-id)
+              damage-taken hp-left)
+      (format #f "Droid ~a shot; taken ~a damage. Terminated."
+              (address-actor-id droid-id)
+              damage-taken)))
 
 
 ;;; Security robot... designed to seek out and destroy infected droids.
                        (address-actor-id room)))
 
     ;; Find all droids in this room and exterminate the infected ones.
-    (mbody-receive (_ #:key list-droids droid-ids #:allow-other-keys)
-        (<-wait room 'list-droids)
+    (let ((droid-ids (<-wait room 'list-droids)))
       (for-each
        (lambda (droid-id)
          (cond
           ;; Looks like it's infected
-          ((mbody-val (<-wait droid-id 'infection-expose))
+          ((<-wait droid-id 'infection-expose)
            ;; Inform that it's infected
            (<- overseer 'transmission
                #:text (format #f "~a found to be infected... taking out"
            ;; Keep firing till it's dead.
            (let ((still-alive #t))
              (while still-alive
-               (mbody-receive (response #:key alive #:allow-other-keys)
-                   (<-wait droid-id 'get-shot)
-                 (<- overseer 'transmission
-                     #:text (droid-status-format response))
-                 (set! still-alive alive)))))
+               (call-with-values
+                   (lambda () (<-wait droid-id 'get-shot))
+                 (lambda* (#:key hp-left damage-taken alive)
+                   (<- overseer 'transmission
+                       #:text (droid-status-format droid-id alive damage-taken hp-left))
+                   (set! still-alive alive))))))
 
           ;; Not infected... inform and go to the next one
           (else
        droid-ids))
 
     ;; Switch to next room, if there is one.
-    (set! room (mbody-val (<-wait room 'get-next-room))))
+    (set! room (<-wait room 'get-next-room)))
 
   ;; Good job everyone!  Shut down the operation.
-  (<- overseer 'transmission
-      #:text "Mission accomplished."))
+  (<-wait overseer 'transmission
+          #:text "Mission accomplished.")
+  (<- overseer 'done!))
 
 (define (main . args)
-  (define hive (make-hive))
-  (define overseer (bootstrap-actor hive <overseer>))
-  (define initial-messages
-    (list (bootstrap-message hive overseer 'init-world)))
-  (run-hive hive initial-messages))
+  (run-hive
+   (lambda (hive)
+     (define done? (make-condition))
+     (define overseer (bootstrap-actor hive <overseer>
+                                       #:done? done?))
+     (<- overseer 'init-world)
+     (wait done?))))
index 2b7f42dc8714a94b7e7ac433f1ec03636694a024..1b649dcbef9bd192dee2a9efdea120d2d2e4cf2d 100644 (file)
@@ -17,7 +17,8 @@
 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
 
 (use-modules (8sync actors)
-             (oop goops))
+             (oop goops)
+             (fibers conditions))
 
 (define-actor <emo> (<actor>)
   ((greet-proog
 (define-actor <proog> (<actor>)
   ((greet-emo
     (lambda (actor message)
-      (display "proog> Listen, Emo!  Listen to the sounds of the machine!\n")))))
+      (display "proog> Listen, Emo!  Listen to the sounds of the machine!\n")
+      (signal-condition! (.done? actor)))))
+  (done? #:init-keyword #:done?
+         #:accessor .done?))
 
-(define hive (make-hive))
-(define our-emo (bootstrap-actor hive <emo>))
-(define our-proog (bootstrap-actor hive <proog>))
 (define (main . args)
-  (run-hive hive
-            (list (bootstrap-message hive our-emo 'greet-proog
-                                     our-proog))))
+  (run-hive
+   (lambda (hive)
+     (define done? (make-condition))
+     (define our-emo (bootstrap-actor hive <emo>))
+     (define our-proog (bootstrap-actor hive <proog>
+                                        #:done? done?))
+     (<- our-emo 'greet-proog our-proog)
+     (wait done?))))
index 77f2db5cebe37901652ef98eb88cf39bf4ea0952..e7026749a2dd0589ea16925c2866f3c221010152 100644 (file)
--- a/guix.scm
+++ b/guix.scm
   (build-system gnu-build-system)
   (native-inputs `(("autoconf" ,autoconf)
                    ("automake" ,automake)
-                   ("guile" ,guile-next)
+                   ("guile" ,guile-2.2)
                    ("pkg-config" ,pkg-config)
                    ("texinfo" ,texinfo)))
+  (propagated-inputs `(("guile-fibers" ,guile-fibers)))
   (arguments
    `(#:phases (modify-phases %standard-phases
                 (add-before 'configure 'bootstrap