websocket: Fix use of with-actor-nonblocking-ports
[8sync.git] / 8sync / systems / websocket / server.scm
index 7a7526d054f8a4fadb645d6448eedc84d887ccdb..d9105e8454bd9e92d58c331daa294bd9e6cf0fa4 100644 (file)
   #:use-module (8sync systems web)
   #:use-module (8sync systems websocket frame)
   #:use-module (8sync systems websocket utils)
+  #:use-module (fibers)
+  #:use-module (fibers conditions)
+  #:use-module (fibers operations)
+  #:use-module (fibers timers)
   #:export (<websocket-server>
             .websocket-handler))
 
@@ -111,9 +115,12 @@ called for each complete message that is received."
   ;; @@: We probably could just increment a counter...
   (define client-id (web-server-gen-client-id websocket-server))
 
+  (define client-dead? (make-condition))
+
   (define (close-down)
     (close-port client)
     (hash-remove! (.ws-clients websocket-server) client-id)
+    (signal-condition! client-dead?)
     ((.on-ws-client-disconnect websocket-server)
      websocket-server client-id))
 
@@ -125,12 +132,46 @@ called for each complete message that is received."
   ((.on-ws-client-connect websocket-server)
    websocket-server client-id)
 
-  (with-actor-nonblocking-ports
-   (lambda ()
-     ;; Perform the HTTP handshake and upgrade to WebSocket protocol.
-     (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key))
-            (response (make-handshake-response client-key)))
-       (write-response response client)
+  ;; Perform the HTTP handshake and upgrade to WebSocket protocol.
+  (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key))
+         (response (make-handshake-response client-key)))
+    (write-response response client)
+    ;; TODO: we shoud split clients into their own objects or fibers
+    ;; structure... what happens if we try to write at the same time
+    ;; that we're writing another frame?
+    ;; Probably it's good to have different fibers or methods for:
+    ;;  - reader method/fiber
+    ;;    - checks for client-dead? condition
+    ;;    - may kick off some logic, which may send a message to writer
+    ;;    - "nonblocking" reads
+    ;;  - writer method/thread
+    ;;    - checks for client-dead? condition
+    ;;    - accepts one message at a time to write
+    ;;    - "blocking" writes
+    ;;  - ping timer method
+    ;;    - checks for client-dead? condition
+    ;;    - occasionally sends a message to the writer to do a ping
+    (let* ((ws (actor-id websocket-server))
+           (send-ping
+            (lambda ()
+              (<- ws 'ws-send client-id 'ping))))
+      (spawn-fiber
+       (lambda ()
+         (let lp ()
+           (and (perform-operation
+                 (choice-operation
+                  ;; sleep and send a ping
+                  (wrap-operation (sleep-operation 5)
+                                  (lambda ()
+                                    (send-ping)
+                                    #t)) ; loop again
+                  ;; If the client is dead, don't loop
+                  (wrap-operation (wait-operation client-dead?)
+                                  (const #f))))
+                (lp))))))
+
+    (with-actor-nonblocking-ports
+     (lambda ()
        (let loop ((fragments '())
                   (type #f))
          (let ((frame (read-frame-maybe)))
@@ -176,7 +217,9 @@ called for each complete message that is received."
                (cond ((string? data)
                       (make-text-frame data))
                      ((bytevector? data)
-                      (make-binary-frame data)))
+                      (make-binary-frame data))
+                     ('ping
+                      (make-ping-frame (string->utf8 "hi"))))
                client)
               ;; ok is like success, amirite
               'ok))