websocket: Initial <websocket> client actor support.
[8sync.git] / 8sync / systems / websocket / client.scm
index 86f82ef4bb61e8b8c472129381de0bb3d0c6c502..186b1b29033454fef0e5b1d056f15188692ec926 100644 (file)
@@ -1,5 +1,7 @@
 ;;; guile-websocket --- WebSocket client/server
 ;;; Copyright © 2016 David Thompson <davet@gnu.org>
+;;; Copyright © 2017 Christopher Allan Webber <cwebber@dustycloud.org>
+;;; Copyright © 2019, 2020 Jan (janneke) Nieuwenhuizen <janneke@gnu.org>
 ;;;
 ;;; This file is part of guile-websocket.
 ;;;
 
 (define-module (8sync systems websocket client)
   #:use-module (ice-9 match)
+  #:use-module (srfi srfi-26)
   #:use-module (rnrs bytevectors)
   #:use-module (rnrs io ports)
-  #:use-module (srfi srfi-9)
-  #:use-module (srfi srfi-9 gnu)
   #:use-module (web request)
   #:use-module (web response)
   #:use-module (web uri)
+  #:use-module (oop goops)
+  #:use-module (8sync)
+  #:use-module (8sync ports)
   #:use-module (8sync contrib base64)
   #:use-module (8sync systems websocket frame)
   #:use-module (8sync systems websocket utils)
-  #:export (make-websocket
-            websocket?
-            websocket-uri
-            websocket-state
+  #:export (<websocket>
+            .on-close
+            .on-error
+            .on-message
+            .on-open
+            .socket
+            .state
+            .url
+
+            websocket-closed?
+            websocket-closing?
+            websocket-connect
             websocket-connecting?
             websocket-open?
-            websocket-closing?
-            websocket-closed?
-            close-websocket
-            websocket-send
-            websocket-receive))
+
+            websocket-close
+            websocket-loop
+            websocket-send))
+
+(define no-op (const #f))
+
+(define-actor <websocket> (<actor>)
+  ((*init* websocket-init)
+   (close websocket-close)
+   (open websocket-open)
+   (send websocket-send))
+
+  (state #:accessor .state #:init-value 'closed #:init-keyword #:state)
+  (socket #:accessor .socket #:init-value #f #:init-keyword #:socket)
+  (url #:getter .url #:init-value #f #:init-keyword #:url)
+  (uri #:accessor .uri #:init-value #f #:init-keyword #:uri)
+  (entropy-port #:accessor .entropy-port #:init-form (open-entropy-port))
+
+  (on-close #:init-keyword #:on-close
+                 #:init-value no-op
+                 #:accessor .on-close)
+  (on-error #:init-keyword #:on-error
+            #:init-value no-op
+            #:accessor .on-error)
+  (on-message #:init-keyword #:on-message
+              #:accessor .on-message)
+  (on-open #:init-keyword #:on-open
+                #:init-value no-op
+                #:accessor .on-open))
+
+(define-method (websocket-close (websocket <websocket>) message)
+  (when (websocket-open? websocket)
+    (false-if-exception (close-port (.socket websocket)))
+    (set! (.state websocket) 'closed)
+    (false-if-exception ((.on-close websocket) websocket))
+    (set! (.socket websocket) #f)))
+
+(define-method (websocket-open (websocket <websocket>) message uri-or-string)
+  (if (websocket-closed? websocket)
+      (let ((uri (match uri-or-string
+                   ((? uri? uri) uri)
+                   ((? string? str) (string->uri str)))))
+        (if (websocket-uri? uri)
+            (catch 'system-error
+              (lambda _
+                (set! (.uri websocket) uri)
+                (let ((sock (make-client-socket uri)))
+                  (set! (.socket websocket) sock)
+                  (handshake websocket)
+                  (websocket-loop websocket message)))
+              (lambda (key . args)
+                ((.on-error websocket) websocket (format #f "open failed: ~s: ~s" uri-or-string args))))
+            ((.on-error websocket) websocket (format #f "not a websocket uri: ~s" uri-or-string))))
+      ((.on-error websocket) websocket (format #f "cannot open websocket in state: ~s" (.state websocket)))))
+
+(define-method (websocket-send (websocket <websocket>) message data)
+  (catch #t           ; expect: wrong-type-arg (open port), system-error
+    (lambda _
+      (write-frame
+       (cond ((string? data)
+              (make-text-frame data))
+             ((bytevector? data)
+              (make-binary-frame data)))
+       (.socket websocket)))
+    (lambda (key . args)
+      (unless (and (memq key '(system-error wrong-type-arg))
+                   (match args
+                     (((or "put-u8" "put-bytevector") arg ...) #t)
+                     (_ #f)))
+        (apply throw key args))
+      (let ((message (format #f "~a: ~s" key args)))
+        ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message))
+        (websocket-close websocket message)))))
+
+(define-method (websocket-init (websocket <websocket>) message)
+  (and=> (.url websocket) (cut websocket-open websocket message <>)))
+
+(define-method (websocket-loop (websocket <websocket>) message)
+
+  (define (handle-data-frame type data)
+    ((.on-message websocket)
+     websocket
+     (match type
+       ('text   (utf8->string data))
+       ('binary data))))
+
+  (define (read-frame-maybe)
+    (and (not (eof-object? (lookahead-u8 (.socket websocket))))
+         (read-frame (.socket websocket))))
+
+  (define (close-down)
+    (websocket-close websocket message))
+
+  ((.on-open websocket) websocket)
+
+  (let loop ((fragments '())
+             (type #f))
+    (let* ((socket (.socket websocket))
+           (frame (and (websocket-open? websocket)
+                       (read-frame-maybe))))
+      (cond
+       ;; EOF - port is closed.
+       ;; @@: Sometimes the eof object appears here as opposed to
+       ;;   at lookahead, but I'm not sure why
+       ((or (not frame) (eof-object? frame))
+        (close-down))
+       ;; Per section 5.4, control frames may appear interspersed
+       ;; along with a fragmented message.
+       ((close-frame? frame)
+        ;; Per section 5.5.1, echo the close frame back to the
+        ;; socket before closing the socket.  The socket may no
+        ;; longer be listening.
+        (false-if-exception
+         (write-frame (make-close-frame (frame-data frame)) socket))
+        (close-down))
+       ((ping-frame? frame)
+        ;; Per section 5.5.3, a pong frame must include the exact
+        ;; same data as the ping frame.
+        (write-frame (make-pong-frame (frame-data frame)) socket)
+        (loop fragments type))
+       ((pong-frame? frame)           ; silently ignore pongs
+        (loop fragments type))
+       ((first-fragment-frame? frame) ; begin accumulating fragments
+        (loop (list frame) (frame-type frame)))
+       ((final-fragment-frame? frame) ; concatenate all fragments
+        (handle-data-frame type (frame-concatenate
+                                 (reverse (cons frame fragments))))
+        (loop '() #f))
+       ((fragment-frame? frame)       ; add a fragment
+        (loop (cons frame fragments) type))
+       ((data-frame? frame)           ; unfragmented data frame
+        (handle-data-frame (frame-type frame) (frame-data frame))
+        (loop '() #f))))))
 
 ;; See Section 3 - WebSocket URIs
 (define (encrypted-websocket-scheme? uri)
@@ -64,6 +205,10 @@ scheme."
            (unencrypted-websocket-scheme? uri))
        (not (uri-fragment uri))))
 
+(define (set-nonblocking! port)
+  (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
+  (setvbuf port 'block 1024))
+
 (define (make-client-socket uri)
   "Connect a socket to the remote resource described by URI."
   (let* ((port (uri-port uri))
@@ -74,48 +219,43 @@ scheme."
                                  (if port
                                      AI_NUMERICSERV
                                      0))))
-         (s (with-fluids ((%default-port-encoding #f))
-              (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP))))
-    ;; TODO: Configure I/O buffering?
-    (connect s (addrinfo:addr info))
-    s))
+         (sock (with-fluids ((%default-port-encoding #f))
+                 (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP))))
 
-(define-record-type <websocket>
-  (%make-websocket uri socket entropy-port state)
-  websocket?
-  (uri websocket-uri)
-  (socket websocket-socket)
-  (entropy-port websocket-entropy-port)
-  (state websocket-state set-websocket-state!))
+    (set-nonblocking! sock)
+    ;; Disable buffering for websockets
+    (setvbuf sock 'none)
 
-(define (display-websocket ws port)
-  (format port "#<websocket ~a ~a>"
-          (uri->string (websocket-uri ws))
-          (websocket-state ws)))
+    ;; TODO: Configure I/O buffering?
+    (connect sock (addrinfo:addr info))
+    sock))
 
-(set-record-type-printer! <websocket> display-websocket)
+(define-method (write (o <websocket>) port)
+   (format port "#<websocket ~a ~a>"
+           (.url o)
+           (.state o)))
 
-(define (websocket-connecting? ws)
-  "Return #t if the WebSocket WS is in the connecting state."
-  (eq? (websocket-state ws) 'connecting))
+(define-method (websocket-connecting? (websocket <websocket>))
+  "Return #t if WEBSOCKET is in the connecting state."
+  (eq? (.state websocket) 'connecting))
 
-(define (websocket-open? ws)
-  "Return #t if the WebSocket WS is in the open state."
-  (eq? (websocket-state ws) 'open))
+(define-method (websocket-open? (websocket <websocket>))
+  "Return #t if WEBSOCKET is in the open state."
+  (eq? (.state websocket) 'open))
 
-(define (websocket-closing? ws)
-  "Return #t if the WebSocket WS is in the closing state."
-  (eq? (websocket-state ws) 'closing))
+(define-method (websocket-closing? (websocket <websocket>))
+  "Return #t if WEBSOCKET is in the closing state."
+  (eq? (.state websocket) 'closing))
 
-(define (websocket-closed? ws)
-  "Return #t if the WebSocket WS is in the closed state."
-  (eq? (websocket-state ws) 'closed))
+(define-method (websocket-closed? (websocket <websocket>))
+  "Return #t if WEBSOCKET is in the closed state."
+  (eq? (.state websocket) 'closed))
 
-(define (generate-client-key ws)
+(define-method (generate-client-key (websocket <websocket>))
   "Return a random, base64 encoded nonce using the entropy source of
-WS."
+WEBSOCKET."
   (base64-encode
-   (get-bytevector-n (websocket-entropy-port ws) 16)))
+   (get-bytevector-n (.entropy-port websocket) 16)))
 
 ;; See Section 4.1 - Client Requirements
 (define (make-handshake-request uri key)
@@ -129,12 +269,12 @@ KEY."
                    (sec-websocket-version . "13"))))
     (build-request uri #:method 'GET #:headers headers)))
 
-(define (handshake ws)
-  "Perform the WebSocket handshake for the client WS."
-  (let ((key (generate-client-key ws)))
-    (write-request (make-handshake-request (websocket-uri ws) key)
-                   (websocket-socket ws))
-    (let* ((response (read-response (websocket-socket ws)))
+(define-method (handshake (websocket <websocket>))
+  "Perform the WebSocket handshake for the client WEBSOCKET."
+  (let ((key (generate-client-key websocket)))
+    (write-request (make-handshake-request (.uri websocket) key)
+                   (.socket websocket))
+    (let* ((response (read-response (.socket websocket)))
            (headers (response-headers response))
            (upgrade (assoc-ref headers 'upgrade))
            (connection (assoc-ref headers 'connection))
@@ -144,10 +284,12 @@ KEY."
                (string-ci=? (car upgrade) "websocket")
                (equal? connection '(upgrade))
                (string=? (string-trim-both accept) (make-accept-key key)))
-          (set-websocket-state! ws 'open)
+          (set! (.state websocket) 'open)
           (begin
-            (close-websocket ws)
-            (error "websocket handshake failed" (websocket-uri ws)))))))
+            (websocket-close websocket)
+            ((.on-error websocket) websocket
+             (format #f "websocket handshake failed: ~s"
+                     (uri->string (.uri websocket)))))))))
 
 (define (open-entropy-port)
   "Return an open input port to a reliable source of entropy for the
@@ -156,25 +298,10 @@ current system."
   ;; exactly portable.
   (open-input-file "/dev/urandom"))
 
-(define (make-websocket uri-or-string)
-  "Create and establish a new WebSocket connection for the remote
-resource described by URI-OR-STRING."
-  (let ((uri (match uri-or-string
-               ((? uri? uri) uri)
-               ((? string? str) (string->uri str)))))
-    (if (websocket-uri? uri)
-        (let ((ws (%make-websocket uri
-                                   (make-client-socket uri)
-                                   (open-entropy-port)
-                                   'connecting)))
-          (handshake ws)
-          ws)
-        (error "not a websocket uri" uri))))
-
-(define (close-websocket ws)
-  "Close the WebSocket connection for the client WS."
-  (let ((socket (websocket-socket ws)))
-    (set-websocket-state! ws 'closing)
+(define-method (websocket-close (websocket <websocket>))
+  "Close the WebSocket connection for the client WEBSOCKET."
+  (let ((socket (.socket websocket)))
+    (set! (.state websocket) 'closing)
     (write-frame (make-close-frame (make-bytevector 0)) socket)
     ;; Per section 5.5.1 , wait for the server to close the connection
     ;; for a reasonable amount of time.
@@ -185,26 +312,5 @@ resource described by URI-OR-STRING."
            (read-frame socket) ; throw it away
            (loop)))))
     (close-port socket)
-    (close-port (websocket-entropy-port ws))
-    (set-websocket-state! ws 'closed)
-    *unspecified*))
-
-(define (generate-masking-key ws)
-  "Create a new masking key using the entropy source of WS."
-  ;; Masking keys are 32 bits long.
-  (get-bytevector-n (websocket-entropy-port ws) 4))
-
-(define (websocket-send ws data)
-  "Send DATA, a string or bytevector, to the server that WS is
-connected to."
-  ;; TODO: Send frames over some threshold in fragments.
-  (write-frame (make-text-frame data (generate-masking-key ws))
-               (websocket-socket ws)))
-
-(define (websocket-receive ws)
-  "Read a response from the server that WS is connected to."
-  ;; TODO: Handle fragmented frames and control frames.
-  (let ((frame (read-frame (websocket-socket ws))))
-    (if (binary-frame? frame)
-        (frame-data frame)
-        (text-frame->string frame))))
+    (close-port (.entropy-port websocket))
+    (set! (.state websocket) 'closed)))