Remove (8sync agenda) as an import from actors.scm and add wrap-apply.
[8sync.git] / 8sync / actors.scm
1 ;;; 8sync --- Asynchronous programming for Guile
2 ;;; Copyright © 2016, 2017 Christopher Allan Webber <cwebber@dustycloud.org>
3 ;;;
4 ;;; This file is part of 8sync.
5 ;;;
6 ;;; 8sync is free software: you can redistribute it and/or modify it
7 ;;; under the terms of the GNU Lesser General Public License as
8 ;;; published by the Free Software Foundation, either version 3 of the
9 ;;; License, or (at your option) any later version.
10 ;;;
11 ;;; 8sync is distributed in the hope that it will be useful,
12 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ;;; GNU Lesser General Public License for more details.
15 ;;;
16 ;;; You should have received a copy of the GNU Lesser General Public
17 ;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
18
19 (define-module (8sync actors)
20   #:use-module (oop goops)
21   #:use-module (srfi srfi-9)
22   #:use-module (ice-9 control)
23   #:use-module (ice-9 format)
24   #:use-module (ice-9 match)
25   #:use-module (ice-9 atomic)
26   #:use-module (ice-9 pretty-print)
27   #:use-module (ice-9 receive)
28   #:use-module (fibers)
29   #:use-module (fibers channels)
30   #:use-module (fibers conditions)
31   #:use-module (fibers operations)
32   #:use-module (8sync inbox)
33   #:use-module (8sync rmeta-slot)
34   #:export (;; utilities... ought to go in their own module
35             big-random-number
36             big-random-number-string
37
38             <actor>
39             actor-id
40             actor-message-handler
41
42             ;;; Commenting out the <address> type for now;
43             ;;; it may be back when we have better serializers
44             ;; <address>
45             make-address
46             address-actor-id address-hive-id
47
48             address->string
49             actor-id-actor
50             actor-id-hive
51             actor-id-string
52
53             actor-init! actor-cleanup!
54
55             actor-alive?
56
57             build-actions
58
59             define-actor
60
61             ;; <hive>
62             ;; make-hive
63             ;; ;; There are more methods for the hive, but there's
64             ;; ;; no reason for the outside world to look at them maybe?
65             ;; hive-id
66             bootstrap-actor bootstrap-actor*
67
68             create-actor create-actor*
69             self-destruct
70
71             <message>
72             make-message message?
73             message-to message-action message-from
74             message-id message-body message-in-reply-to
75             message-wants-reply
76
77             <- <-wait
78
79             spawn-hive run-hive))
80
81 ;; For ids
82 (set! *random-state* (random-state-from-platform))
83
84 ;; Same size as a uuid4 I think...
85 (define random-number-size (expt 2 128))
86
87 (define (big-random-number)
88   (random random-number-size))
89
90 ;; Would be great to get this base64 encoded instead.
91 (define (big-random-number-string)
92   ;; @@: This is slow.  Using format here is wasteful.
93   (format #f "~x" (big-random-number)))
94
95 ;; @@: This is slow-ish.  A mere ~275k / second on my (old) machine.
96 ;;   The main cost seems to be in number->string.
97 (define (simple-message-id-generator)
98   ;; Prepending this cookie makes message ids unique per hive
99   (let ((prefix (format #f "~x:" (big-random-number)))
100         (counter 0))
101     (lambda ()
102       (set! counter (1+ counter))
103       (string-append prefix (number->string counter)))))
104
105
106 \f
107 ;;; Messages
108 ;;; ========
109
110 (define-record-type <message>
111   (make-message-intern id to from action
112                        body in-reply-to wants-reply)
113   message?
114   ;; @@: message-ids are removed.  They could be re-enabled
115   ;;   if we had thread-safe promises...
116   (id message-id)                    ; id of this message
117   (to message-to)                    ; actor id this is going to
118   (from message-from)                ; actor id of sender
119   (action message-action)            ; action (a symbol) to be handled
120   (body message-body)                ; argument list "body" of message
121   (in-reply-to message-in-reply-to)  ; message id this is in reply to, if any
122   (wants-reply message-wants-reply)) ; whether caller is waiting for reply
123
124
125 (define* (make-message id to from action body
126                        #:key in-reply-to wants-reply)
127   (make-message-intern id to from action body
128                        in-reply-to wants-reply))
129
130 (define (kwarg-list-to-alist args)
131   (let loop ((remaining args)
132              (result '()))
133     (match remaining
134       (((? keyword? key) val rest ...)
135        (loop rest
136              (cons (cons (keyword->symbol key) val) 
137                    result)))
138       (() result)
139       (_ (throw 'invalid-kwarg-list
140                 "Invalid keyword argument list"
141                 args)))))
142
143
144 ;;; See: https://web.archive.org/web/20081223021934/http://mumble.net/~jar/articles/oo-moon-weinreb.html
145 ;;;   (also worth seeing: http://mumble.net/~jar/articles/oo.html )
146
147 ;; This is the internal, generalized message sending method.
148 ;; Users shouldn't use it!  Use the <-foo forms instead.
149
150 (define-inlinable (%<- wants-reply from-actor to action args message-id in-reply-to)
151   ;; Okay, we need to deal with message ids.
152   ;; Could we get rid of them? :\
153   ;; It seems if we can use eq? and have messages be immutable then
154   ;; it should be possible to identify follow-up replies.
155   ;; If we need to track replies across hive boundaries we could
156   ;; register unique ids across the ambassador barrier.
157   (match to
158     (#(_ _ (? channel? channel) dead?)
159      (let ((message (make-message message-id to
160                                   (and from-actor (actor-id from-actor))
161                                   action args
162                                   #:wants-reply wants-reply
163                                   #:in-reply-to in-reply-to)))
164        (perform-operation
165         (choice-operation
166          (put-operation channel message)
167          (wait-operation dead?)))))
168     ;; TODO: put remote addresses here.
169     (#(actor-id hive-id #f #f)
170      ;; Here we'd make a call to our hive...
171      'TODO)
172     ;; A message sent to nobody goes nowhere.
173     ;; TODO: Should we display a warning here, probably?
174     (#f #f)))
175
176 (define (<- to action . args)
177   (define from-actor (*current-actor*))
178   (%<- #f from-actor to action args
179        (or (and from-actor
180                 ((actor-msg-id-generator from-actor)))
181            (big-random-number-string))
182        #f))
183
184 ;; TODO: this should abort to the prompt, then check for errors
185 ;;   when resuming.
186
187 (define (<-wait to action . args)
188   (define prompt (*actor-prompt*))
189   (when (not prompt)
190     (error "Tried to <-wait without being in an actor's context..."))
191
192   (let ((reply (abort-to-prompt prompt to action args)))
193     (cond ((eq? action '*error*)
194            (throw 'hive-unresumable-coroutine
195                   "Won't resume coroutine; got an *error* as a reply"
196                   #:message reply))
197           (else (apply values (message-body reply))))))
198
199 \f
200 ;;; Main actor implementation
201 ;;; =========================
202
203 (define (actor-inheritable-message-handler actor message)
204   (define action (message-action message))
205   (define method
206     (class-rmeta-ref (class-of actor) 'actions action
207                      #:equals? eq? #:cache-set! hashq-set!
208                      #:cache-ref hashq-ref))
209   (unless method
210     (throw 'action-not-found
211            "No appropriate action handler found for actor"
212            #:action action
213            #:actor actor
214            #:message message))
215   (apply method actor message (message-body message)))
216
217 (define-syntax-rule (wrap-apply body)
218   "Wrap possibly multi-value function in a procedure, applies all arguments"
219   (lambda args
220     (apply body args)))
221
222 (define-syntax-rule (build-actions (symbol method) ...)
223   "Construct an alist of (symbol . method), where the method is wrapped
224 with wrap-apply to facilitate live hacking and allow the method definition
225 to come after class definition."
226   (build-rmeta-slot
227    (list (cons (quote symbol)
228                (wrap-apply method)) ...)))
229
230 (define-class <actor> ()
231   ;; An address object... a vector of #(actor-id hive-id inbox-channel dead?)
232   ;;  - inbox-channel is the receiving channel (as opposed to actor-inbox-deq)
233   ;;  - dead? is a fibers condition variable which is set once this actor
234   ;;    kicks the bucket
235   (id #:init-keyword #:address
236       #:getter actor-id)
237   ;; The connection to the hive we're connected to.
238   (hive-channel #:init-keyword #:hive-channel
239                 #:accessor actor-hive-channel)
240
241   ;; Our queue to send/receive messages on
242   (inbox-deq #:init-thunk make-channel
243              #:accessor actor-inbox-deq)
244
245   (msg-id-generator #:init-thunk simple-message-id-generator
246                     #:getter actor-msg-id-generator)
247
248   ;; How we receive and process new messages
249   (message-handler #:init-value actor-inheritable-message-handler
250                    ;; @@: There's no reason not to use #:class instead of
251                    ;;   #:each-subclass anywhere in this file, except for
252                    ;;   Guile bug #25211 (#:class is broken in Guile 2.2)
253                    #:allocation #:each-subclass
254                    #:getter actor-message-handler)
255
256   ;; valid values are:
257   ;;  - #t as in, send the init message, but don't wait (default)
258   ;;  - 'wait, as in wait on the init message
259   ;;  - #f as in don't bother to init
260   (should-init #:init-value #t
261                #:getter actor-should-init
262                #:allocation #:each-subclass)
263
264   ;; This is the default, "simple" way to inherit and process messages.
265   (actions #:init-thunk (build-actions)
266            #:allocation #:each-subclass))
267
268 ;;; Actors may specify an "init" action that occurs before the actor
269 ;;; actually begins to run.
270 ;;; During actor-init!, an actor may send a message to itself or others
271 ;;; via <- but *may not* use <-wait.
272 (define-method (actor-init! (actor <actor>))
273   'no-op)
274
275 (define-method (actor-cleanup! (actor <actor>))
276   'no-op)
277
278 ;;; Addresses are vectors where the first part is the actor-id and
279 ;;; the second part is the hive-id.  This works well enough... they
280 ;;; look decent being pretty-printed.
281
282 (define (make-address actor-id hive-id channel dead?)
283   (vector actor-id hive-id channel dead?))
284
285 (define (address-actor-id address)
286   (vector-ref address 0))
287
288 (define (address-hive-id address)
289   (vector-ref address 1))
290
291 (define (address-channel address)
292   (vector-ref address 2))
293
294 (define (address-dead? address)
295   (vector-ref address 3))
296
297 (define (address->string address)
298   (string-append (address-actor-id address) "@"
299                  (address-hive-id address)))
300
301 (define (address-equal? address1 address2)
302   "Check whether or not the two addresses are equal.
303
304 This compares the actor-id and hive-id but ignores the channel and
305 dead? condition."
306   (match address1
307     (#(actor-id-1 hive-id-1 _ _)
308      (match address2
309        (#(actor-id-2 hive-id-2)
310         (and (equal? actor-id-1 actor-id-2)
311              (and (equal? hive-id-1 hive-id-2))))
312        (_ #f)))
313     (_ #f)))
314
315 (define (actor-id-actor actor)
316   "Get the actor id component of the actor-id"
317   (address-actor-id (actor-id actor)))
318
319 (define (actor-id-hive actor)
320   "Get the hive id component of the actor-id"
321   (address-hive-id (actor-id actor)))
322
323 (define (actor-id-string actor)
324   "Render the full actor id as a human-readable string"
325   (address->string (actor-id actor)))
326
327 (define (actor-inbox-enq actor)
328   (address-channel (actor-id actor)))
329
330 (define *current-actor*
331   (make-parameter #f))
332
333 (define *actor-prompt*
334   (make-parameter #f))
335
336 (define (actor-main-loop actor)
337   "Main loop of the actor.  Loops around, pulling messages off its queue
338 and handling them."
339   ;; @@: Maybe establish some sort of garbage collection routine for these...
340   (define waiting
341     (make-hash-table))
342   (define message-handler
343     (actor-message-handler actor))
344   (define dead?
345     (address-dead? (actor-id actor)))
346   (define prompt (make-prompt-tag (actor-id-actor actor)))
347
348   (define (handle-message message)
349     (catch #t
350       (lambda ()
351         (call-with-values
352             (lambda ()
353               (message-handler actor message))
354           (lambda vals
355             ;; Return reply if necessary
356             (when (message-wants-reply message)
357               (when (message-wants-reply message)
358                 (%<- #f actor (message-from message) '*reply*
359                      vals ((actor-msg-id-generator actor))
360                      (message-id message)))))))
361       (const #t)
362       (let ((err (current-error-port)))
363         (lambda (key . args)
364           (false-if-exception
365            (let ((stack (make-stack #t 4)))
366              (format err "Uncaught exception when handling message ~a:\n"
367                      message)
368              (display-backtrace stack err)
369              (print-exception err (stack-ref stack 0)
370                               key args)
371              (newline err)
372              ;; If the other actor is waiting on a reply, let's let them
373              ;; know there was an error...
374              (when (message-wants-reply message)
375                (%<- #f actor (message-from message) '*error*
376                     (list key) ((actor-msg-id-generator actor))
377                     (message-id message)))))))))
378   
379   (define (resume-handler message)
380     (define in-reply-to (message-in-reply-to message))
381     (cond
382      ((hash-ref waiting in-reply-to) =>
383       (lambda (kont)
384         (hash-remove! waiting in-reply-to)
385         (kont message)))
386      (else
387       (format (current-error-port)
388               "Tried to resume nonexistant message: ~a\n"
389               (message-id message)))))
390
391   (define halt-or-handle-message
392     ;; It would be nice if we could give priorities to certain operations.
393     ;; halt should always win over getting a message...
394     (choice-operation
395      (wrap-operation (wait-operation dead?)
396                      (const #f))  ; halt and return
397      (wrap-operation (get-operation (actor-inbox-deq actor))
398                      (lambda (message)
399                        (call-with-prompt prompt
400                          (lambda ()
401                            (if (message-in-reply-to message)
402                                ;; resume a continuation which was waiting on a reply
403                                (resume-handler message)
404                                ;; start handling a new message
405                                (handle-message message)))
406                          ;; Here's where we abort to if we're doing <-wait
407                          ;; @@: maybe use match-lambda if we're going to end up
408                          ;;   handling multiple ~commands
409                          (lambda (kont to action message-args)
410                            (define message-id
411                              ((actor-msg-id-generator actor)))
412                            (hash-set! waiting message-id kont)
413                            (%<- #t actor to action message-args message-id #f)))
414                        #t))))   ; loop again
415
416   ;; Mutate the parameter; this should be fine since each fiber
417   ;; runs in its own dynamic state with with-dynamic-state.
418   ;; See with-dynamic-state discussion in
419   ;;   https://wingolog.org/archives/2017/06/27/growing-fibers
420   (*current-actor* actor)
421   ;; We temporarily set the *actor-prompt* to #f to make sure that
422   ;; actor-init! doesn't try to do a <-wait message (and not accidentally use
423   ;; the parent fiber's *actor-prompt* either.)
424   (*actor-prompt* #f)
425   (actor-init! actor)
426   (*actor-prompt* prompt)
427
428   (let loop ()
429     (and (perform-operation halt-or-handle-message)
430          (loop))))
431
432 \f
433 ;;; Actor utilities
434 ;;; ===============
435
436 (define-syntax-rule (define-actor class inherits
437                       (action ...)
438                       slots ...)
439   (define-class class inherits
440     (actions #:init-thunk (build-actions action ...)
441              #:allocation #:each-subclass)
442     slots ...))
443
444 \f
445 ;;; The Hive
446 ;;; ========
447 ;;;   Every actor has a hive, which keeps track of other actors, manages
448 ;;;   cleanup, and performs inter-hive communication.
449
450 (define-class <hive> ()
451   (id #:init-keyword #:id
452       #:getter hive-id)
453   (actor-registry #:init-thunk make-hash-table
454                   #:getter hive-actor-registry)
455   ;; TODO: Rename "ambassadors" to "relays"
456   ;; Ambassadors are used (or will be) for inter-hive communication.
457   ;; These are special actors that know how to route messages to other
458   ;; hives.
459   (ambassadors #:init-thunk make-weak-key-hash-table
460                #:getter hive-ambassadors)
461   (channel #:init-thunk make-channel
462            #:getter hive-channel)
463   (halt? #:init-thunk make-condition
464          #:getter hive-halt?))
465
466 (define* (make-hive #:key hive-id)
467   (make <hive> #:id (or hive-id
468                         (big-random-number-string))))
469
470 (define (gen-actor-id cookie)
471   (if cookie
472       (string-append cookie ":" (big-random-number-string))
473       (big-random-number-string)))
474
475 (define (hive-main-loop hive)
476   "The main loop of the hive.  This listens for messages on the hive-channel
477 for certain actions to perform.
478
479 `messages' here is not the same as a <message> object; these are a list of
480 values, the first value being a symbol"
481   (define channel (hive-channel hive))
482   (define halt? (hive-halt? hive))
483   (define registry (hive-actor-registry hive))
484
485   ;; not the same as a <message> ;P
486   (define handle-message
487     (match-lambda
488       (('register-actor actor-id address actor)
489        (hash-set! registry actor-id (vector address actor)))
490       ;; Remove the actor from hive
491       (('remove-actor actor-id)
492        (hash-remove! (hive-actor-registry hive) actor-id))
493       (('register-ambassador hive-id ambassador-actor-id)
494        'TODO)
495       (('unregister-ambassador hive-id ambassador-actor-id)
496        'TODO)
497       (('forward-message from-actor-id message)
498        'TODO)))
499
500   (define halt-or-handle
501     (choice-operation
502      (wrap-operation (get-operation channel)
503                      (lambda (msg)
504                        (handle-message msg)
505                        #t))
506      (wrap-operation (wait-operation halt?)
507                      (const #f))))
508
509   (let lp ()
510     (and (perform-operation halt-or-handle)
511          (lp))))
512
513 (define *current-hive* (make-parameter #f))
514
515 (define* (spawn-hive proc #:key (hive (make-hive)))
516   "Spawn a hive in a fiber running PROC, passing it the fresh hive"
517   (spawn-fiber (lambda () (hive-main-loop hive)))
518   (proc hive))
519
520 (define (run-hive proc . args)
521   "Spawn a hive and run it in run-fibers.  Takes a PROC as would be passed
522 to spawn-hive... all remaining arguments passed to run-fibers."
523   (apply run-fibers
524          (lambda ()
525            (spawn-hive proc))
526          args))
527
528 (define (%create-actor hive-channel hive-id
529                        actor-class init-args id-cookie send-init?)
530   (let* ((actor-id (gen-actor-id id-cookie))
531          (dead? (make-condition))
532          (inbox-enq (make-channel))
533          (address (make-address actor-id hive-id
534                                 inbox-enq dead?))
535          (actor (apply make actor-class
536                        #:hive-channel hive-channel
537                        #:address address
538                        init-args))
539          (should-init (actor-should-init actor)))
540
541     ;; start the main loop
542     (spawn-fiber (lambda ()
543                    ;; start the inbox loop
544                    (spawn-fiber
545                     (lambda ()
546                       (delivery-agent inbox-enq (actor-inbox-deq actor)
547                                       dead?))
548                     ;; this one is decidedly non-parallel, because we want
549                     ;; the delivery agent to be in the same thread as its actor
550                     #:parallel? #f)
551
552                    (actor-main-loop actor))
553                  #:parallel? #t)
554
555     (put-message hive-channel (list 'register-actor actor-id address actor))
556     
557     ;; return the address
558     address))
559
560 (define* (bootstrap-actor hive actor-class #:rest init-args)
561   "Create an actor on HIVE using ACTOR-CLASS passing in INIT-ARGS args"
562   (%create-actor (hive-channel hive) (hive-id hive) actor-class
563                  init-args (symbol->string (class-name actor-class))
564                  #f))
565
566 (define* (bootstrap-actor* hive actor-class id-cookie #:rest init-args)
567   "Create an actor, but also allow customizing a 'cookie' added to the id
568 for debugging"
569   (%create-actor (hive-channel hive) (hive-id hive) actor-class
570                  init-args id-cookie
571                  #f))
572
573 (define* (create-actor from-actor actor-class #:rest init-args)
574   "Create an instance of actor-class.  Return the new actor's id.
575
576 This is the method actors should call directly (unless they want
577 to supply an id-cookie, in which case they should use
578 create-actor*)."
579   (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
580                  actor-class init-args #f #t))
581
582
583 (define* (create-actor* from-actor actor-class id-cookie #:rest init-args)
584   "Create an instance of actor-class.  Return the new actor's id.
585
586 Like create-actor, but permits supplying an id-cookie."
587   (%create-actor (actor-hive-channel from-actor) (actor-id-hive from-actor)
588                  actor-class init-args id-cookie #t))
589
590 (define* (self-destruct actor #:key (cleanup #t))
591   "Remove an actor from the hive.
592
593 Unless #:cleanup is set to #f, this will first have the actor handle
594 its '*cleanup* action handler."
595   (signal-condition! (address-dead? (actor-id actor)))
596   (put-message (actor-hive-channel actor) (list 'remove-actor (actor-id-actor actor)))
597   ;; Set *actor-prompt* to nothing to prevent actor-cleanup! from sending
598   ;; a message with <-wait
599   (*actor-prompt* #f)
600   (actor-cleanup! actor))
601
602 ;; From a patch I sent to Fibers...
603 (define (condition-signalled? cvar)
604   "Return @code{#t} if @var{cvar} has already been signalled.
605
606 In general you will want to use @code{wait} or @code{wait-operation} to
607 wait on a condition.  However, sometimes it is useful to see whether or
608 not a condition has already been signalled without blocking."
609   (atomic-box-ref ((@@ (fibers conditions) condition-signalled?) cvar)))
610
611 (define (actor-alive? actor)
612   (condition-signalled? (address-dead? (actor-id actor))))