X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=8f1538cd5c6b48c7374ec89a36b050079f038a72;hb=38afa0b278e17953b64764d800beaaa6368f70be;hp=83c6a92a565f66528dc0c3f9ae96f378efeaee36;hpb=b3a48e468f47d5a94c1a38317edce9aa25812f9f;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index 83c6a92..8f1538c 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -19,6 +19,7 @@ (define-module (8sync actors) #:use-module (oop goops) #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 control) #:use-module (ice-9 format) #:use-module (ice-9 match) @@ -32,7 +33,6 @@ #:use-module (fibers channels) #:use-module (fibers conditions) #:use-module (fibers operations) - #:use-module (fibers internal) #:use-module (8sync inbox) #:use-module (8sync rmeta-slot) @@ -44,6 +44,8 @@ actor-id actor-message-handler + *current-actor* + ;;; Commenting out the
type for now; ;;; it may be back when we have better serializers ;;
@@ -57,8 +59,6 @@ actor-init! actor-cleanup! - actor-alive? - build-actions define-actor @@ -82,7 +82,13 @@ <- <-wait - spawn-hive run-hive)) + spawn-hive run-hive + + ;; Maybe the wrong place for this, or for it to be exported. + ;; But it's used in websockets' server implementation at least... + live-wrap + + *debug-actor-ids*)) ;; For ids (set! *random-state* (random-state-from-platform)) @@ -153,7 +159,7 @@ ;; This is the internal, generalized message sending method. ;; Users shouldn't use it! Use the <-foo forms instead. -(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to) +(define (%<- wants-reply from-actor to action args message-id in-reply-to) ;; Okay, we need to deal with message ids. ;; Could we get rid of them? :\ ;; It seems if we can use eq? and have messages be immutable then @@ -161,7 +167,7 @@ ;; If we need to track replies across hive boundaries we could ;; register unique ids across the ambassador barrier. (match to - (#(_ _ (? channel? channel) dead?) + (($
_ _ (? channel? channel) dead?) (let ((message (make-message message-id to (and from-actor (actor-id from-actor)) action args @@ -172,12 +178,18 @@ (put-operation channel message) (wait-operation dead?))))) ;; TODO: put remote addresses here. - (#(actor-id hive-id #f #f) + (($
actor-id hive-id #f #f) ;; Here we'd make a call to our hive... 'TODO) ;; A message sent to nobody goes nowhere. ;; TODO: Should we display a warning here, probably? - (#f #f))) + (#f #f) + ;; We shouldn't technically be passing in actors but rather their + ;; addresses, but often actors want to message themselves and + ;; this makes that slightly easier. + ((? (lambda (x) (is-a? x )) actor) + (%<- wants-reply from-actor (actor-id actor) action + args message-id in-reply-to)))) (define (<- to action . args) (define from-actor (*current-actor*)) @@ -187,16 +199,13 @@ (big-random-number-string)) #f)) -;; TODO: this should abort to the prompt, then check for errors -;; when resuming. - (define (<-wait to action . args) (define prompt (*actor-prompt*)) (when (not prompt) (error "Tried to <-wait without being in an actor's context...")) (let ((reply (abort-to-prompt prompt '<-wait to action args))) - (cond ((eq? action '*error*) + (cond ((eq? (message-action reply) '*error*) (throw 'hive-unresumable-coroutine "Won't resume coroutine; got an *error* as a reply" #:message reply)) @@ -220,24 +229,21 @@ #:message message)) (apply method actor message (message-body message))) -(define-syntax-rule (wrap-apply body) +(define-syntax-rule (live-wrap body) "Wrap possibly multi-value function in a procedure, applies all arguments" (lambda args (apply body args))) (define-syntax-rule (build-actions (symbol method) ...) "Construct an alist of (symbol . method), where the method is wrapped -with wrap-apply to facilitate live hacking and allow the method definition +with `live-wrap' to facilitate live hacking and allow the method definition to come after class definition." (build-rmeta-slot (list (cons (quote symbol) - (wrap-apply method)) ...))) + (live-wrap method)) ...))) (define-class () - ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) - ;; - inbox-channel is the receiving channel (as opposed to actor-inbox-deq) - ;; - dead? is a fibers condition variable which is set once this actor - ;; kicks the bucket + ;; An
object (id #:init-keyword #:address #:getter actor-id) @@ -278,24 +284,37 @@ to come after class definition." (define-method (actor-cleanup! (actor )) 'no-op) -;;; Addresses are vectors where the first part is the actor-id and -;;; the second part is the hive-id. This works well enough... they -;;; look decent being pretty-printed. - -(define (make-address actor-id hive-id channel dead?) - (vector actor-id hive-id channel dead?)) - -(define (address-actor-id address) - (vector-ref address 0)) - -(define (address-hive-id address) - (vector-ref address 1)) - -(define (address-channel address) - (vector-ref address 2)) - -(define (address-dead? address) - (vector-ref address 3)) +;;; Every actor has an address, which is how its identified. +;;; We also pack in some routing information. +(define-record-type
+ (make-address actor-id hive-id channel dead?) + address? + ;; Unique-to-this-actor-on-this-hive part + (actor-id address-actor-id) + ;; Unique identifier for the hive we're connected to. + ;; If we don't have a "direct" link to the other actor through + ;; a channel, we'll have to look up if our hive has a way to route + ;; to the other hive. + (hive-id address-hive-id) + ;; The receiving channel (as opposed to actor-inbox-deq) + (channel address-channel) + ;; A fibers condition variable which is set once this actor kicks + ;; the bucket + (dead? address-dead?)) + +(set-record-type-printer! +
+ (lambda (address port) + (format port "
" + (address->string address) + (if (address-channel address) + (string-append + ":local" + (if (atomic-box-ref + ((@@ (fibers conditions) condition-signalled?) + (address-dead? address))) + " :dead" "")) + ":remote")))) (define (address->string address) (string-append (address-actor-id address) "@" @@ -306,14 +325,10 @@ to come after class definition." This compares the actor-id and hive-id but ignores the channel and dead? condition." - (match address1 - (#(actor-id-1 hive-id-1 _ _) - (match address2 - (#(actor-id-2 hive-id-2) - (and (equal? actor-id-1 actor-id-2) - (and (equal? hive-id-1 hive-id-2)))) - (_ #f))) - (_ #f))) + (and (equal? (address-actor-id address1) + (address-actor-id address2)) + (equal? (address-hive-id address1) + (address-hive-id address2)))) (define (actor-id-actor actor) "Get the actor id component of the actor-id" @@ -363,10 +378,9 @@ and handling them." (lambda vals ;; Return reply if necessary (when (message-wants-reply message) - (when (message-wants-reply message) - (%<- #f actor (message-from message) '*reply* - vals ((actor-msg-id-generator actor)) - (message-id message))))))) + (%<- #f actor (message-from message) '*reply* + vals ((actor-msg-id-generator actor)) + (message-id message)))))) (const #t) (let ((err (current-error-port))) (lambda (key . args) @@ -459,16 +473,14 @@ and handling them." ;; spawn a fiber that wakes up a thunk on the actor when its port is ;; available. Funky... -(define (%suspend-io-to-actor resume-method get-wait-fd-method) +(define (%suspend-io-to-actor wait-for-read/write) (lambda (port) (define prompt (*actor-prompt*)) (define resume-channel (*resume-io-channel*)) (define (run-at-prompt k) (spawn-fiber (lambda () - (suspend-current-fiber - (lambda (fiber) - (resume-on-readable-fd (port-read-wait-fd port) fiber))) + (wait-for-read/write port) ;; okay, we're awake again, tell the actor to resume this ;; continuation (put-message resume-channel k)) @@ -479,10 +491,10 @@ and handling them." 'run-me run-at-prompt))) (define suspend-read-to-actor - (%suspend-io-to-actor resume-on-readable-fd port-read-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-readable))) (define suspend-write-to-actor - (%suspend-io-to-actor resume-on-writable-fd port-write-wait-fd)) + (%suspend-io-to-actor (@@ (fibers) wait-for-writable))) (define (with-actor-nonblocking-ports thunk) "Runs THUNK in dynamic context in which attempting to read/write @@ -642,14 +654,22 @@ to spawn-hive... all remaining arguments passed to run-fibers." ;; return the address address)) +;;; Whether or not to attach the class' name as a cookie by default in +;;; create-actor +(define *debug-actor-ids* + (make-parameter #t)) + (define* (create-actor actor-class #:rest init-args) "Create an instance of actor-class. Return the new actor's id. This is the method actors should call directly (unless they want to supply an id-cookie, in which case they should use create-actor*)." - (%create-actor actor-class init-args #f #t)) - + (%create-actor actor-class init-args + (if (*debug-actor-ids*) + (symbol->string (class-name actor-class)) + #f) + #t)) (define* (create-actor* actor-class id-cookie #:rest init-args) "Create an instance of actor-class. Return the new actor's id. @@ -657,11 +677,10 @@ create-actor*)." Like create-actor, but permits supplying an id-cookie." (%create-actor actor-class init-args id-cookie #t)) -(define* (self-destruct actor #:key (cleanup #t)) +(define (self-destruct actor) "Remove an actor from the hive. -Unless #:cleanup is set to #f, this will first have the actor handle -its '*cleanup* action handler." +The actor will also call its `actor-cleanup!' method." (signal-condition! (address-dead? (actor-id actor))) (put-message (*hive-channel*) (list 'remove-actor (actor-id-actor actor))) ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending @@ -669,14 +688,3 @@ its '*cleanup* action handler." (*actor-prompt* #f) (actor-cleanup! actor)) -;; From a patch I sent to Fibers... -(define (condition-signalled? cvar) - "Return @code{#t} if @var{cvar} has already been signalled. - -In general you will want to use @code{wait} or @code{wait-operation} to -wait on a condition. However, sometimes it is useful to see whether or -not a condition has already been signalled without blocking." - (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar))) - -(define (actor-alive? actor) - (condition-signalled? (address-dead? (actor-id actor))))