websocket: Queue fragmented frames to avoid interleaving.
[8sync.git] / 8sync / systems / websocket / client.scm
index 11702f7564c8dbaf6ba35bccc9a19df8e0badfd5..48c3c1093cd38985b8ef0314f88467814b06bacd 100644 (file)
@@ -2,6 +2,7 @@
 ;;; Copyright © 2016 David Thompson <davet@gnu.org>
 ;;; Copyright © 2017 Christopher Allan Webber <cwebber@dustycloud.org>
 ;;; Copyright © 2019, 2020 Jan (janneke) Nieuwenhuizen <janneke@gnu.org>
+;;; Copyright © 2020 Rutger van Beusekom <rutger.van.beusekom@gmail.com>
 ;;;
 ;;; This file is part of guile-websocket.
 ;;;
@@ -71,6 +72,7 @@
   (url #:getter .url #:init-value #f #:init-keyword #:url)
   (uri #:accessor .uri #:init-value #f #:init-keyword #:uri)
   (entropy-port #:accessor .entropy-port #:init-form (open-entropy-port))
+  (frames #:accessor .frames #:init-value '())
 
   (on-close #:init-keyword #:on-close
                  #:init-value no-op
             ((.on-error websocket) websocket (format #f "not a websocket uri: ~s" uri-or-string))))
       ((.on-error websocket) websocket (format #f "cannot open websocket in state: ~s" (.state websocket)))))
 
-(define-method (websocket-send (websocket <websocket>) message data)
+(define (subbytevector bv start end)
+  (if (= (bytevector-length bv) end) bv
+      (let* ((length (- end start))
+             (sub (make-bytevector length)))
+        (bytevector-copy! bv start sub 0 length)
+        sub)))
+
+(define* (make-fragmented-frames data #:key (fragment-size (expt 2 15)))
+  (let ((length (if (string? data) (string-length data)
+                    (bytevector-length data))))
+    (let loop ((offset 0))
+      (let* ((size (min fragment-size (- length offset)))
+             (end (+ offset size))
+             (final? (= end length))
+             (continuation? (not (zero? offset)))
+             (frame (if (string? data) (make-text-frame (substring data offset end) #:final? final? #:continuation? continuation?)
+                        (make-binary-frame (subbytevector data offset end) #:final? final? #:continuation? continuation?))))
+        (if final? (list frame)
+            (cons frame (loop end)))))))
+
+(define-method (websocket-direct-send (websocket <websocket>) message data)
   (catch #t           ; expect: wrong-type-arg (open port), system-error
     (lambda _
-      (write-frame
-       (cond ((string? data)
-              (make-text-frame data))
-             ((bytevector? data)
-              (make-binary-frame data)))
-       (.socket websocket)))
+      (let* ((frames (make-fragmented-frames data)))
+        (let loop ((frames frames) (written '(nothing)))
+          (when (pair? frames)
+            (write-frame (car frames) (.socket websocket))
+            (loop (cdr frames) (cons (car frames) written)))
+          ;;(format (current-error-port) "done\n")
+          )))
+    (lambda (key . args)
+      (let ((message (format #f "~a: ~s" key args)))
+        ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message))
+        (websocket-close websocket message)))))
+
+(define-method (websocket-queue-or-send (websocket <websocket>) message data)
+  (catch #t         ; expect: wrong-type-arg (open port), system-error
+    (lambda _
+      (let* ((frames (make-fragmented-frames data))
+            (frames? (pair? (.frames websocket))))
+       (set! (.frames websocket) (append (.frames websocket) frames))
+       (unless frames?
+         (let loop ()
+           (let ((frames (.frames websocket)))
+             (when (pair? frames)
+               (write-frame (car frames) (.socket websocket))
+               (set! (.frames websocket) (cdr (.frames websocket)))
+               (loop))
+              ;;(unless (pair? frames) (format (current-error-port) "done\n"))
+              )))))
     (lambda (key . args)
       (let ((message (format #f "~a: ~s" key args)))
         ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message))
         (websocket-close websocket message)))))
 
+(define-method (websocket-send (websocket <websocket>) message data)
+  (websocket-queue-or-send websocket message data)
+  ;;(websocket-direct-send websocket message data)
+  )
+
 (define-method (websocket-init (websocket <websocket>) message)
   (and=> (.url websocket) (cut websocket-open websocket message <>)))
 
+(define-method (websocket-socket-open? (websocket <websocket>))
+  "Return #t if .SOCKET of WEBSOCKET is open."
+  (not (port-closed? (.socket websocket))))
+
 (define-method (websocket-loop (websocket <websocket>) message)
 
   (define (handle-data-frame type data)
 
   (let loop ((fragments '())
              (type #f))
-    (let* ((socket (.socket websocket))
-           (frame (and (websocket-open? websocket)
-                       (read-frame-maybe))))
-      (cond
-       ;; EOF - port is closed.
-       ;; @@: Sometimes the eof object appears here as opposed to
-       ;;   at lookahead, but I'm not sure why
-       ((or (not frame) (eof-object? frame))
-        (close-down))
-       ;; Per section 5.4, control frames may appear interspersed
-       ;; along with a fragmented message.
-       ((close-frame? frame)
-        ;; Per section 5.5.1, echo the close frame back to the
-        ;; socket before closing the socket.  The socket may no
-        ;; longer be listening.
-        (false-if-exception
-         (write-frame (make-close-frame (frame-data frame)) socket))
-        (close-down))
-       ((ping-frame? frame)
-        ;; Per section 5.5.3, a pong frame must include the exact
-        ;; same data as the ping frame.
-        (write-frame (make-pong-frame (frame-data frame)) socket)
-        (loop fragments type))
-       ((pong-frame? frame)           ; silently ignore pongs
-        (loop fragments type))
-       ((first-fragment-frame? frame) ; begin accumulating fragments
-        (loop (list frame) (frame-type frame)))
-       ((final-fragment-frame? frame) ; concatenate all fragments
-        (handle-data-frame type (frame-concatenate
-                                 (reverse (cons frame fragments))))
-        (loop '() #f))
-       ((fragment-frame? frame)       ; add a fragment
-        (loop (cons frame fragments) type))
-       ((data-frame? frame)           ; unfragmented data frame
-        (handle-data-frame (frame-type frame) (frame-data frame))
-        (loop '() #f))))))
+    (catch #t
+      (lambda _
+        (let* ((socket (.socket websocket))
+               (frame (and (websocket-open? websocket)
+                           (read-frame-maybe))))
+          (cond
+           ;; EOF - port is closed.
+           ;; @@: Sometimes the eof object appears here as opposed to
+           ;;   at lookahead, but I'm not sure why
+           ((or (not frame) (eof-object? frame))
+            (close-down))
+           ;; Per section 5.4, control frames may appear interspersed
+           ;; along with a fragmented message.
+           ((close-frame? frame)
+            ;; Per section 5.5.1, echo the close frame back to the
+            ;; socket before closing the socket.  The socket may no
+            ;; longer be listening.
+            (false-if-exception
+             (write-frame (make-close-frame (frame-data frame)) socket))
+            (close-down))
+           ((ping-frame? frame)
+            ;; Per section 5.5.3, a pong frame must include the exact
+            ;; same data as the ping frame.
+            (write-frame (make-pong-frame (frame-data frame)) socket)
+            (loop fragments type))
+           ((pong-frame? frame)         ; silently ignore pongs
+            (loop fragments type))
+           ((first-fragment-frame? frame) ; begin accumulating fragments
+            (loop (list frame) (frame-type frame)))
+           ((final-fragment-frame? frame) ; concatenate all fragments
+            (handle-data-frame type (frame-concatenate
+                                     (reverse (cons frame fragments))))
+            (loop '() #f))
+           ((fragment-frame? frame)     ; add a fragment
+            (loop (cons frame fragments) type))
+           ((data-frame? frame)         ; unfragmented data frame
+            (handle-data-frame (frame-type frame) (frame-data frame))
+            (loop '() #f)))))
+      (lambda (key . args)
+        (let ((message (format #f "~a: ~s" key args)))
+          ((.on-error websocket) websocket (format #f "read failed: ~s\n" websocket))
+          (if (websocket-socket-open? websocket) (loop '() #f)
+              (websocket-close websocket message)))))))
 
 ;; See Section 3 - WebSocket URIs
 (define (encrypted-websocket-scheme? uri)
@@ -289,9 +348,14 @@ KEY."
 (define (open-entropy-port)
   "Return an open input port to a reliable source of entropy for the
 current system."
-  ;; XXX: This works on GNU/Linux and OS X systems, but this isn't
-  ;; exactly portable.
-  (open-input-file "/dev/urandom"))
+  (if (file-exists? "/dev/urandom")
+      (open-input-file "/dev/urandom")
+      ;; XXX: This works as a fall back but this isn't exactly a
+      ;; reliable source of entropy.
+      (make-soft-port (vector (const #f) (const #f) (const #f)
+                              (lambda _ (let ((r (random 256))) (integer->char r)))
+                              (const #f)
+                              (const #t)) "r")))
 
 (define-method (websocket-close (websocket <websocket>))
   "Close the WebSocket connection for the client WEBSOCKET."