X-Git-Url: https://jxself.org/git/?a=blobdiff_plain;f=8sync%2Fsystems%2Fwebsocket%2Fserver.scm;h=d9105e8454bd9e92d58c331daa294bd9e6cf0fa4;hb=7410337fd528f81a29e9c1c5645cbb9fa47bfd16;hp=7a7526d054f8a4fadb645d6448eedc84d887ccdb;hpb=0f7daa4787860cbbc739a51140fb8257b7fc4fef;p=8sync.git diff --git a/8sync/systems/websocket/server.scm b/8sync/systems/websocket/server.scm index 7a7526d..d9105e8 100644 --- a/8sync/systems/websocket/server.scm +++ b/8sync/systems/websocket/server.scm @@ -37,6 +37,10 @@ #:use-module (8sync systems web) #:use-module (8sync systems websocket frame) #:use-module (8sync systems websocket utils) + #:use-module (fibers) + #:use-module (fibers conditions) + #:use-module (fibers operations) + #:use-module (fibers timers) #:export ( .websocket-handler)) @@ -111,9 +115,12 @@ called for each complete message that is received." ;; @@: We probably could just increment a counter... (define client-id (web-server-gen-client-id websocket-server)) + (define client-dead? (make-condition)) + (define (close-down) (close-port client) (hash-remove! (.ws-clients websocket-server) client-id) + (signal-condition! client-dead?) ((.on-ws-client-disconnect websocket-server) websocket-server client-id)) @@ -125,12 +132,46 @@ called for each complete message that is received." ((.on-ws-client-connect websocket-server) websocket-server client-id) - (with-actor-nonblocking-ports - (lambda () - ;; Perform the HTTP handshake and upgrade to WebSocket protocol. - (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key)) - (response (make-handshake-response client-key))) - (write-response response client) + ;; Perform the HTTP handshake and upgrade to WebSocket protocol. + (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key)) + (response (make-handshake-response client-key))) + (write-response response client) + ;; TODO: we shoud split clients into their own objects or fibers + ;; structure... what happens if we try to write at the same time + ;; that we're writing another frame? + ;; Probably it's good to have different fibers or methods for: + ;; - reader method/fiber + ;; - checks for client-dead? condition + ;; - may kick off some logic, which may send a message to writer + ;; - "nonblocking" reads + ;; - writer method/thread + ;; - checks for client-dead? condition + ;; - accepts one message at a time to write + ;; - "blocking" writes + ;; - ping timer method + ;; - checks for client-dead? condition + ;; - occasionally sends a message to the writer to do a ping + (let* ((ws (actor-id websocket-server)) + (send-ping + (lambda () + (<- ws 'ws-send client-id 'ping)))) + (spawn-fiber + (lambda () + (let lp () + (and (perform-operation + (choice-operation + ;; sleep and send a ping + (wrap-operation (sleep-operation 5) + (lambda () + (send-ping) + #t)) ; loop again + ;; If the client is dead, don't loop + (wrap-operation (wait-operation client-dead?) + (const #f)))) + (lp)))))) + + (with-actor-nonblocking-ports + (lambda () (let loop ((fragments '()) (type #f)) (let ((frame (read-frame-maybe))) @@ -176,7 +217,9 @@ called for each complete message that is received." (cond ((string? data) (make-text-frame data)) ((bytevector? data) - (make-binary-frame data))) + (make-binary-frame data)) + ('ping + (make-ping-frame (string->utf8 "hi")))) client) ;; ok is like success, amirite 'ok))