From: Christopher Allan Webber Date: Sat, 16 Apr 2016 22:17:28 +0000 (-0500) Subject: systems: actors: New actor model system. X-Git-Tag: v0.2.0~68 X-Git-Url: https://jxself.org/git/?p=8sync.git;a=commitdiff_plain;h=70b92d795a14d1328b90dd06f8c618b2ea09332d;hp=c1c7225a35511bbfd2362ca0d5359fb2f1fbb8f6 systems: actors: New actor model system. * 8sync/systems/actors.scm: New file, containing actor model system. --- diff --git a/8sync/systems/actors.scm b/8sync/systems/actors.scm new file mode 100644 index 0000000..8f15feb --- /dev/null +++ b/8sync/systems/actors.scm @@ -0,0 +1,542 @@ +;;; 8sync --- Asynchronous programming for Guile +;;; Copyright (C) 2016 Christopher Allan Webber +;;; +;;; This file is part of 8sync. +;;; +;;; 8sync is free software: you can redistribute it and/or modify it +;;; under the terms of the GNU Lesser General Public License as +;;; published by the Free Software Foundation, either version 3 of the +;;; License, or (at your option) any later version. +;;; +;;; 8sync is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU Lesser General Public License for more details. +;;; +;;; You should have received a copy of the GNU Lesser General Public +;;; License along with 8sync. If not, see . + +;; XUDD inspired actor system + +(define-module (8sync systems actors) + #:use-module (oop goops) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) + #:use-module (ice-9 format) + #:use-module (ice-9 match) + #:use-module (8sync agenda) + #:use-module (8sync repl) + #:export (;; utilities... ought to go in their own module + big-random-number + big-random-number-string + simple-message-id-generator + require-slot + + + actor-id + actor-hive + actor-message-handler + +
+ make-address address? + address-actor-id address-hive-id + + address->string + actor-id-actor + actor-id-hive + actor-id-string + + make-action-dispatch + define-simple-actor + + + make-hive + ;; There are more methods for the hive, but there's + ;; no reason for the outside world to look at them maybe? + hive-id + hive-create-actor hive-create-actor* + + + make-message message? + message-to message-action message-from + message-id message-body message-in-reply-to + message-wants-reply + message-ref + + send-message send-message-wait + reply-message reply-message-wait + + ez-run-hive + hive-bootstrap-message)) + +;; For ids +(define %random-state + (make-parameter (random-state-from-platform))) + +;; Probably bigger than necessary +(define random-number-size (expt 10 50)) + +(define (big-random-number) + (random random-number-size (%random-state))) + +;; Would be great to get this base64 encoded instead. +(define (big-random-number-string) + ;; @@: This is slow. Using format here is wasteful. + (format #f "~x" (big-random-number))) + +;; @@: This is slow. A mere ~275k / second on my (old) machine. +;; The main cost seems to be in number->string. +(define (simple-message-id-generator) + ;; Prepending this cookie makes message ids unique per hive + (let ((prefix (format #f "~x:" (big-random-number))) + (counter 0)) + (lambda () + (set! counter (1+ counter)) + (string-append prefix (number->string counter))))) + +(define (require-slot slot-name) + "Generate something for #:init-thunk to complain about unfilled slot" + (lambda () + (throw 'required-slot + (format #f "Slot ~s not filled" slot-name) + slot-name))) + + + +;;; Main actor implementation +;;; ========================= + +(define-class () + ;; An
object + (id #:init-thunk (require-slot "id") + #:init-keyword #:id + #:getter actor-id) + ;; The hive we're connected to. + ;; We need this to be able to send messages. + (hive #:init-thunk (require-slot "hive") + #:init-keyword #:hive + #:accessor actor-hive) + ;; How we receive and process new messages + (message-handler #:init-thunk (require-slot "message-handler") + #:allocation #:each-subclass)) + +(define-method (actor-message-handler (actor )) + (slot-ref actor 'message-handler)) + +(define-record-type
+ (make-address actor-id hive-id) ; @@: Do we want the trailing -id? + address? + (actor-id address-actor-id) + (hive-id address-hive-id)) + +;; (set-record-type-printer! +;;
+;; (lambda (record port) +;; (format port "<
~s>" (address->string record)))) + +(define (address->string address) + (string-append (address-actor-id address) "@" + (address-hive-id address))) + +(define-method (actor-id-actor (actor )) + "Get the actor id component of the actor-id" + (address-actor-id (actor-id actor))) + +(define-method (actor-id-hive (actor )) + "Get the hive id component of the actor-id" + (address-hive-id (actor-id actor))) + +(define-method (actor-id-string (actor )) + "Render the full actor id as a human-readable string" + (address->string (actor-id actor))) + + + +;;; Actor utilities +;;; =============== + +(define (simple-dispatcher action-map) + (lambda (actor message) + (let* ((action (message-action message)) + (method (assoc-ref action-map action))) + (if (not method) + (throw 'action-not-found + "No appropriate action handler found for actor" + #:action action + #:actor actor + #:message message + #:available-actions (map car action-map))) + (method actor message)))) + +(define-syntax %expand-action-item + (syntax-rules () + ((_ ((action-name action-args ...) body ...)) + (cons (quote action-name) + (lambda (action-args ...) + body ...))) + ((_ (action-name handler)) + (cons (quote action-name) handler)))) + +(define-syntax make-action-dispatch + (syntax-rules () + "Expand a list of action names and actions into an alist + +You can use this like the following: + (make-action-dispatch + (cookies + (lambda (actor message) + (display \"I love cookies!\n\"))) + (party + (lambda (actor message) + (display \"Life of the party!\")))) + +Alternately, if you'd like to skip the lambda, you could use the slightly +more compact following syntax: + (make-action-dispatch + ((cookies actor message) + (display \"I love cookies!\n\")) + ((party actor message) + (display \"Life of the party!\")))" + ((make-action-dispatch action-item ...) + (simple-dispatcher + (list (%expand-action-item action-item) ...))))) + +(define-syntax-rule (define-simple-actor class (actions ...)) + (define-class class () + (message-handler + #:init-value (make-action-dispatch actions ...) + #:allocation #:each-subclass))) + + +;;; The Hive +;;; ======== +;;; Every actor has a hive. The hive is a kind of "meta-actor" +;;; which routes all the rest of the actors in a system. + +(define-generic hive-handle-failed-forward) + +(define-class () + ;; This gets set to itself immediately after being created + (hive #:init-value #f) + (actor-registry #:init-thunk make-hash-table + #:getter hive-actor-registry) + (msg-id-generator #:init-thunk simple-message-id-generator + #:getter hive-msg-id-generator) + ;; Ambassadors are used (or will be) for inter-hive communication. + ;; These are special actors that know how to route messages to other hives. + (ambassadors #:init-thunk make-weak-key-hash-table + #:getter hive-ambassadors) + ;; Waiting coroutines + ;; This is a map from cons cell of message-id + ;; to a cons cell of (actor-id . coroutine) + ;; @@: Should we have a record type? + (waiting-coroutines #:init-thunk make-hash-table + #:getter hive-waiting-coroutines) + ;; Message prompt + ;; When actors send messages to each other they abort to this prompt + ;; to send the message, then carry on their way + (prompt #:init-thunk make-prompt-tag + #:getter hive-prompt) + (message-handler + #:init-value + (make-action-dispatch + ;; This is in the case of an ambassador failing to forward a message... + ;; it reports it back to the hive + (*failed-forward* hive-handle-failed-forward)))) + +(define-method (hive-handle-failed-forward (hive ) message) + "Handle an ambassador failing to forward a message" + 'TODO) + +(define* (make-hive #:key hive-id) + (let ((hive (make + #:id (make-address + "hive" (or hive-id + (big-random-number-string)))))) + ;; Set the hive's actor reference to itself + (set! (actor-hive hive) hive) + hive)) + +(define-method (hive-id (hive )) + (actor-id-hive hive)) + +(define-method (hive-gen-actor-id (hive ) cookie) + (make-address (if cookie + (string-append cookie "-" (big-random-number-string)) + (big-random-number-string)) + (hive-id hive))) + +(define-method (hive-gen-message-id (hive )) + "Generate a message id using HIVE's message id generator" + ((hive-msg-id-generator hive))) + +(define-method (hive-resolve-local-actor (hive ) actor-address) + (hash-ref (hive-actor-registry hive) actor-address)) + +(define-method (hive-resolve-ambassador (hive ) ambassador-address) + (hash-ref (hive-ambassadors hive) ambassador-address)) + +(define-method (make-forward-request (hive ) (ambassador ) message) + (make-message (hive-gen-message-id hive) (actor-id ambassador) + ;; If we make the hive not an actor, we could either switch this + ;; to #f or to the original actor...? + ;; Maybe some more thinking should be done on what should + ;; happen in case of failure to forward? Handling ambassador failures + ;; seems like the primary motivation for the hive remaining an actor. + (actor-id hive) + '*forward* + `((original . ,message)))) + +(define-method (hive-process-message (hive ) message) + "Handle one message, or forward it via an ambassador" + (define (process-local-message) + (let ((actor (hive-resolve-local-actor hive (message-to message)))) + (if (not actor) + (throw 'actor-not-found + (format #f "Message ~a from ~a directed to nonexistant actor ~a" + (message-id message) + (address->string (message-from message)) + (address->string (message-to message))) + message)) + (call-with-prompt (hive-prompt hive) + (lambda () + (define message-handler (actor-message-handler actor)) + ;; @@: Should a more general error handling happen here? + (message-handler actor message)) + + (lambda (kont actor message) + (let ((hive (actor-hive actor))) + ;; Register the coroutine + (hash-set! (hive-waiting-coroutines hive) + (message-id message) + (cons (actor-id actor) kont)) + ;; Send off the message + (8sync (hive-process-message hive message))))))) + + (define (resume-waiting-coroutine) + (match (hash-remove! (hive-waiting-coroutines hive) + (message-in-reply-to message)) + ((_ . kont) + (kont message)) + (#f (throw 'no-waiting-coroutine + "message in-reply-to tries to resume nonexistent coroutine" + message)))) + + (define (process-remote-message) + ;; Find the ambassador + (let* ((remote-hive-id (hive-id (message-to message))) + (ambassador (hive-resolve-ambassador remote-hive-id)) + (message-handler (actor-message-handler ambassador)) + (forward-request (make-forward-request hive ambassador message))) + (message-handler ambassador forward-request))) + + (let ((to (message-to message))) + ;; This seems to be an easy mistake to make, so check that addressing + ;; is correct here + (if (not to) + (throw 'missing-addressee + "`to' field is missing on message" + #:message message)) + (if (hive-actor-local? hive to) + (if (message-in-reply-to message) + (resume-waiting-coroutine) + (process-local-message)) + (process-remote-message)))) + +(define-method (hive-actor-local? (hive ) address) + (hash-ref (hive-actor-registry hive) address)) + +(define-method (hive-register-actor! (hive ) (actor )) + (hash-set! (hive-actor-registry hive) (actor-id actor) actor)) + +(define-method (%hive-create-actor (hive ) actor-class + init id-cookie) + "Actual method called by hive-create-actor. + +Since this is a define-method it can't accept fancy define* arguments, +so this gets called from the nicer hive-create-actor interface. See +that method for documentation." + (let* ((actor-id (hive-gen-actor-id hive id-cookie)) + (actor (apply make actor-class + ;; @@: If we switch to a hive-proxy, do it here + #:hive hive + #:id actor-id + init))) + (hive-register-actor! hive actor) + ;; return the actor id + actor-id)) + +(define* (hive-create-actor hive actor-class + #:key + (init '()) + id-cookie) + (%hive-create-actor hive actor-class + init id-cookie)) + +(define-syntax hive-create-actor* + (syntax-rules () + "Create an instance of actor-class attached to this hive. +Return the new actor's id. + +Used internally, and used for bootstrapping a fresh hive. + +Note that actors should generally not call this method directly. +Instead, actors should call create-actor." + ((_ args ... (init-args ...)) + (hive-create-actor args ... + #:init (list init-args ...))))) + + +;; TODO: Give actors this instead of the actual hive reference +(define-class () + (send-message #:getter proxy-send-message + #:init-keyword #:send-message) + (create-actor #:getter proxy-create-actor + #:init-keyword #:create-actor)) + +;; Live the hive proxy, but has access to the hive itself... +(define-class () + (hive #:init-keyword #:hive)) + + + +;;; Messages +;;; ======== + + +(define-record-type + (make-message-intern id to from action + body in-reply-to wants-reply ; do we need hive-proxy? + ;; Are these still needed? + replied deferred-reply) + message? + (id message-id) + (to message-to) + (from message-from) + (action message-action) + (body message-body) + (in-reply-to message-in-reply-to) + (wants-reply message-wants-reply) + + ;; See XUDD source for these. Not use yet, maybe eventually will be? + ;; XUDD uses them for autoreply. + ;; Requiring mutation on message objects is clearly not great, + ;; but it may be worth it...? Investigate! + (replied message-replied set-message-replied!) + (deferred-reply deferred-reply set-message-deferred-reply!)) + + +(define* (make-message id to from action body + #:key in-reply-to wants-reply + replied deferred-reply) + (make-message-intern id to from action body + in-reply-to wants-reply replied + deferred-reply)) + +;; Note: the body of messages is currently an alist, but it's created +;; from a keyword based property list (see the following two functions). +;; But, that's an extra conversion step, and maybe totally unnecessary: +;; we already have message-ref, and this could just pull a keyword +;; from a property list. +;; The main ways this might be useful are error checking, +;; serialization across the wire (but even that might require some +;; change), and using existing tooling (though adding new tooling +;; would be negligible in implementation effort.) + +(define* (message-ref message key #:optional dflt) + "Extract KEY from body of MESSAGE. + +Optionally set default with [DFLT]" + (let ((result (assoc key (message-body message)))) + (if result (cdr result) + dflt))) + + +(define (kwarg-list-to-alist args) + (let loop ((remaining args) + (result '())) + (match remaining + (((? keyword? key) val rest ...) + (loop rest + (cons (cons (keyword->symbol key) val) + result))) + (() result) + (_ (throw 'invalid-kwarg-list + "Invalid keyword argument list" + args))))) + + +(define (send-message from-actor to-id action . message-body-args) + "Send a message from an actor to another actor" + (let* ((hive (actor-hive from-actor)) + (message (make-message (hive-gen-message-id hive) to-id + (actor-id from-actor) action + (kwarg-list-to-alist message-body-args)))) + (8sync (hive-process-message hive message)))) + +(define (send-message-wait from-actor to-id action . message-body-args) + "Send a message from an actor to another, but wait until we get a response" + (let* ((hive (actor-hive from-actor)) + (agenda-prompt (hive-prompt (actor-hive from-actor))) + (message (make-message (hive-gen-message-id hive) to-id + (actor-id from-actor) action + (kwarg-list-to-alist message-body-args) + #:wants-reply #t))) + (abort-to-prompt agenda-prompt from-actor message))) + +;; TODO: Intelligently ~propagate(ish) errors on -wait functions. +;; We might have `send-message-wait-brazen' to allow callers to +;; not have an exception thrown and instead just have a message with +;; the appropriate '*error* message returned. + +(define (reply-message from-actor original-message + . message-body-args) + "Reply to a message" + (set-message-replied! original-message #t) + (let* ((hive (actor-hive from-actor)) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id from-actor) '*reply* + (kwarg-list-to-alist message-body-args) + #:in-reply-to (message-id original-message)))) + (8sync (hive-process-message hive new-message)))) + +(define (reply-message-wait from-actor original-message + . message-body-args) + "Reply to a messsage, but wait until we get a response" + (set-message-replied! original-message #t) + (let* ((hive (actor-hive from-actor)) + (agenda-prompt (hive-prompt (actor-hive from-actor))) + (new-message (make-message (hive-gen-message-id hive) + (message-from original-message) + (actor-id from-actor) '*reply* + (kwarg-list-to-alist message-body-args) + #:wants-reply #t + #:in-reply-to (message-id original-message)))) + (abort-to-prompt agenda-prompt from-actor new-message))) + + + +;;; 8sync bootstrap utilities +;;; ========================= + +(define* (ez-run-hive hive initial-tasks #:key repl-server) + "Start up an agenda and run HIVE in it with INITIAL-TASKS. + +Should we start up a cooperative REPL for live hacking? REPL-SERVER +wants to know! You can pass it #t or #f, or if you want to specify a port, +an integer." + (let* ((queue (list->q initial-tasks)) + (agenda (make-agenda #:pre-unwind-handler print-error-and-continue + #:queue queue))) + (cond + ;; If repl-server is an integer, we'll use that as the port + ((integer? repl-server) + (spawn-and-queue-repl-server! agenda repl-server)) + (repl-server + (spawn-and-queue-repl-server! agenda))) + (start-agenda agenda))) + +(define (hive-bootstrap-message hive to-id action . message-body-args) + (wrap + (apply send-message hive to-id action message-body-args)))