I'm using core.async on ClojureScript to avoid using node.js callbacks. The problem is that I'm hitting the 1024 pending messages limit.
To avoid it, I would need to send all messages to channels inside the same go
block. But this is not really possible on core.async because an anonymous function nullifies the effect of go
, so I can't do this:
(go
(. socket on "data" #(>! chan %)))
So, is there a way to get around this limitation?
To simulate the error, we can "simulate" a kind of callback:
(let [callback (atom nil)
on-data (fn [fun]
(reset! callback fun))
chan (async/chan)]
If we try to add a callback with (on-data #(async/go (async/put! chan %)))
, it'll blow the limit. Also, as we're using async/go
, it'll cause the messages inside the channel to be out of order.
The only way I found out to fix this is to create an infinite list of promise-chan
inside an atom
and every callback will pick up the first element, publish a message, and remove the first for the list. Then, we can have a doseq
inside a go
block that'll publish messages for us:
(let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
; We need to iterate over inf-list before anything
(let [lst @inf-list]
(async/go
(doseq [c lst]
(async/>! chan (async/<! c)))))
(on-data #(do
(async/put! (first @inf-list) %)
(swap! inf-list rest))))