tcpclojurealeph

Transporting long byte arrays over TCP with Aleph


I am trying to build an RPC server based on Aleph. It passes all the tests, but when the byte array being sent or received becomes large, the bytes seem to be corrupted.

For example, I tried to send a byte array of length 2936, but I only received 1024 bytes at the server.

I followed the example and made my own modifications for nippy. The encoding and decoding are done by the handlers themselves.

(defn wrap-duplex-stream
  "Returns a duplex stream layered over `s`: values put on the returned
  stream are forwarded to `s` unchanged, and values taken from it come
  straight from `s`. No encoding, decoding or framing is applied here."
  [s]
  ;; `sink` is the writable half; everything put on it is piped into `s`.
  (let [sink (doto (s/stream)
               (s/connect s))]
    ;; Writes go to `sink`, reads come directly from `s`.
    (s/splice sink s)))

(defn client
  "Opens a TCP connection to `host`:`port` with `tcp/client` and chains the
  resulting connection through `wrap-duplex-stream`. Returns the value
  produced by `d/chain`."
  [host port]
  (let [conn (tcp/client {:host host :port port})]
    (d/chain conn
             (fn [raw-stream]
               (wrap-duplex-stream raw-stream)))))

(defn start-server
  "Starts a TCP server on `port` via `tcp/start-server`. For every incoming
  connection, `handler` is called with the connection wrapped by
  `wrap-duplex-stream` and the connection info map."
  [handler port]
  (let [on-connection (fn [conn info]
                        (handler (wrap-duplex-stream conn) info))]
    (tcp/start-server on-connection {:port port})))

Solution

  • I finally made my own codec for byte arrays. It is simple, but figuring out how to use it with gloss and Aleph was time-consuming.

    (defn buffer->byte-array
      "Converts `buf-seq` into a single byte array using
      `byte-streams/to-byte-array`."
      [buf-seq]
      (-> buf-seq
          byte-streams/to-byte-array))
    
    (defn bytes-codec
      "Builds a codec object implementing `Reader` and `Writer` that passes raw
      byte arrays through untouched.

      Reading: copies the whole of `buf-seq` into one byte array.
      Writing: wraps the given byte array in a single `ByteBuffer`.
      `sizeof` returns nil, i.e. the codec declares no fixed size."
      []
      (reify
        Reader
        (read-bytes [_this buf-seq]
          ;; Work on a duplicate (`dup-bytes`) of the incoming buffers and
          ;; return [true <byte array> nil].
          ;; NOTE(review): per gloss's Reader contract this triple is
          ;; [success? value remaining-bytes] - confirm against gloss docs.
          [true
           (-> buf-seq dup-bytes buffer->byte-array)
           nil])
        Writer
        (sizeof [_this]
          nil)
        (write-bytes [_this _buf byte-arr]
          ;; The second argument is ignored; the result is a one-element
          ;; vector holding a ByteBuffer view over `byte-arr`.
          [(ByteBuffer/wrap byte-arr)])))
    
    (def protocol
      "Compiled gloss frame used on the wire: a `:uint32`-prefixed finite frame
      whose payload is handled by `(bytes-codec)`. Values go through
      `nippy/freeze` before encoding and through `nippy/thaw` after decoding."
      (let [framed-bytes (gloss/finite-frame :uint32 (bytes-codec))
            pre-encode   (fn [value] (nippy/freeze value))
            post-decode  (fn [frozen] (nippy/thaw frozen))]
        (gloss/compile-frame framed-bytes pre-encode post-decode)))
    
    
    (defn wrap-duplex-stream
      "Returns a duplex stream layered over `s` that speaks `protocol`:
      values put on the returned stream are encoded with `io/encode` and
      forwarded to `s`; values taken from it come from
      `(io/decode-stream s protocol)`."
      [s]
      (let [out (s/stream)]
        ;; Outgoing side: encode every message put on `out`, then pipe into `s`.
        (-> (s/map (fn [msg] (io/encode protocol msg)) out)
            (s/connect s))
        ;; Incoming side: reads on the spliced stream come from the decoded `s`.
        (->> (io/decode-stream s protocol)
             (s/splice out))))
    
    (defn client
      "Opens a TCP connection to `host`:`port` with `tcp/client` and chains the
      resulting connection through `wrap-duplex-stream`. Returns the value
      produced by `d/chain`."
      [host port]
      (let [conn (tcp/client {:host host :port port})]
        (d/chain conn
                 (fn [raw-stream]
                   (wrap-duplex-stream raw-stream)))))
    
    (defn start-server
      "Starts a TCP server on `port` via `tcp/start-server`. For every incoming
      connection, `handler` is called with the connection wrapped by
      `wrap-duplex-stream` and the connection info map."
      [handler port]
      (let [on-connection (fn [conn info]
                            (handler (wrap-duplex-stream conn) info))]
        (tcp/start-server on-connection {:port port})))