-;; Copyright (C) 2015 Christopher Allan Webber <cwebber@dustycloud.org>
-
-;; This library is free software; you can redistribute it and/or
-;; modify it under the terms of the GNU Lesser General Public
-;; License as published by the Free Software Foundation; either
-;; version 3 of the License, or (at your option) any later version.
-;;
-;; This library is distributed in the hope that it will be useful,
-;; but WITHOUT ANY WARRANTY; without even the implied warranty of
-;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-;; Lesser General Public License for more details.
-;;
-;; You should have received a copy of the GNU Lesser General Public
-;; License along with this library; if not, write to the Free Software
-;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-;; 02110-1301 USA
-
-(define-module (eightsync agenda)
+;;; 8sync --- Asynchronous programming for Guile
+;;; Copyright (C) 2015, 2016 Christopher Allan Webber <cwebber@dustycloud.org>
+;;;
+;;; This file is part of 8sync.
+;;;
+;;; 8sync is free software: you can redistribute it and/or modify it
+;;; under the terms of the GNU Lesser General Public License as
+;;; published by the Free Software Foundation, either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; 8sync is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU Lesser General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Lesser General Public
+;;; License along with 8sync. If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (8sync agenda)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 receive)
+ #:use-module (ice-9 suspendable-ports)
#:export (<agenda>
make-agenda agenda?
agenda-queue agenda-prompt-tag
- agenda-read-port-map agenda-write-port-map agenda-except-port-map
+ agenda-read-port-map agenda-write-port-map
agenda-schedule
make-async-prompt-tag
+ list->q make-q*
+
<time-segment>
make-time-segment time-segment?
time-segment-time time-segment-queue
schedule-segments-split schedule-extract-until!
add-segments-contents-to-queue!
- %sync 8sync
-
<run-request>
make-run-request run-request?
run-request-proc run-request-when
- <port-request>
- make-port-request port-request port-request?
- port-request-port
- port-request-read port-request-write port-request-except
-
- run-it wrap run run-at run-delay
+ run-it wrap wrap-apply run run-at run-delay
- %port-request %run %run-at %run-delay
- 8port-request 8run 8run-at 8run-delay
+ 8sync-delay
+ 8sync-run 8sync-run-at 8sync-run-delay
+ 8sync
+ 8sleep
+ ;; used for introspecting the error, but a method for making
+ ;; is not exposed
+ wrapped-exception?
+ wrapped-exception-key wrapped-exception-args
+ wrapped-exception-stacks
+
+ print-error-and-continue
+
+ stop-on-nothing-to-do
+
%current-agenda
start-agenda agenda-run-once))
(define-immutable-record-type <agenda>
(make-agenda-intern queue prompt-tag
- read-port-map write-port-map except-port-map
- schedule time)
+ read-port-map write-port-map
+ schedule time catch-handler pre-unwind-handler)
agenda?
(queue agenda-queue)
(prompt-tag agenda-prompt-tag)
(read-port-map agenda-read-port-map)
(write-port-map agenda-write-port-map)
- (except-port-map agenda-except-port-map)
(schedule agenda-schedule)
- (time agenda-time))
+ (time agenda-time)
+ (catch-handler agenda-catch-handler)
+ (pre-unwind-handler agenda-pre-unwind-handler))
(define (make-async-prompt-tag)
"Make an async prompt tag for an agenda.
(prompt (make-prompt-tag))
(read-port-map (make-hash-table))
(write-port-map (make-hash-table))
- (except-port-map (make-hash-table))
(schedule (make-schedule))
- (time (gettimeofday)))
+ (time (gettimeofday))
+ (catch-handler #f)
+ (pre-unwind-handler print-error-and-continue))
;; TODO: document arguments
"Make a fresh agenda."
(make-agenda-intern queue prompt
- read-port-map write-port-map except-port-map
- schedule time))
+ read-port-map write-port-map
+ schedule time
+ catch-handler pre-unwind-handler))
(define (current-agenda-prompt)
"Get the prompt for the current agenda; signal an error if there isn't one."
"Can't get current agenda prompt if there's no current agenda!")
(agenda-prompt-tag current-agenda))))
+;; helper for making queues for an agenda
+(define (list->q lst)
+ "Makes a queue composed of LST items"
+ (let ((q (make-q)))
+ (for-each
+ (lambda (x)
+ (enq! q x))
+ lst)
+ q))
+
+(define (make-q* . args)
+ "Makes a queue and populates it with this invocation's ARGS"
+ (list->q args))
\f
;;; Schedule
(time time-segment-time)
(queue time-segment-queue))
+;; @@: This seems to be the same as srfi-18's seconds->time procedure?
+;; Maybe double check and switch to that? (Thanks amz3!)
+
+(define (time-from-float-or-fraction time)
+ "Produce a (sec . usec) pair from TIME, a float or fraction"
+ (let* ((mixed-whole (floor time))
+ (mixed-rest (- time mixed-whole)) ; float or fraction component
+ (sec mixed-whole)
+ (usec (floor (* 1000000 mixed-rest))))
+ (cons (inexact->exact sec) (inexact->exact usec))))
+
(define (time-segment-right-format time)
"Ensure TIME is in the right format.
(((? integer? s) . (? integer? u)) time)
;; time was just an integer (just the second)
((? integer? _) (cons time 0))
+ ((or (? rational? _) (? inexact? _))
+ (time-from-float-or-fraction time))
(_ (throw 'invalid-time "Invalid time" time))))
(define* (make-time-segment time #:optional (queue (make-q)))
(sec time-delta-sec)
(usec time-delta-usec))
-(define* (make-time-delta sec #:optional (usec 0))
+(define* (make-time-delta time)
"Make a <time-delta> of SEC seconds and USEC microseconds.
This is used primarily so the agenda can recognize RUN-REQUEST objects
-which are meant "
- (make-time-delta-intern sec usec))
+which are meant to delay computation"
+ (match (time-segment-right-format time)
+ ((sec . usec)
+ (make-time-delta-intern sec usec))))
(define tdelta make-time-delta)
"Subtract TIME2 from TIME1"
(time-carry-correct
(cons (- (car time1) (car time2))
- (- (cdr time2) (cdr time2)))))
+ (- (cdr time1) (cdr time2)))))
(define (time-plus time1 time2)
"Add TIME1 and TIME2"
(time-carry-correct
(cons (+ (car time1) (car time2))
- (+ (cdr time2) (cdr time2)))))
+ (+ (cdr time1) (cdr time2)))))
(define-record-type <schedule>
(lambda ()
body ...))
+(define-syntax-rule (wrap-apply body)
+ "Wrap possibly multi-value function in a procedure, applies all arguments"
+ (lambda args
+ (apply body args)))
+
+
;; @@: Do we really want `body ...' here?
;; what about just `body'?
(define-syntax-rule (run body ...)
(make-run-request (wrap body ...) (tdelta delay-time)))
-;; A request to set up a port with at least one of read, write, except
-;; handling processes
-
-(define-record-type <port-request>
- (make-port-request-intern port read write except)
- port-request?
- (port port-request-port)
- (read port-request-read)
- (write port-request-write)
- (except port-request-except))
-
-(define* (make-port-request port #:key read write except)
- (if (not (or read write except))
- (throw 'no-port-handler-given "No port handler given.\n"))
- (make-port-request-intern port read write except))
-
-(define port-request make-port-request)
-
-
\f
;;; Asynchronous escape to run things
;;; =================================
-;; The future's in futures
-
-(define (make-future call-first on-success on-fail on-error)
- ;; TODO: add error stuff here
- (lambda ()
- (let ((call-result (call-first)))
- ;; queue up calling the
- (run (on-success call-result)))))
-
-(define (agenda-on-error agenda)
- (const #f))
-
-(define (agenda-on-fail agenda)
- (const #f))
-
-(define* (request-future call-first on-success
- #:key
- (agenda (%current-agenda))
- (on-fail (agenda-on-fail agenda))
- (on-error (agenda-on-error agenda))
- (when #f))
- ;; TODO: error handling
- ;; do we need some distinction between expected, catchable errors,
- ;; and unexpected, uncatchable ones? Probably...?
- (make-run-request
- (make-future call-first on-success on-fail on-error)
- when))
-
-(define-syntax-rule (%sync async-request)
- "Run BODY asynchronously at a prompt, passing args to make-future.
-
-Pronounced `eight-sync' despite the spelling.
-
-%sync was chosen because (async) was already taken and could lead to
-errors, and this version of asynchronous code uses a prompt, so the `a'
-character becomes a `%' prompt! :)
-
-The % and 8 characters kind of look similar... hence this library's
-name! (That, and the pun 'eight-synchronous' programming.)
-There are 8sync aliases if you prefer that name."
+(define-syntax-rule (8sync-abort-to-prompt async-request)
(abort-to-prompt (current-agenda-prompt)
async-request))
-(define-syntax-rule (8sync args ...)
- "Alias for %sync"
- (%sync args ...))
-
;; Async port request and run-request meta-requests
(define (make-async-request proc)
"Wrap PROC in an async-request
;; TODO: deliver more helpful errors depending on what the user
;; returned
(_ (throw 'invalid-async-request
- "Invalid request passed back via an (%sync) procedure."
+ "Invalid request passed back via an (8sync) procedure."
async-request))))
-(define-syntax-rule (%run body ...)
- (%run-at body ... #f))
-
-(define-syntax-rule (%run-at body ... when)
- (make-async-request
- (lambda (kont)
- (make-run-request
- (wrap
- (kont
- (begin body ...)))
- when))))
-
-(define-syntax-rule (%run-delay body ... delay-time)
- (%run-at body ... (tdelta delay-time)))
-
-(define-syntax-rule (%port-request add-this-port port-request-args ...)
- (make-async-request
- (lambda (kont)
- (list (make-port-request port-request-args ...)
- (make-run-request kont)))))
-
-;; TODO
-(define-syntax-rule (%run-with-return return body ...)
- (make-async-request
- (lambda (kont)
- (let ((return kont))
- (lambda ()
- body ...)))))
-
-;; Aliases
-(define-syntax-rule (8run args ...) (%run args ...))
-(define-syntax-rule (8run-at args ...) (%run-at args ...))
-(define-syntax-rule (8run-delay args ...) (%run-delay args ...))
-(define-syntax-rule (8port-request args ...) (%port-request args ...))
-
+(define-syntax-rule (8sync body ...)
+ "Run body asynchronously but ignore its result...
+forge ahead in our current function!"
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (list (make-run-request
+ ;; What's with returning #f to kont?
+ ;; Otherwise we sometimes get errors like
+ ;; "Zero values returned to single-valued continuation""
+ (wrap (kont #f)) #f)
+ (make-run-request (lambda () body ...) #f))))))
+
+;; TODO: Rewrite when we move to this being just `sleep'.
+(define (8sleep secs)
+ "Like sleep, but asynchronous."
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (make-run-request (lambda () (kont #f)) (tdelta secs))))))
+
+(define (8usleep usecs)
+ "Like usleep, but asynchronous."
+ (define (usecs->time-pair)
+ (if (< 1000000)
+ (cons 0 usecs)
+ (let* ((sec (floor (/ usecs 1000000)))
+ (msec (- usecs (* sec 1000000))))
+ (cons sec msec))))
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (make-run-request (lambda () (kont #f)) (tdelta usecs->time-pair))))))
+
+;; Voluntarily yield execution
+(define (yield) ; @@: should this be define-inlinable?
+ "Voluntarily yield execution to the scheduler."
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (make-run-request (lambda () (kont #f)) #f)))))
\f
;;; Execution of agenda, and current agenda
(define %current-agenda (make-parameter #f))
(define (update-agenda-from-select! agenda)
- "Potentially (select) on ports specified in agenda, adding items to queue"
+ "Potentially (select) on ports specified in agenda, adding items to queue.
+
+Also handles sleeping when all we have to do is wait on the schedule."
(define (hash-keys hash)
(hash-map->list (lambda (k v) k) hash))
(define (get-wait-time)
;; TODO: support usecond wait time too
(match (get-wait-time)
((sec . usec)
- (select (hash-keys (agenda-read-port-map agenda))
- (hash-keys (agenda-write-port-map agenda))
- (hash-keys (agenda-except-port-map agenda))
- sec usec))))
+ (catch 'system-error
+ (lambda ()
+ (select (hash-keys (agenda-read-port-map agenda))
+ (hash-keys (agenda-write-port-map agenda))
+ '()
+ sec usec))
+ (lambda (key . rest-args)
+ (match rest-args
+ ((_ _ _ (EINTR))
+ '(() () ()))
+ (_ (error "Unhandled error in select!" key rest-args))))))))
(define (get-procs-to-run)
(define (ports->procs ports port-map)
(lambda (initial-procs)
(fold
(lambda (port prev)
- (cons (lambda ()
- ((hash-ref port-map port) port))
- prev))
+ (define proc (hashq-ref port-map port))
+ ;; Now that we've selected on this port, it can be removed
+ (hashq-remove! port-map port)
+ (cons proc prev))
initial-procs
ports)))
(match (do-select)
- ((read-ports write-ports except-ports)
- ;; @@: Come on, we can do better than append ;P
+ ((read-ports write-ports except-ports) ; except-ports not used
((compose (ports->procs
read-ports
(agenda-read-port-map agenda))
(ports->procs
write-ports
- (agenda-write-port-map agenda))
- (ports->procs
- except-ports
- (agenda-except-port-map agenda)))
+ (agenda-write-port-map agenda)))
'()))))
(define (update-agenda)
(let ((procs-to-run (get-procs-to-run))
(selector agenda))
0)))
(or (has-items? agenda-read-port-map)
- (has-items? agenda-write-port-map)
- (has-items? agenda-except-port-map)))
+ (has-items? agenda-write-port-map)))
- (if (ports-to-select?)
+ (if (or (ports-to-select?)
+ ;; select doubles as sleep...
+ (not (schedule-empty? (agenda-schedule agenda))))
(update-agenda)
agenda))
-(define (agenda-handle-port-request! agenda port-request)
- "Update an agenda for a port-request"
- (define (handle-selector request-selector port-map-selector)
- (if (request-selector port-request)
- (hash-set! (port-map-selector agenda)
- (port-request-port port-request)
- (request-selector port-request))))
- (handle-selector port-request-read agenda-read-port-map)
- (handle-selector port-request-write agenda-write-port-map)
- (handle-selector port-request-except agenda-except-port-map))
+(define-record-type <read-request>
+ (make-read-request port proc)
+ read-request?
+ (port read-request-port)
+ (proc read-request-proc))
+
+(define-record-type <write-request>
+ (make-write-request port proc)
+ write-request?
+ (port write-request-port)
+ (proc write-request-proc))
+
+(define (agenda-handle-read-request! agenda read-request)
+ "Handle <read-request>, which is a request to add this port to the poll/select
+on suspendable ports."
+ (hashq-set! (agenda-read-port-map agenda)
+ (read-request-port read-request)
+ (read-request-proc read-request)))
+
+(define (agenda-handle-write-request! agenda write-request)
+ (hashq-set! (agenda-write-port-map agenda)
+ (write-request-port write-request)
+ (write-request-proc write-request)))
+
+(define (stop-on-nothing-to-do agenda)
+ (and (q-empty? (agenda-queue agenda))
+ (schedule-empty? (agenda-schedule agenda))
+ (= 0 (hash-count (const #t) (agenda-read-port-map agenda)))
+ (= 0 (hash-count (const #t) (agenda-write-port-map agenda)))))
(define* (start-agenda agenda
- #:key stop-condition
+ #:key
+ ;; @@: Should we make stop-on-nothing-to-do
+ ;; the default stop-condition?
+ (stop-condition stop-on-nothing-to-do)
(get-time gettimeofday)
- (handle-ports update-agenda-from-select!))
+ (handle-ports update-agenda-from-select!)
+ ;; For live hacking madness, etc
+ (post-run-hook #f))
;; TODO: Document fields
"Start up the AGENDA"
+ (install-suspendable-ports!)
(let loop ((agenda agenda))
(let ((agenda
;; @@: Hm, maybe here would be a great place to handle
;; select'ing on ports.
;; We could compose over agenda-run-once and agenda-read-ports
(agenda-run-once agenda)))
+ ;; @@: This relies on mutation at present on the queue, in the rare
+ ;; event it's used. If we ever switch to something more immutable,
+ ;; it should return a new modified agenda instead.
+ (if post-run-hook
+ (post-run-hook agenda))
(if (and stop-condition (stop-condition agenda))
'done
(let* ((agenda
(agenda-queue agenda))
(loop agenda))))))
+
+(define (print-error-and-continue key . args)
+ "Frequently used as pre-unwind-handler for agenda"
+ (cond
+ ((eq? key '8sync-caught-error)
+ (match args
+ ((orig-key orig-args stacks)
+ (display "\n*** Caught async exception. ***\n")
+ (format (current-error-port)
+ "* Original key '~s and arguments: ~s *\n"
+ orig-key orig-args)
+ (display "* Caught stacks below (ending with original) *\n\n")
+ (for-each
+ (lambda (s)
+ (display-backtrace s (current-error-port))
+ (newline (current-error-port)))
+ stacks))))
+ (else
+ (format (current-error-port)
+ "\n*** Caught exception with key '~s and arguments: ~s ***\n"
+ key args)
+ (display-backtrace (make-stack #t 1 0)
+ (current-error-port))
+ (newline (current-error-port)))))
+
+(define-syntax-rule (maybe-catch-all (catch-handler pre-unwind-handler)
+ body ...)
+ (if (or catch-handler pre-unwind-handler)
+ (catch
+ #t
+ (lambda ()
+ body ...)
+ (or catch-handler (lambda _ #f))
+ (or pre-unwind-handler (lambda _ #f)))
+ (begin body ...)))
+
+(define (wait-for-readable port)
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (make-read-request port (wrap (kont #f)))))))
+
+(define (wait-for-writable port)
+ (8sync-abort-to-prompt
+ (make-async-request
+ (lambda (kont)
+ (make-write-request port (wrap (kont #f)))))))
+
(define (agenda-run-once agenda)
"Run once through the agenda, and produce a new agenda
based on the results"
(call-with-prompt
(agenda-prompt-tag agenda)
(lambda ()
- (parameterize ((%current-agenda agenda))
- (proc)))
+ (parameterize ((%current-agenda agenda)
+ ;; @@: Couldn't we just parameterize this at the start of
+ ;; the agenda...?
+ (current-read-waiter wait-for-readable)
+ (current-write-waiter wait-for-writable))
+ (maybe-catch-all
+ ((agenda-catch-handler agenda)
+ (agenda-pre-unwind-handler agenda))
+ (proc))))
(lambda (kont async-request)
(setup-async-request kont async-request))))
(#f
(enq! next-queue (run-request-proc run-request))))))))
(define (handle-individual result)
+ ;; @@: Could maybe optimize by switching to an explicit cond...
(match result
((? run-request? new-proc)
(enqueue new-proc))
- ((? port-request? port-request)
- (agenda-handle-port-request! agenda port-request))
+ ((? read-request? read-request)
+ (agenda-handle-read-request! agenda read-request))
+ ((? write-request? write-request)
+ (agenda-handle-write-request! agenda write-request))
;; do nothing
+ ;; Remember, we don't throw an error here because procedures can
+ ;; return a run request, eg with run-it, at the end of their
+ ;; evaluation to keep looping.
+ ;; @@: Though is this really a useful feature?
(_ #f)))
;; @@: We might support delay-wrapped procedures here
(match proc-result