From: Christopher Allan Webber Date: Thu, 3 Aug 2017 15:29:39 +0000 (-0500) Subject: Update web, websocket servers for recent actor changes. X-Git-Url: https://jxself.org/git/?p=8sync.git;a=commitdiff_plain;h=0de8ac7a40e29cbc84924936faa13acf7c5f29db Update web, websocket servers for recent actor changes. * 8sync/systems/web.scm (): Remove *init* and *cleanup* action handlers. (actor-init!, actor-cleanup!): Add method definitions for . (web-server-socket-loop, web-server-client-loop): Use with-actor-nonblocking-ports. (web-server-client-loop): Removed. * 8sync/systems/websocket/server.scm (websocket-client-loop) (websocket-server-send): Wrap I/O behavior in with-actor-nonblocking-ports. --- diff --git a/8sync/systems/web.scm b/8sync/systems/web.scm index a6ee696..5d3ad3a 100644 --- a/8sync/systems/web.scm +++ b/8sync/systems/web.scm @@ -43,8 +43,7 @@ .upgrade-paths .http-handler)) (define-actor () - ((*init* web-server-socket-loop) - (*cleanup* web-server-cleanup) + ((main-loop web-server-socket-loop) (shutdown web-server-shutdown) (new-client web-server-client-loop) (handle-request web-server-handle-request)) @@ -112,21 +111,31 @@ (listen sock 1024) sock)) +(define-method (actor-init! (web-server )) + (<- (actor-id web-server) 'main-loop)) + +(define-method (actor-cleanup! (web-server )) + ;; @@: Should we close any pending requests too? + (close (.socket web-server))) + (define (web-server-socket-loop web-server message) "The main loop on our socket. Keep accepting new clients as long as we're alive." - (while #t - (match (accept (.socket web-server)) - ((client . sockaddr) - ;; From "HOP, A Fast Server for the Diffuse Web", Serrano. - (setsockopt client SOL_SOCKET SO_SNDBUF (* 12 1024)) - (set-nonblocking! client) - ;; Always disable Nagle's algorithm, as we handle buffering - ;; ourselves. Ignore exceptions if it's not a TCP port, or - ;; TCP_NODELAY is not defined on this platform. - (false-if-exception - (setsockopt client IPPROTO_TCP TCP_NODELAY 0)) - (<- (actor-id web-server) 'new-client client))))) + (with-actor-nonblocking-ports + (lambda () + (while #t + (match (accept (.socket web-server)) + ((client . sockaddr) + (pk 'a-client client sockaddr) + ;; From "HOP, A Fast Server for the Diffuse Web", Serrano. + (setsockopt client SOL_SOCKET SO_SNDBUF (* 12 1024)) + (set-nonblocking! client) + ;; Always disable Nagle's algorithm, as we handle buffering + ;; ourselves. Ignore exceptions if it's not a TCP port, or + ;; TCP_NODELAY is not defined on this platform. + (false-if-exception + (setsockopt client IPPROTO_TCP TCP_NODELAY 0)) + (<- (actor-id web-server) 'new-client client))))))) (define (keep-alive? response) (let ((v (response-version response))) @@ -158,56 +167,58 @@ as we're alive." (define (web-server-client-loop web-server message client) "Read request(s) from a client and pass off to the handler." - (with-throw-handler #t - (lambda () - (let loop () - (define (respond-and-maybe-continue _ response body) - (write-response response client) - (when body - (put-bytevector client body)) - (force-output client) - (if (and (keep-alive? response) - (not (eof-object? (peek-char client)))) - (loop) - (close-port client))) - (cond - ((eof-object? (lookahead-u8 client)) - (close-port client)) - (else - (catch #t - (lambda () - (let* ((request (read-request client)) - (body (read-request-body request))) - (cond - ;; Should we "upgrade" the protocol? - ;; Doing so "breaks out" of this loop, possibly into a new one - ((maybe-upgrade-request web-server request body) => - (lambda (upgrade) - ;; TODO: this isn't great because we're in this catch, - ;; which doesn't make sense once we've "upgraded" - ;; since we might not "respond" in the same way anymore. - (upgrade web-server client request body))) - (else - (call-with-message - ;; TODO: Add error handling in case we get an error - ;; response - (<-wait (actor-id web-server) 'handle-request - request body) - respond-and-maybe-continue))))) - (lambda (key . args) - (display "While reading request:\n" (current-error-port)) - (print-exception (current-error-port) #f key args) - (respond-and-maybe-continue - #f ;; ignored, there is no message - (build-response #:version '(1 . 0) #:code 400 - #:headers '((content-length . 0))) - #vu8()))))))) - (lambda (k . args) - (catch #t - (lambda () (close-port client)) - (lambda (k . args) - (display "While closing port:\n" (current-error-port)) - (print-exception (current-error-port) #f k args)))))) + (with-actor-nonblocking-ports + (lambda () + (with-throw-handler #t + (lambda () + (let loop () + (define (respond-and-maybe-continue response body) + (write-response response client) + (when body + (put-bytevector client body)) + (force-output client) + (if (and (keep-alive? response) + (not (eof-object? (peek-char client)))) + (loop) + (close-port client))) + (cond + ((eof-object? (lookahead-u8 client)) + (close-port client)) + (else + (catch #t + (lambda () + (let* ((request (read-request client)) + (body (read-request-body request))) + (cond + ;; Should we "upgrade" the protocol? + ;; Doing so "breaks out" of this loop, possibly into a new one + ((maybe-upgrade-request web-server request body) => + (lambda (upgrade) + ;; TODO: this isn't great because we're in this catch, + ;; which doesn't make sense once we've "upgraded" + ;; since we might not "respond" in the same way anymore. + (upgrade web-server client request body))) + (else + (call-with-values + (lambda () + ;; TODO: Add error handling in case we get an error + ;; response + (<-wait (actor-id web-server) 'handle-request + request body)) + respond-and-maybe-continue))))) + (lambda (key . args) + (display "While reading request:\n" (current-error-port)) + (print-exception (current-error-port) #f key args) + (respond-and-maybe-continue + (build-response #:version '(1 . 0) #:code 400 + #:headers '((content-length . 0))) + #vu8()))))))) + (lambda (k . args) + (catch #t + (lambda () (close-port client)) + (lambda (k . args) + (display "While closing port:\n" (current-error-port)) + (print-exception (current-error-port) #f k args)))))))) (define (web-server-handle-request web-server message request body) @@ -215,11 +226,7 @@ as we're alive." ((.http-handler web-server) request body) (receive (response body) (sanitize-response request response body) - (<-reply message response body)))) - -(define (web-server-cleanup web-server message) - ;; @@: Should we close any pending requests too? - (close (.socket web-server))) + (values response body)))) (define (web-server-shutdown web-server message) (self-destruct web-server)) diff --git a/8sync/systems/websocket/server.scm b/8sync/systems/websocket/server.scm index 6283255..81c9244 100644 --- a/8sync/systems/websocket/server.scm +++ b/8sync/systems/websocket/server.scm @@ -125,58 +125,61 @@ called for each complete message that is received." ((.on-ws-client-connect websocket-server) websocket-server client-id) - ;; Perform the HTTP handshake and upgrade to WebSocket protocol. - (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key)) - (response (make-handshake-response client-key))) - (write-response response client) - (let loop ((fragments '()) - (type #f)) - (let ((frame (read-frame-maybe))) - (cond - ;; EOF - port is closed. - ;; @@: Sometimes the eof object appears here as opposed to - ;; at lookahead, but I'm not sure why - ((or (not frame) (eof-object? frame)) - (close-down)) - ;; Per section 5.4, control frames may appear interspersed - ;; along with a fragmented message. - ((close-frame? frame) - ;; Per section 5.5.1, echo the close frame back to the - ;; client before closing the socket. The client may no - ;; longer be listening. - (false-if-exception - (write-frame (make-close-frame (frame-data frame)) client)) - (close-down)) - ((ping-frame? frame) - ;; Per section 5.5.3, a pong frame must include the exact - ;; same data as the ping frame. - (write-frame (make-pong-frame (frame-data frame)) client) - (loop fragments type)) - ((pong-frame? frame) ; silently ignore pongs - (loop fragments type)) - ((first-fragment-frame? frame) ; begin accumulating fragments - (loop (list frame) (frame-type frame))) - ((final-fragment-frame? frame) ; concatenate all fragments - (handle-data-frame type (frame-concatenate (reverse fragments))) - (loop '() #f)) - ((fragment-frame? frame) ; add a fragment - (loop (cons frame fragments) type)) - ((data-frame? frame) ; unfragmented data frame - (handle-data-frame (frame-type frame) (frame-data frame)) - (loop '() #f))))))) + (with-actor-nonblocking-ports + (lambda () + ;; Perform the HTTP handshake and upgrade to WebSocket protocol. + (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key)) + (response (make-handshake-response client-key))) + (write-response response client) + (let loop ((fragments '()) + (type #f)) + (let ((frame (read-frame-maybe))) + (cond + ;; EOF - port is closed. + ;; @@: Sometimes the eof object appears here as opposed to + ;; at lookahead, but I'm not sure why + ((or (not frame) (eof-object? frame)) + (close-down)) + ;; Per section 5.4, control frames may appear interspersed + ;; along with a fragmented message. + ((close-frame? frame) + ;; Per section 5.5.1, echo the close frame back to the + ;; client before closing the socket. The client may no + ;; longer be listening. + (false-if-exception + (write-frame (make-close-frame (frame-data frame)) client)) + (close-down)) + ((ping-frame? frame) + ;; Per section 5.5.3, a pong frame must include the exact + ;; same data as the ping frame. + (write-frame (make-pong-frame (frame-data frame)) client) + (loop fragments type)) + ((pong-frame? frame) ; silently ignore pongs + (loop fragments type)) + ((first-fragment-frame? frame) ; begin accumulating fragments + (loop (list frame) (frame-type frame))) + ((final-fragment-frame? frame) ; concatenate all fragments + (handle-data-frame type (frame-concatenate (reverse fragments))) + (loop '() #f)) + ((fragment-frame? frame) ; add a fragment + (loop (cons frame fragments) type)) + ((data-frame? frame) ; unfragmented data frame + (handle-data-frame (frame-type frame) (frame-data frame)) + (loop '() #f))))))))) (define (websocket-server-send websocket-server message client-id data) - (cond ((hash-ref (.ws-clients websocket-server) client-id) => - (lambda (client) - (write-frame - (cond ((string? data) - (make-text-frame data)) - ((bytevector? data) - (make-binary-frame data))) - client) - ;; ok is like success, amirite - (<-reply message 'ok))) - (else - ;; No such client with that id. - ;; Either it closed, or it was never there. - (<-reply message 'client-gone)))) + (with-actor-nonblocking-ports + (lambda () + (cond ((hash-ref (.ws-clients websocket-server) client-id) => + (lambda (client) + (write-frame + (cond ((string? data) + (make-text-frame data)) + ((bytevector? data) + (make-binary-frame data))) + client) + ;; ok is like success, amirite + 'ok)) + ;; No such client with that id. + ;; Either it closed, or it was never there. + (else 'client-gone)))))