
SSE Server in Common Lisp


I'm trying to write a simple async server in Common Lisp. Emphasis on simple. Here's Take 2 (thanks to Rainer for advice and formatting):

(ql:quickload (list :cl-ppcre :usocket))
(defpackage :test-server (:use :cl :cl-ppcre :usocket))
(in-package :test-server)

(defvar *socket-handle* nil)
(defparameter *channel* nil)

(defclass buffer ()
  ((contents :accessor contents :initform nil)
   (started :reader started :initform (get-universal-time))
   (state :accessor state :initform :empty)))

(defun listen-on (port &optional (stream *standard-output*))
  (setf *socket-handle* (socket-listen "127.0.0.1" port :reuse-address t))
  (let ((conns (list *socket-handle*))
        (buffers (make-hash-table)))
    (loop (loop for ready in (wait-for-input conns :ready-only t)
                do (if (typep ready 'stream-server-usocket)
                       (push (socket-accept ready) conns)
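                     ;; Store the buffer so partial reads accumulate across iterations.
                     (let ((buf (or (gethash ready buffers)
                                    (setf (gethash ready buffers) (make-instance 'buffer)))))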
                       (buffered-read! (socket-stream ready) buf)
                       (when (starts-with? (list #\newline #\return #\newline #\return)
                                           (contents buf))
                         (format stream "COMPLETE ~s~%"
                                 (coerce (reverse (contents buf)) 'string))
                         (setf conns (remove ready conns))
                         (remhash ready buffers)
                         (let ((parsed (parse buf)))
                           (format stream "PARSED: ~s~%" parsed)
                           (handle-request ready (parse buf))))))))))

(defmethod parse ((buf buffer))
  (let ((lines (split "\\r?\\n" (coerce (reverse (contents buf)) 'string))))
    (second (split " " (first lines)))))

HTTP writing:

(defmethod http-write (stream (line-end (eql :crlf)))
  (declare (ignore line-end))
  (write-char #\return stream)
  (write-char #\linefeed stream)
  (values))

(defmethod http-write (stream (line string))
  (write-string line stream)
  (http-write stream :crlf)
  (values))

(defmethod http-write (stream (lst list))
  (mapc (lambda (thing) (http-write stream thing)) lst)
  (values))

How to handle a request:

(defmethod handle-request (socket request)
  (let ((s (socket-stream socket)))
    (cond ((string= "/sub" request)
           (subscribe! socket))
          ((string= "/pub" request)
           (publish! "Got a message!")
           (http-write s (list "HTTP/1.1 200 OK"
                               "Content-Type: text/plain; charset=UTF-8"
                               "Cache-Control: no-cache, no-store, must-revalidate"
                               "Content-Length: 10" :crlf
                               "Published!" :crlf))
           (socket-close socket))
          (t (http-write s (list "HTTP/1.1 200 OK" 
                                 "Content-Type: text/plain; charset=UTF-8"
                                 "Content-Length: 2" :crlf 
                                 "Ok" :crlf))
             (socket-close socket)))))

Publish!

(defun publish! (msg)
  (loop for sock in *channel*
     do (handler-case
            (let ((s (socket-stream sock)))
              (format s "data: ~a" msg)
              (http-write s (list :crlf :crlf))
              (force-output s))
          (error (e)
             (declare (ignore e))
             (setf *channel* (remove sock *channel*))))))

Subscribe!

(defun subscribe! (sock)
  (let ((s (socket-stream sock)))
    (http-write s (list "HTTP/1.1 200 OK" 
                        "Content-Type: text/event-stream; charset=utf-8"
                        "Transfer-Encoding: chunked"
                        "Connection: keep-alive"
                        "Expires: Thu, 01 Jan 1970 00:00:01 GMT"
                        "Cache-Control: no-cache, no-store, must-revalidate" :crlf))
    (force-output s)
    (push sock *channel*)))

Basic utility:

(defmethod starts-with? ((prefix list) (list list) &optional (test #'eql))
  (loop for (p . rest-p) on prefix for (l . rest-l) on list
     when (or (and rest-p (not rest-l)) (not (funcall test p l))) 
     do (return nil)
     finally (return t)))

(defun stop ()
  (when *socket-handle*
    (loop while (socket-close *socket-handle*))
    (setf *socket-handle* nil
      *channel* nil)))

(defmethod buffered-read! (stream (buffer buffer))
  (loop for char = (read-char-no-hang stream nil :eof)
     until (or (null char) (eql :eof char))
     do (push char (contents buffer))))

The summary is:

  1. It listens on a specified port and dumps request data to the specified stream
  2. If it gets a request to "/sub", it's supposed to keep that socket around for further writes.
  3. If it gets a request to "/pub", it's supposed to send a short message out to all existing subscribers
  4. It sends back a plain-text "Ok" on any other request.

All feedback is welcome, as usual. As of version 2 (which added HTTP-friendly line endings and a couple of strategically placed force-output calls), browsers seem happier with me, but Chrome still chokes when a message is actually sent to an existing channel. Any idea what the remaining bugs in publish! are?

To be clear, doing

var src = new EventSource("/sub");
src.onerror = function (e) { console.log("ERROR", e); };
src.onopen = function (e) { console.log("OPEN", e); };
src.onmessage = function (e) { console.log("MESSAGE", e) };

now gets me a working event stream in Firefox (it triggers onopen, then one onmessage per sent update), but it fails in Chrome (it triggers onopen, but each update triggers onerror instead of onmessage).

Any help is appreciated.


Solution

  • Ok, so after trying a bunch of things, I have it working, but I have no idea why. That's going to be my next question.

    What didn't work:

    The only thing left at this point was busting out the packet sniffer. Using sniffit, I discovered that there was in fact a difference between what the nginx PushStream module was emitting, and what was being emitted by my implementation.

    Mine (yes, I pretended to be nginx/1.2.0 just to absolutely minimize the differences between responses):

    HTTP/1.1 200 OK
    Server: nginx/1.2.0
    Date: Sun, 15 Oct 2013 10:29:38 GMT-5
    Content-Type: text/event-stream; charset=utf-8
    Transfer-Encoding: chunked
    Connection: keep-alive
    Expires: Thu, 01 Jan 1970 00:00:01 GMT
    Cache-Control: no-cache, no-store, must-revalidate
    
    data: message goes here
    

    The nginx Push Stream module:

    HTTP/1.1 200 OK
    Server: nginx/1.2.0
    Date: Sun, 15 Sep 2013 14:40:12 GMT
    Content-Type: text/event-stream; charset=utf-8
    Connection: close
    Expires: Thu, 01 Jan 1970 00:00:01 GMT
    Cache-Control: no-cache, no-store, must-revalidate
    Transfer-Encoding: chunked
    
    6d
    data: message goes here
    

    Adding that "6d" line to my implementation made it work properly. I have no idea why, unless this is some convention for BOMs in UTF-8 that I'm unfamiliar with. In other words, rewriting subscribe! as

    (defun subscribe! (sock)
      (let ((s (socket-stream sock)))
        (http-write s (list "HTTP/1.1 200 OK" 
                            "Content-Type: text/event-stream; charset=utf-8"
                            "Transfer-Encoding: chunked"
                            "Connection: keep-alive"
                            "Expires: Thu, 01 Jan 1970 00:00:01 GMT"
                            "Cache-Control: no-cache, no-store, must-revalidate" :crlf
                            "6d"))
        (force-output s)
        (push sock *channel*)))
    

    does the trick. Chrom(?:e|ium) now properly accept these event streams, and don't error on message sends.

    Now I need to understand exactly what the hell happened there...
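
One likely explanation, offered as an assumption rather than a confirmed diagnosis: the "6d" line looks like a chunk-size line from HTTP/1.1 chunked transfer encoding. When a response declares "Transfer-Encoding: chunked", the body is expected to arrive as a series of chunks, each made of a hexadecimal byte count, a CRLF, that many bytes of data, and a trailing CRLF; a zero-length chunk ends the body. Hex 6d is 109, so the nginx module appears to be announcing a 109-byte first chunk, while the original subscribe! declared chunked encoding and then sent unframed data. The sketch below shows what framing each event as its own chunk could look like. write-chunk and sse-chunk-string are hypothetical helpers (not part of the code above), and the sketch assumes ASCII-only payloads so that character count equals byte count.

```lisp
;; Sketch only: frames one payload as a single HTTP/1.1 chunk.
;; WRITE-CHUNK is a hypothetical helper, not part of the original server.
;; Assumes PAYLOAD is ASCII-only, so its length in characters equals its
;; length in octets (the chunk size must be counted in octets).
(defun write-chunk (stream payload)
  (format stream "~(~x~)" (length payload)) ; chunk size in lowercase hex
  (write-char #\return stream)
  (write-char #\linefeed stream)
  (write-string payload stream)             ; chunk data
  (write-char #\return stream)
  (write-char #\linefeed stream)
  (values))

;; Hypothetical helper: builds the chunk-framed form of the event that
;; publish! writes ("data: <msg>" followed by two CRLFs), as a string.
(defun sse-chunk-string (msg)
  (with-output-to-string (out)
    (write-chunk out (format nil "data: ~a~c~c~c~c" msg
                             #\return #\linefeed #\return #\linefeed))))
```

Under these assumptions, publish! could call (write-chunk s ...) on each subscriber's stream instead of writing the event directly. Another option worth testing would be to drop the "Transfer-Encoding: chunked" header from subscribe! altogether, so that the client does not expect chunk framing in the first place.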