X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Factors.scm;h=bb4d342a8ff0f49ba5f78ff64c60c52076be767b;hb=c08fc1bf81e422ead4e3394e711ce8d0724559ae;hp=fc8afcad3a6200550ae90396aff05881cd6fff5f;hpb=092b40c1c8c9c45926113fbabd542d3186244367;p=8sync.git diff --git a/8sync/actors.scm b/8sync/actors.scm index fc8afca..bb4d342 100644 --- a/8sync/actors.scm +++ b/8sync/actors.scm @@ -19,6 +19,7 @@ (define-module (8sync actors) #:use-module (oop goops) #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 control) #:use-module (ice-9 format) #:use-module (ice-9 match) @@ -156,7 +157,7 @@ ;; This is the internal, generalized message sending method. ;; Users shouldn't use it! Use the <-foo forms instead. -(define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to) +(define (%<- wants-reply from-actor to action args message-id in-reply-to) ;; Okay, we need to deal with message ids. ;; Could we get rid of them? :\ ;; It seems if we can use eq? and have messages be immutable then @@ -164,7 +165,7 @@ ;; If we need to track replies across hive boundaries we could ;; register unique ids across the ambassador barrier. (match to - (#(_ _ (? channel? channel) dead?) + (($
_ _ (? channel? channel) dead?) (let ((message (make-message message-id to (and from-actor (actor-id from-actor)) action args @@ -175,12 +176,18 @@ (put-operation channel message) (wait-operation dead?))))) ;; TODO: put remote addresses here. - (#(actor-id hive-id #f #f) + (($
actor-id hive-id #f #f) ;; Here we'd make a call to our hive... 'TODO) ;; A message sent to nobody goes nowhere. ;; TODO: Should we display a warning here, probably? - (#f #f))) + (#f #f) + ;; We shouldn't technically be passing in actors but rather their + ;; addresses, but often actors want to message themselves and + ;; this makes that slightly easier. + ((? (lambda (x) (is-a? x )) actor) + (%<- wants-reply from-actor (actor-id actor) action + args message-id in-reply-to)))) (define (<- to action . args) (define from-actor (*current-actor*)) @@ -190,16 +197,13 @@ (big-random-number-string)) #f)) -;; TODO: this should abort to the prompt, then check for errors -;; when resuming. - (define (<-wait to action . args) (define prompt (*actor-prompt*)) (when (not prompt) (error "Tried to <-wait without being in an actor's context...")) (let ((reply (abort-to-prompt prompt '<-wait to action args))) - (cond ((eq? action '*error*) + (cond ((eq? (message-action reply) '*error*) (throw 'hive-unresumable-coroutine "Won't resume coroutine; got an *error* as a reply" #:message reply)) @@ -237,10 +241,7 @@ to come after class definition." (live-wrap method)) ...))) (define-class () - ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?) - ;; - inbox-channel is the receiving channel (as opposed to actor-inbox-deq) - ;; - dead? is a fibers condition variable which is set once this actor - ;; kicks the bucket + ;; An
object (id #:init-keyword #:address #:getter actor-id) @@ -281,24 +282,37 @@ to come after class definition." (define-method (actor-cleanup! (actor )) 'no-op) -;;; Addresses are vectors where the first part is the actor-id and -;;; the second part is the hive-id. This works well enough... they -;;; look decent being pretty-printed. - -(define (make-address actor-id hive-id channel dead?) - (vector actor-id hive-id channel dead?)) - -(define (address-actor-id address) - (vector-ref address 0)) - -(define (address-hive-id address) - (vector-ref address 1)) - -(define (address-channel address) - (vector-ref address 2)) - -(define (address-dead? address) - (vector-ref address 3)) +;;; Every actor has an address, which is how its identified. +;;; We also pack in some routing information. +(define-record-type
+ (make-address actor-id hive-id channel dead?) + address? + ;; Unique-to-this-actor-on-this-hive part + (actor-id address-actor-id) + ;; Unique identifier for the hive we're connected to. + ;; If we don't have a "direct" link to the other actor through + ;; a channel, we'll have to look up if our hive has a way to route + ;; to the other hive. + (hive-id address-hive-id) + ;; The receiving channel (as opposed to actor-inbox-deq) + (channel address-channel) + ;; A fibers condition variable which is set once this actor kicks + ;; the bucket + (dead? address-dead?)) + +(set-record-type-printer! +
+ (lambda (address port) + (format port "
" + (address->string address) + (if (address-channel address) + (string-append + ":local" + (if (atomic-box-ref + ((@@ (fibers conditions) condition-signalled?) + (address-dead? address))) + " :dead" "")) + ":remote")))) (define (address->string address) (string-append (address-actor-id address) "@" @@ -309,14 +323,10 @@ to come after class definition." This compares the actor-id and hive-id but ignores the channel and dead? condition." - (match address1 - (#(actor-id-1 hive-id-1 _ _) - (match address2 - (#(actor-id-2 hive-id-2) - (and (equal? actor-id-1 actor-id-2) - (and (equal? hive-id-1 hive-id-2)))) - (_ #f))) - (_ #f))) + (and (equal? (address-actor-id address1) + (address-actor-id address2)) + (equal? (address-hive-id address1) + (address-hive-id address2)))) (define (actor-id-actor actor) "Get the actor id component of the actor-id" @@ -366,10 +376,9 @@ and handling them." (lambda vals ;; Return reply if necessary (when (message-wants-reply message) - (when (message-wants-reply message) - (%<- #f actor (message-from message) '*reply* - vals ((actor-msg-id-generator actor)) - (message-id message))))))) + (%<- #f actor (message-from message) '*reply* + vals ((actor-msg-id-generator actor)) + (message-id message)))))) (const #t) (let ((err (current-error-port))) (lambda (key . args)