websocket: Ping client occasionally to prevent timeoiuts.
[8sync.git] / 8sync / systems / websocket / server.scm
index 6283255ede52bcc5bfc48277e134878a538665fc..7abd6c47febd9fb034132feefbde22d7e7043aef 100644 (file)
   #:use-module (8sync systems web)
   #:use-module (8sync systems websocket frame)
   #:use-module (8sync systems websocket utils)
+  #:use-module (fibers)
+  #:use-module (fibers conditions)
+  #:use-module (fibers operations)
+  #:use-module (fibers timers)
   #:export (<websocket-server>
             .websocket-handler))
 
@@ -69,7 +73,7 @@ string."
 (define-actor <websocket-server> (<web-server>)
   ((ws-send websocket-server-send))
   (upgrade-paths #:init-value `(("websocket" .
-                                 ,(wrap-apply websocket-client-loop)))
+                                 ,(live-wrap websocket-client-loop)))
                  #:allocation #:each-subclass
                  #:accessor .upgrade-paths)
 
@@ -111,9 +115,12 @@ called for each complete message that is received."
   ;; @@: We probably could just increment a counter...
   (define client-id (web-server-gen-client-id websocket-server))
 
+  (define client-dead? (make-condition))
+
   (define (close-down)
     (close-port client)
     (hash-remove! (.ws-clients websocket-server) client-id)
+    (signal-condition! client-dead?)
     ((.on-ws-client-disconnect websocket-server)
      websocket-server client-id))
 
@@ -129,54 +136,92 @@ called for each complete message that is received."
   (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key))
          (response (make-handshake-response client-key)))
     (write-response response client)
-    (let loop ((fragments '())
-               (type #f))
-      (let ((frame (read-frame-maybe)))
-        (cond
-         ;; EOF - port is closed.
-         ;; @@: Sometimes the eof object appears here as opposed to
-         ;;   at lookahead, but I'm not sure why
-         ((or (not frame) (eof-object? frame))
-          (close-down))
-         ;; Per section 5.4, control frames may appear interspersed
-         ;; along with a fragmented message.
-         ((close-frame? frame)
-          ;; Per section 5.5.1, echo the close frame back to the
-          ;; client before closing the socket.  The client may no
-          ;; longer be listening.
-          (false-if-exception
-           (write-frame (make-close-frame (frame-data frame)) client))
-          (close-down))
-         ((ping-frame? frame)
-          ;; Per section 5.5.3, a pong frame must include the exact
-          ;; same data as the ping frame.
-          (write-frame (make-pong-frame (frame-data frame)) client)
-          (loop fragments type))
-         ((pong-frame? frame) ; silently ignore pongs
-          (loop fragments type))
-         ((first-fragment-frame? frame) ; begin accumulating fragments
-          (loop (list frame) (frame-type frame)))
-         ((final-fragment-frame? frame) ; concatenate all fragments
-          (handle-data-frame type (frame-concatenate (reverse fragments)))
-          (loop '() #f))
-         ((fragment-frame? frame) ; add a fragment
-          (loop (cons frame fragments) type))
-         ((data-frame? frame) ; unfragmented data frame
-          (handle-data-frame (frame-type frame) (frame-data frame))
-          (loop '() #f)))))))
+    ;; TODO: we shoud split clients into their own objects or fibers
+    ;; structure... what happens if we try to write at the same time
+    ;; that we're writing another frame?
+    ;; Probably it's good to have different fibers or methods for:
+    ;;  - reader method/fiber
+    ;;    - checks for client-dead? condition
+    ;;    - may kick off some logic, which may send a message to writer
+    ;;    - "nonblocking" reads
+    ;;  - writer method/thread
+    ;;    - checks for client-dead? condition
+    ;;    - accepts one message at a time to write
+    ;;    - "blocking" writes
+    ;;  - ping timer method
+    ;;    - checks for client-dead? condition
+    ;;    - occasionally sends a message to the writer to do a ping
+    (let* ((ws (actor-id websocket-server))
+           (send-ping
+            (lambda ()
+              (<- ws 'ws-send client-id 'ping))))
+      (spawn-fiber
+       (lambda ()
+         (let lp ()
+           (and (perform-operation
+                 (choice-operation
+                  ;; sleep and send a ping
+                  (wrap-operation (sleep-operation 5)
+                                  (lambda ()
+                                    (send-ping)
+                                    #t)) ; loop again
+                  ;; If the client is dead, don't loop
+                  (wrap-operation (wait-operation client-dead?)
+                                  (const #f))))
+                (lp))))))
+
+    (with-actor-nonblocking-ports
+     (let loop ((fragments '())
+                (type #f))
+       (let ((frame (read-frame-maybe)))
+         (cond
+          ;; EOF - port is closed.
+          ;; @@: Sometimes the eof object appears here as opposed to
+          ;;   at lookahead, but I'm not sure why
+          ((or (not frame) (eof-object? frame))
+           (close-down))
+          ;; Per section 5.4, control frames may appear interspersed
+          ;; along with a fragmented message.
+          ((close-frame? frame)
+           ;; Per section 5.5.1, echo the close frame back to the
+           ;; client before closing the socket.  The client may no
+           ;; longer be listening.
+           (false-if-exception
+            (write-frame (make-close-frame (frame-data frame)) client))
+           (close-down))
+          ((ping-frame? frame)
+           ;; Per section 5.5.3, a pong frame must include the exact
+           ;; same data as the ping frame.
+           (write-frame (make-pong-frame (frame-data frame)) client)
+           (loop fragments type))
+          ((pong-frame? frame) ; silently ignore pongs
+           (loop fragments type))
+          ((first-fragment-frame? frame) ; begin accumulating fragments
+           (loop (list frame) (frame-type frame)))
+          ((final-fragment-frame? frame) ; concatenate all fragments
+           (handle-data-frame type (frame-concatenate (reverse fragments)))
+           (loop '() #f))
+          ((fragment-frame? frame) ; add a fragment
+           (loop (cons frame fragments) type))
+          ((data-frame? frame) ; unfragmented data frame
+           (handle-data-frame (frame-type frame) (frame-data frame))
+           (loop '() #f))))))))
 
 (define (websocket-server-send websocket-server message client-id data)
-  (cond ((hash-ref (.ws-clients websocket-server) client-id) =>
-         (lambda (client)
-           (write-frame
-            (cond ((string? data)
-                   (make-text-frame data))
-                  ((bytevector? data)
-                   (make-binary-frame data)))
-            client)
-           ;; ok is like success, amirite
-           (<-reply message 'ok)))
-        (else
-         ;; No such client with that id.
-         ;; Either it closed, or it was never there.
-         (<-reply message 'client-gone))))
+  (with-actor-nonblocking-ports
+   (lambda ()
+     (cond ((hash-ref (.ws-clients websocket-server) client-id) =>
+            (lambda (client)
+              (write-frame
+               (cond ((string? data)
+                      (make-text-frame data))
+                     ((bytevector? data)
+                      (make-binary-frame data))
+                     ('ping
+                      (make-ping-frame (string->utf8 "hi"))))
+               client)
+              ;; ok is like success, amirite
+              'ok))
+           ;; No such client with that id.
+           ;; Either it closed, or it was never there.
+           (else 'client-gone)))))