X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Fsystems%2Fwebsocket%2Fclient.scm;fp=8sync%2Fsystems%2Fwebsocket%2Fclient.scm;h=11702f7564c8dbaf6ba35bccc9a19df8e0badfd5;hb=ab7d0294d28fa686226e714a606c102e1f265a41;hp=86f82ef4bb61e8b8c472129381de0bb3d0c6c502;hpb=5074289a33640bb3bd78a711d7ceb645d7ae0cfd;p=8sync.git diff --git a/8sync/systems/websocket/client.scm b/8sync/systems/websocket/client.scm index 86f82ef..11702f7 100644 --- a/8sync/systems/websocket/client.scm +++ b/8sync/systems/websocket/client.scm @@ -1,5 +1,7 @@ ;;; guile-websocket --- WebSocket client/server ;;; Copyright © 2016 David Thompson +;;; Copyright © 2017 Christopher Allan Webber +;;; Copyright © 2019, 2020 Jan (janneke) Nieuwenhuizen ;;; ;;; This file is part of guile-websocket. ;;; @@ -25,27 +27,161 @@ (define-module (8sync systems websocket client) #:use-module (ice-9 match) + #:use-module (srfi srfi-26) #:use-module (rnrs bytevectors) #:use-module (rnrs io ports) - #:use-module (srfi srfi-9) - #:use-module (srfi srfi-9 gnu) #:use-module (web request) #:use-module (web response) #:use-module (web uri) + #:use-module (oop goops) + #:use-module (8sync) + #:use-module (8sync ports) #:use-module (8sync contrib base64) #:use-module (8sync systems websocket frame) #:use-module (8sync systems websocket utils) - #:export (make-websocket - websocket? - websocket-uri - websocket-state + #:export ( + .on-close + .on-error + .on-message + .on-open + .socket + .state + .url + + websocket-closed? + websocket-closing? + websocket-connect websocket-connecting? websocket-open? - websocket-closing? - websocket-closed? - close-websocket - websocket-send - websocket-receive)) + + websocket-close + websocket-loop + websocket-send)) + +(define no-op (const #f)) + +(define-actor () + ((*init* websocket-init) + (close websocket-close) + (open websocket-open) + (send websocket-send)) + + (state #:accessor .state #:init-value 'closed #:init-keyword #:state) + (socket #:accessor .socket #:init-value #f #:init-keyword #:socket) + (url #:getter .url #:init-value #f #:init-keyword #:url) + (uri #:accessor .uri #:init-value #f #:init-keyword #:uri) + (entropy-port #:accessor .entropy-port #:init-form (open-entropy-port)) + + (on-close #:init-keyword #:on-close + #:init-value no-op + #:accessor .on-close) + (on-error #:init-keyword #:on-error + #:init-value no-op + #:accessor .on-error) + (on-message #:init-keyword #:on-message + #:accessor .on-message) + (on-open #:init-keyword #:on-open + #:init-value no-op + #:accessor .on-open)) + +(define-method (websocket-close (websocket ) message) + (when (websocket-open? websocket) + (false-if-exception (close-port (.socket websocket))) + (set! (.state websocket) 'closed) + (false-if-exception ((.on-close websocket) websocket)) + (set! (.socket websocket) #f))) + +(define-method (websocket-open (websocket ) message uri-or-string) + (if (websocket-closed? websocket) + (let ((uri (match uri-or-string + ((? uri? uri) uri) + ((? string? str) (string->uri str))))) + (if (websocket-uri? uri) + (catch 'system-error + (lambda _ + (set! (.uri websocket) uri) + (let ((sock (make-client-socket uri))) + (set! (.socket websocket) sock) + (handshake websocket) + (websocket-loop websocket message))) + (lambda (key . args) + ((.on-error websocket) websocket (format #f "open failed: ~s: ~s" uri-or-string args)))) + ((.on-error websocket) websocket (format #f "not a websocket uri: ~s" uri-or-string)))) + ((.on-error websocket) websocket (format #f "cannot open websocket in state: ~s" (.state websocket))))) + +(define-method (websocket-send (websocket ) message data) + (catch #t ; expect: wrong-type-arg (open port), system-error + (lambda _ + (write-frame + (cond ((string? data) + (make-text-frame data)) + ((bytevector? data) + (make-binary-frame data))) + (.socket websocket))) + (lambda (key . args) + (let ((message (format #f "~a: ~s" key args))) + ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message)) + (websocket-close websocket message))))) + +(define-method (websocket-init (websocket ) message) + (and=> (.url websocket) (cut websocket-open websocket message <>))) + +(define-method (websocket-loop (websocket ) message) + + (define (handle-data-frame type data) + ((.on-message websocket) + websocket + (match type + ('text (utf8->string data)) + ('binary data)))) + + (define (read-frame-maybe) + (and (not (eof-object? (lookahead-u8 (.socket websocket)))) + (read-frame (.socket websocket)))) + + (define (close-down) + (websocket-close websocket message)) + + ((.on-open websocket) websocket) + + (let loop ((fragments '()) + (type #f)) + (let* ((socket (.socket websocket)) + (frame (and (websocket-open? websocket) + (read-frame-maybe)))) + (cond + ;; EOF - port is closed. + ;; @@: Sometimes the eof object appears here as opposed to + ;; at lookahead, but I'm not sure why + ((or (not frame) (eof-object? frame)) + (close-down)) + ;; Per section 5.4, control frames may appear interspersed + ;; along with a fragmented message. + ((close-frame? frame) + ;; Per section 5.5.1, echo the close frame back to the + ;; socket before closing the socket. The socket may no + ;; longer be listening. + (false-if-exception + (write-frame (make-close-frame (frame-data frame)) socket)) + (close-down)) + ((ping-frame? frame) + ;; Per section 5.5.3, a pong frame must include the exact + ;; same data as the ping frame. + (write-frame (make-pong-frame (frame-data frame)) socket) + (loop fragments type)) + ((pong-frame? frame) ; silently ignore pongs + (loop fragments type)) + ((first-fragment-frame? frame) ; begin accumulating fragments + (loop (list frame) (frame-type frame))) + ((final-fragment-frame? frame) ; concatenate all fragments + (handle-data-frame type (frame-concatenate + (reverse (cons frame fragments)))) + (loop '() #f)) + ((fragment-frame? frame) ; add a fragment + (loop (cons frame fragments) type)) + ((data-frame? frame) ; unfragmented data frame + (handle-data-frame (frame-type frame) (frame-data frame)) + (loop '() #f)))))) ;; See Section 3 - WebSocket URIs (define (encrypted-websocket-scheme? uri) @@ -64,6 +200,10 @@ scheme." (unencrypted-websocket-scheme? uri)) (not (uri-fragment uri)))) +(define (set-nonblocking! port) + (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL))) + (setvbuf port 'block 1024)) + (define (make-client-socket uri) "Connect a socket to the remote resource described by URI." (let* ((port (uri-port uri)) @@ -74,48 +214,43 @@ scheme." (if port AI_NUMERICSERV 0)))) - (s (with-fluids ((%default-port-encoding #f)) - (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP)))) - ;; TODO: Configure I/O buffering? - (connect s (addrinfo:addr info)) - s)) + (sock (with-fluids ((%default-port-encoding #f)) + (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP)))) -(define-record-type - (%make-websocket uri socket entropy-port state) - websocket? - (uri websocket-uri) - (socket websocket-socket) - (entropy-port websocket-entropy-port) - (state websocket-state set-websocket-state!)) + (set-nonblocking! sock) + ;; Disable buffering for websockets + (setvbuf sock 'none) -(define (display-websocket ws port) - (format port "#" - (uri->string (websocket-uri ws)) - (websocket-state ws))) + ;; TODO: Configure I/O buffering? + (connect sock (addrinfo:addr info)) + sock)) -(set-record-type-printer! display-websocket) +(define-method (write (o ) port) + (format port "#" + (.url o) + (.state o))) -(define (websocket-connecting? ws) - "Return #t if the WebSocket WS is in the connecting state." - (eq? (websocket-state ws) 'connecting)) +(define-method (websocket-connecting? (websocket )) + "Return #t if WEBSOCKET is in the connecting state." + (eq? (.state websocket) 'connecting)) -(define (websocket-open? ws) - "Return #t if the WebSocket WS is in the open state." - (eq? (websocket-state ws) 'open)) +(define-method (websocket-open? (websocket )) + "Return #t if WEBSOCKET is in the open state." + (eq? (.state websocket) 'open)) -(define (websocket-closing? ws) - "Return #t if the WebSocket WS is in the closing state." - (eq? (websocket-state ws) 'closing)) +(define-method (websocket-closing? (websocket )) + "Return #t if WEBSOCKET is in the closing state." + (eq? (.state websocket) 'closing)) -(define (websocket-closed? ws) - "Return #t if the WebSocket WS is in the closed state." - (eq? (websocket-state ws) 'closed)) +(define-method (websocket-closed? (websocket )) + "Return #t if WEBSOCKET is in the closed state." + (eq? (.state websocket) 'closed)) -(define (generate-client-key ws) +(define-method (generate-client-key (websocket )) "Return a random, base64 encoded nonce using the entropy source of -WS." +WEBSOCKET." (base64-encode - (get-bytevector-n (websocket-entropy-port ws) 16))) + (get-bytevector-n (.entropy-port websocket) 16))) ;; See Section 4.1 - Client Requirements (define (make-handshake-request uri key) @@ -129,12 +264,12 @@ KEY." (sec-websocket-version . "13")))) (build-request uri #:method 'GET #:headers headers))) -(define (handshake ws) - "Perform the WebSocket handshake for the client WS." - (let ((key (generate-client-key ws))) - (write-request (make-handshake-request (websocket-uri ws) key) - (websocket-socket ws)) - (let* ((response (read-response (websocket-socket ws))) +(define-method (handshake (websocket )) + "Perform the WebSocket handshake for the client WEBSOCKET." + (let ((key (generate-client-key websocket))) + (write-request (make-handshake-request (.uri websocket) key) + (.socket websocket)) + (let* ((response (read-response (.socket websocket))) (headers (response-headers response)) (upgrade (assoc-ref headers 'upgrade)) (connection (assoc-ref headers 'connection)) @@ -144,10 +279,12 @@ KEY." (string-ci=? (car upgrade) "websocket") (equal? connection '(upgrade)) (string=? (string-trim-both accept) (make-accept-key key))) - (set-websocket-state! ws 'open) + (set! (.state websocket) 'open) (begin - (close-websocket ws) - (error "websocket handshake failed" (websocket-uri ws))))))) + (websocket-close websocket) + ((.on-error websocket) websocket + (format #f "websocket handshake failed: ~s" + (uri->string (.uri websocket))))))))) (define (open-entropy-port) "Return an open input port to a reliable source of entropy for the @@ -156,25 +293,10 @@ current system." ;; exactly portable. (open-input-file "/dev/urandom")) -(define (make-websocket uri-or-string) - "Create and establish a new WebSocket connection for the remote -resource described by URI-OR-STRING." - (let ((uri (match uri-or-string - ((? uri? uri) uri) - ((? string? str) (string->uri str))))) - (if (websocket-uri? uri) - (let ((ws (%make-websocket uri - (make-client-socket uri) - (open-entropy-port) - 'connecting))) - (handshake ws) - ws) - (error "not a websocket uri" uri)))) - -(define (close-websocket ws) - "Close the WebSocket connection for the client WS." - (let ((socket (websocket-socket ws))) - (set-websocket-state! ws 'closing) +(define-method (websocket-close (websocket )) + "Close the WebSocket connection for the client WEBSOCKET." + (let ((socket (.socket websocket))) + (set! (.state websocket) 'closing) (write-frame (make-close-frame (make-bytevector 0)) socket) ;; Per section 5.5.1 , wait for the server to close the connection ;; for a reasonable amount of time. @@ -185,26 +307,5 @@ resource described by URI-OR-STRING." (read-frame socket) ; throw it away (loop))))) (close-port socket) - (close-port (websocket-entropy-port ws)) - (set-websocket-state! ws 'closed) - *unspecified*)) - -(define (generate-masking-key ws) - "Create a new masking key using the entropy source of WS." - ;; Masking keys are 32 bits long. - (get-bytevector-n (websocket-entropy-port ws) 4)) - -(define (websocket-send ws data) - "Send DATA, a string or bytevector, to the server that WS is -connected to." - ;; TODO: Send frames over some threshold in fragments. - (write-frame (make-text-frame data (generate-masking-key ws)) - (websocket-socket ws))) - -(define (websocket-receive ws) - "Read a response from the server that WS is connected to." - ;; TODO: Handle fragmented frames and control frames. - (let ((frame (read-frame (websocket-socket ws)))) - (if (binary-frame? frame) - (frame-data frame) - (text-frame->string frame)))) + (close-port (.entropy-port websocket)) + (set! (.state websocket) 'closed)))