node.jscallbackclojurescriptcore.async

How to use core.async in place of callbacks?


I'm using core.async on ClojureScript to avoid using node.js callbacks. The problem is that I'm hitting the 1024 pending messages limit.

To avoid it, I would need to send all messages to channels inside the same go block. But this is not really possible on core.async because an anonymous function nullifies the effect of go, so I can't do this:

(go
  (. socket on "data" #(>! chan %)))

So, is there a way to get around this limitation?


Solution

  • To simulate the error, we can "simulate" a kind of callback:

    (let [callback (atom nil)
          on-data (fn [fun]
                    (reset! callback fun))
          chan (async/chan)]
    

    If we try to add a callback with (on-data #(async/go (async/put! chan %))), it'll blow the limit. Also, as we're using async/go, it'll cause the messages inside the channel to be out of order.

    The only way I found out to fix this is to create an infinite list of promise-chan inside an atom and every callback will pick up the first element, publish a message, and remove the first for the list. Then, we can have a doseq inside a go block that'll publish messages for us:

    (let [inf-list (atom (map (fn [_] (async/promise-chan)) (range)))]
      ; We need to iterate over inf-list before anything
      (let [lst @inf-list]
        (async/go
         (doseq [c lst]
           (async/>! chan (async/<! c)))))
    
      (on-data #(do
                  (async/put! (first @inf-list) %)
                  (swap! inf-list rest))))