I am trying to build RCP server based on aleph
. It passed on all the test, but when the byte array for send or receive become large, the bytes seems been corrupted.
For example. I tried to send a byte array which length is 2936, but I only got 1024 bytes at server
I followed the example and did my own modifications for nippy
. The encoding and decoding will be done by handlers themselves.
(defn wrap-duplex-stream
[s]
(let [out (s/stream)]
(s/connect out s)
(s/splice out s)))
(defn client
[host port]
(d/chain (tcp/client {:host host, :port port})
#(wrap-duplex-stream %)))
(defn start-server
[handler port]
(tcp/start-server
(fn [s info]
(handler (wrap-duplex-stream s) info))
{:port port}))
I finally made my own codec for byte array. It is simple, but to figure how to make use it with gloss and aleph is time consuming
(defn buffer->byte-array [buf-seq]
(byte-streams/to-byte-array buf-seq))
(defn bytes-codec []
(reify
Reader
(read-bytes [this buf-seq]
(let [buf-seq (dup-bytes buf-seq)
byte-arr (buffer->byte-array buf-seq)]
[true byte-arr nil]))
Writer
(sizeof [x]
nil)
(write-bytes [x y byte-arr]
[(ByteBuffer/wrap byte-arr)])))
(def protocol
(gloss/compile-frame
(gloss/finite-frame
:uint32
(bytes-codec))
#(nippy/freeze %)
#(nippy/thaw %)))
(defn wrap-duplex-stream
[s]
(let [out (s/stream)]
(s/connect
(s/map #(io/encode protocol %) out)
s)
(s/splice
out
(io/decode-stream s protocol))))
(defn client
[host port]
(d/chain (tcp/client {:host host, :port port})
#(wrap-duplex-stream %)))
(defn start-server
[handler port]
(tcp/start-server
(fn [s info]
(handler (wrap-duplex-stream s) info))
{:port port}))