websocket: Queue fragmented frames to avoid interleaving.
[8sync.git] / 8sync / systems / websocket / client.scm
1 ;;; guile-websocket --- WebSocket client/server
2 ;;; Copyright © 2016 David Thompson <davet@gnu.org>
3 ;;; Copyright © 2017 Christopher Allan Webber <cwebber@dustycloud.org>
4 ;;; Copyright © 2019, 2020 Jan (janneke) Nieuwenhuizen <janneke@gnu.org>
5 ;;; Copyright © 2020 Rutger van Beusekom <rutger.van.beusekom@gmail.com>
6 ;;;
7 ;;; This file is part of guile-websocket.
8 ;;;
9 ;;; Guile-websocket is free software; you can redistribute it and/or modify
10 ;;; it under the terms of the GNU Lesser General Public License as
11 ;;; published by the Free Software Foundation; either version 3 of the
12 ;;; License, or (at your option) any later version.
13 ;;;
14 ;;; Guile-websocket is distributed in the hope that it will be useful,
15 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 ;;; Lesser General Public License for more details.
18 ;;;
19 ;;; You should have received a copy of the GNU Lesser General Public
20 ;;; License along with guile-websocket.  If not, see
21 ;;; <http://www.gnu.org/licenses/>.
22
23 ;;; Commentary:
24 ;;
25 ;; WebSocket client.
26 ;;
27 ;;; Code:
28
29 (define-module (8sync systems websocket client)
30   #:use-module (ice-9 match)
31   #:use-module (srfi srfi-26)
32   #:use-module (rnrs bytevectors)
33   #:use-module (rnrs io ports)
34   #:use-module (web request)
35   #:use-module (web response)
36   #:use-module (web uri)
37   #:use-module (oop goops)
38   #:use-module (8sync)
39   #:use-module (8sync ports)
40   #:use-module (8sync contrib base64)
41   #:use-module (8sync systems websocket frame)
42   #:use-module (8sync systems websocket utils)
43   #:export (<websocket>
44             .on-close
45             .on-error
46             .on-message
47             .on-open
48             .socket
49             .state
50             .url
51
52             websocket-closed?
53             websocket-closing?
54             websocket-connect
55             websocket-connecting?
56             websocket-open?
57
58             websocket-close
59             websocket-loop
60             websocket-send))
61
62 (define no-op (const #f))
63
64 (define-actor <websocket> (<actor>)
65   ((*init* websocket-init)
66    (close websocket-close)
67    (open websocket-open)
68    (send websocket-send))
69
70   (state #:accessor .state #:init-value 'closed #:init-keyword #:state)
71   (socket #:accessor .socket #:init-value #f #:init-keyword #:socket)
72   (url #:getter .url #:init-value #f #:init-keyword #:url)
73   (uri #:accessor .uri #:init-value #f #:init-keyword #:uri)
74   (entropy-port #:accessor .entropy-port #:init-form (open-entropy-port))
75   (frames #:accessor .frames #:init-value '())
76
77   (on-close #:init-keyword #:on-close
78                  #:init-value no-op
79                  #:accessor .on-close)
80   (on-error #:init-keyword #:on-error
81             #:init-value no-op
82             #:accessor .on-error)
83   (on-message #:init-keyword #:on-message
84               #:accessor .on-message)
85   (on-open #:init-keyword #:on-open
86                 #:init-value no-op
87                 #:accessor .on-open))
88
89 (define-method (websocket-close (websocket <websocket>) message)
90   (when (websocket-open? websocket)
91     (false-if-exception (close-port (.socket websocket)))
92     (set! (.state websocket) 'closed)
93     (false-if-exception ((.on-close websocket) websocket))
94     (set! (.socket websocket) #f)))
95
96 (define-method (websocket-open (websocket <websocket>) message uri-or-string)
97   (if (websocket-closed? websocket)
98       (let ((uri (match uri-or-string
99                    ((? uri? uri) uri)
100                    ((? string? str) (string->uri str)))))
101         (if (websocket-uri? uri)
102             (catch 'system-error
103               (lambda _
104                 (set! (.uri websocket) uri)
105                 (let ((sock (make-client-socket uri)))
106                   (set! (.socket websocket) sock)
107                   (handshake websocket)
108                   (websocket-loop websocket message)))
109               (lambda (key . args)
110                 ((.on-error websocket) websocket (format #f "open failed: ~s: ~s" uri-or-string args))))
111             ((.on-error websocket) websocket (format #f "not a websocket uri: ~s" uri-or-string))))
112       ((.on-error websocket) websocket (format #f "cannot open websocket in state: ~s" (.state websocket)))))
113
114 (define (subbytevector bv start end)
115   (if (= (bytevector-length bv) end) bv
116       (let* ((length (- end start))
117              (sub (make-bytevector length)))
118         (bytevector-copy! bv start sub 0 length)
119         sub)))
120
121 (define* (make-fragmented-frames data #:key (fragment-size (expt 2 15)))
122   (let ((length (if (string? data) (string-length data)
123                     (bytevector-length data))))
124     (let loop ((offset 0))
125       (let* ((size (min fragment-size (- length offset)))
126              (end (+ offset size))
127              (final? (= end length))
128              (continuation? (not (zero? offset)))
129              (frame (if (string? data) (make-text-frame (substring data offset end) #:final? final? #:continuation? continuation?)
130                         (make-binary-frame (subbytevector data offset end) #:final? final? #:continuation? continuation?))))
131         (if final? (list frame)
132             (cons frame (loop end)))))))
133
134 (define-method (websocket-direct-send (websocket <websocket>) message data)
135   (catch #t           ; expect: wrong-type-arg (open port), system-error
136     (lambda _
137       (let* ((frames (make-fragmented-frames data)))
138         (let loop ((frames frames) (written '(nothing)))
139           (when (pair? frames)
140             (write-frame (car frames) (.socket websocket))
141             (loop (cdr frames) (cons (car frames) written)))
142           ;;(format (current-error-port) "done\n")
143           )))
144     (lambda (key . args)
145       (let ((message (format #f "~a: ~s" key args)))
146         ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message))
147         (websocket-close websocket message)))))
148
149 (define-method (websocket-queue-or-send (websocket <websocket>) message data)
150   (catch #t         ; expect: wrong-type-arg (open port), system-error
151     (lambda _
152       (let* ((frames (make-fragmented-frames data))
153              (frames? (pair? (.frames websocket))))
154         (set! (.frames websocket) (append (.frames websocket) frames))
155         (unless frames?
156           (let loop ()
157             (let ((frames (.frames websocket)))
158               (when (pair? frames)
159                 (write-frame (car frames) (.socket websocket))
160                 (set! (.frames websocket) (cdr (.frames websocket)))
161                 (loop))
162               ;;(unless (pair? frames) (format (current-error-port) "done\n"))
163               )))))
164     (lambda (key . args)
165       (let ((message (format #f "~a: ~s" key args)))
166         ((.on-error websocket) websocket (format #f "send failed: ~s ~a\n" websocket message))
167         (websocket-close websocket message)))))
168
169 (define-method (websocket-send (websocket <websocket>) message data)
170   (websocket-queue-or-send websocket message data)
171   ;;(websocket-direct-send websocket message data)
172   )
173
174 (define-method (websocket-init (websocket <websocket>) message)
175   (and=> (.url websocket) (cut websocket-open websocket message <>)))
176
177 (define-method (websocket-socket-open? (websocket <websocket>))
178   "Return #t if .SOCKET of WEBSOCKET is open."
179   (not (port-closed? (.socket websocket))))
180
181 (define-method (websocket-loop (websocket <websocket>) message)
182
183   (define (handle-data-frame type data)
184     ((.on-message websocket)
185      websocket
186      (match type
187        ('text   (utf8->string data))
188        ('binary data))))
189
190   (define (read-frame-maybe)
191     (and (not (eof-object? (lookahead-u8 (.socket websocket))))
192          (read-frame (.socket websocket))))
193
194   (define (close-down)
195     (websocket-close websocket message))
196
197   ((.on-open websocket) websocket)
198
199   (let loop ((fragments '())
200              (type #f))
201     (catch #t
202       (lambda _
203         (let* ((socket (.socket websocket))
204                (frame (and (websocket-open? websocket)
205                            (read-frame-maybe))))
206           (cond
207            ;; EOF - port is closed.
208            ;; @@: Sometimes the eof object appears here as opposed to
209            ;;   at lookahead, but I'm not sure why
210            ((or (not frame) (eof-object? frame))
211             (close-down))
212            ;; Per section 5.4, control frames may appear interspersed
213            ;; along with a fragmented message.
214            ((close-frame? frame)
215             ;; Per section 5.5.1, echo the close frame back to the
216             ;; socket before closing the socket.  The socket may no
217             ;; longer be listening.
218             (false-if-exception
219              (write-frame (make-close-frame (frame-data frame)) socket))
220             (close-down))
221            ((ping-frame? frame)
222             ;; Per section 5.5.3, a pong frame must include the exact
223             ;; same data as the ping frame.
224             (write-frame (make-pong-frame (frame-data frame)) socket)
225             (loop fragments type))
226            ((pong-frame? frame)         ; silently ignore pongs
227             (loop fragments type))
228            ((first-fragment-frame? frame) ; begin accumulating fragments
229             (loop (list frame) (frame-type frame)))
230            ((final-fragment-frame? frame) ; concatenate all fragments
231             (handle-data-frame type (frame-concatenate
232                                      (reverse (cons frame fragments))))
233             (loop '() #f))
234            ((fragment-frame? frame)     ; add a fragment
235             (loop (cons frame fragments) type))
236            ((data-frame? frame)         ; unfragmented data frame
237             (handle-data-frame (frame-type frame) (frame-data frame))
238             (loop '() #f)))))
239       (lambda (key . args)
240         (let ((message (format #f "~a: ~s" key args)))
241           ((.on-error websocket) websocket (format #f "read failed: ~s\n" websocket))
242           (if (websocket-socket-open? websocket) (loop '() #f)
243               (websocket-close websocket message)))))))
244
245 ;; See Section 3 - WebSocket URIs
246 (define (encrypted-websocket-scheme? uri)
247   "Return #t if the scheme for URI is 'wss', the secure WebSocket
248 scheme."
249   (eq? (uri-scheme uri) 'wss))
250
251 (define (unencrypted-websocket-scheme? uri)
252   "Return #t if the scheme for URI is 'ws', the insecure WebSocket
253 scheme."
254   (eq? (uri-scheme uri) 'ws))
255
256 (define (websocket-uri? uri)
257   "Return #t if URI is a valid WebSocket URI."
258   (and (or (encrypted-websocket-scheme? uri)
259            (unencrypted-websocket-scheme? uri))
260        (not (uri-fragment uri))))
261
262 (define (set-nonblocking! port)
263   (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
264   (setvbuf port 'block 1024))
265
266 (define (make-client-socket uri)
267   "Connect a socket to the remote resource described by URI."
268   (let* ((port (uri-port uri))
269          (info (car (getaddrinfo (uri-host uri)
270                                  (if port
271                                      (number->string port)
272                                      (symbol->string (uri-scheme uri)))
273                                  (if port
274                                      AI_NUMERICSERV
275                                      0))))
276          (sock (with-fluids ((%default-port-encoding #f))
277                  (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP))))
278
279     (set-nonblocking! sock)
280     ;; Disable buffering for websockets
281     (setvbuf sock 'none)
282
283     ;; TODO: Configure I/O buffering?
284     (connect sock (addrinfo:addr info))
285     sock))
286
287 (define-method (write (o <websocket>) port)
288    (format port "#<websocket ~a ~a>"
289            (.url o)
290            (.state o)))
291
292 (define-method (websocket-connecting? (websocket <websocket>))
293   "Return #t if WEBSOCKET is in the connecting state."
294   (eq? (.state websocket) 'connecting))
295
296 (define-method (websocket-open? (websocket <websocket>))
297   "Return #t if WEBSOCKET is in the open state."
298   (eq? (.state websocket) 'open))
299
300 (define-method (websocket-closing? (websocket <websocket>))
301   "Return #t if WEBSOCKET is in the closing state."
302   (eq? (.state websocket) 'closing))
303
304 (define-method (websocket-closed? (websocket <websocket>))
305   "Return #t if WEBSOCKET is in the closed state."
306   (eq? (.state websocket) 'closed))
307
308 (define-method (generate-client-key (websocket <websocket>))
309   "Return a random, base64 encoded nonce using the entropy source of
310 WEBSOCKET."
311   (base64-encode
312    (get-bytevector-n (.entropy-port websocket) 16)))
313
314 ;; See Section 4.1 - Client Requirements
315 (define (make-handshake-request uri key)
316   "Create an HTTP request for initiating a WebSocket connection with
317 the remote resource described by URI, using a randomly generated nonce
318 KEY."
319   (let ((headers `((host . (,(uri-host uri) . #f))
320                    (upgrade . ("WebSocket"))
321                    (connection . (upgrade))
322                    (sec-websocket-key . ,key)
323                    (sec-websocket-version . "13"))))
324     (build-request uri #:method 'GET #:headers headers)))
325
326 (define-method (handshake (websocket <websocket>))
327   "Perform the WebSocket handshake for the client WEBSOCKET."
328   (let ((key (generate-client-key websocket)))
329     (write-request (make-handshake-request (.uri websocket) key)
330                    (.socket websocket))
331     (let* ((response (read-response (.socket websocket)))
332            (headers (response-headers response))
333            (upgrade (assoc-ref headers 'upgrade))
334            (connection (assoc-ref headers 'connection))
335            (accept (assoc-ref headers 'sec-websocket-accept)))
336       ;; Validate the handshake.
337       (if (and (= (response-code response) 101)
338                (string-ci=? (car upgrade) "websocket")
339                (equal? connection '(upgrade))
340                (string=? (string-trim-both accept) (make-accept-key key)))
341           (set! (.state websocket) 'open)
342           (begin
343             (websocket-close websocket)
344             ((.on-error websocket) websocket
345              (format #f "websocket handshake failed: ~s"
346                      (uri->string (.uri websocket)))))))))
347
348 (define (open-entropy-port)
349   "Return an open input port to a reliable source of entropy for the
350 current system."
351   (if (file-exists? "/dev/urandom")
352       (open-input-file "/dev/urandom")
353       ;; XXX: This works as a fall back but this isn't exactly a
354       ;; reliable source of entropy.
355       (make-soft-port (vector (const #f) (const #f) (const #f)
356                               (lambda _ (let ((r (random 256))) (integer->char r)))
357                               (const #f)
358                               (const #t)) "r")))
359
360 (define-method (websocket-close (websocket <websocket>))
361   "Close the WebSocket connection for the client WEBSOCKET."
362   (let ((socket (.socket websocket)))
363     (set! (.state websocket) 'closing)
364     (write-frame (make-close-frame (make-bytevector 0)) socket)
365     ;; Per section 5.5.1 , wait for the server to close the connection
366     ;; for a reasonable amount of time.
367     (let loop ()
368       (match (select #() (vector socket) #() 1) ; 1 second timeout
369         ((#() #(socket) #()) ; there is output to read
370          (unless (port-eof? socket)
371            (read-frame socket) ; throw it away
372            (loop)))))
373     (close-port socket)
374     (close-port (.entropy-port websocket))
375     (set! (.state websocket) 'closed)))