I wrote some core.async code in Clojure, and when I ran it, it consumed all available memory and failed with an error. It appears that using mapcat in a core.async pipeline breaks back pressure. (Which is unfortunate for reasons beyond the scope of this question.)
Here is some code that demonstrates the problem by counting :x values into and out of a mapcat transducer:
(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
The producer races far ahead of the consumer.
It appears that I'm not the first person to discover this. But the explanation given here doesn't quite seem to cover it. (Although it does provide an adequate workaround.) Conceptually, I would expect the producer to be ahead, but only by the length of the few messages that might be buffered in the channels.
My question is: where are all the other messages? By the fourth line of output, 7000 :x values are unaccounted for.
UPDATE 2020-01-14: The memory leak is now fixed.
There are two possible interpretations of the question "Where is the memory leak?"
Firstly, where is the data held? The answer seems to be in the channel buffer immediately downstream of the expanding transform.
Channels given a numeric buffer size, such as transform in the question, use a FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer), which can tell if it is full but does not object to being overfull.
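To illustrate that last point, here is a minimal sketch that pokes at a fixed buffer directly. It assumes a core.async version with transducer support, and it uses the implementation namespace clojure.core.async.impl.protocols, which is not public API and may change. The names buf, full-after-one? and size-after-three are only for illustration.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

;; A fixed buffer with a nominal capacity of 1, as used by (async/chan 1 ...).
(def buf (async/buffer 1))

;; The first add fills the buffer to its nominal capacity.
(impl/add! buf :a)
(def full-after-one? (impl/full? buf))

;; Assumption: further adds are not rejected, so the buffer
;; simply grows past its nominal size.
(impl/add! buf :b)
(impl/add! buf :c)
(def size-after-three (count buf))

(println "full after one add:" full-after-one?
         "size after three adds:" size-after-three)
```

If the assumption holds, the buffer reports itself as full after the first add yet still accepts the next two, which is where an expanding transducer can park an unbounded number of items.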
Secondly, which passage of code causes the buffer to be overfull? This (correct me if I am wrong) appears to be in the take! method of ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel), where the first call to add! on the buffer occurs before any calls to full? have taken place.
It seems that take! assumes that it can add at least one item to the buffer for every item it removes. In the case of long-running expanding transducers such as mapcat, this is not always a safe assumption.
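The expansion itself can be observed from outside the channel using only the public API. The following is a rough sketch, not a reproduction of the leak: it shows that a single accepted put on a channel with buffer size 1 can leave ten items waiting to be taken. The names c, accepted? and drained are illustrative.

```clojure
(require '[clojure.core.async :as async])

;; Same kind of channel as `transform` in the question:
;; buffer size 1 with an expanding transducer.
(def c (async/chan 1 (mapcat seq)))

;; One non-blocking put of a 10-element message. The buffer is empty,
;; so the put is accepted and the transducer expands it into the buffer.
(def accepted? (async/offer! c (repeat 10 :x)))

;; Drain with non-blocking takes until the channel has nothing left.
(def drained (doall (take-while some? (repeatedly #(async/poll! c)))))

(println "accepted:" accepted? "items drained:" (count drained))
```

Each put that is accepted this way adds a whole message to the buffer, so if puts are admitted once per item removed, the buffer grows by roughly the message length minus one on every take.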
By changing this line to (when (and (.hasNext iter) (not (impl/full? buf))) in a local copy of core.async, I can make the code in the question behave as expected. (N.B. My understanding of core.async is insufficient for me to guarantee that this is a robust solution for your use case.)
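For anyone who would rather not patch the library, one possible alternative is to do the expansion in a go block instead of in a channel transducer, so that every expanded item goes through its own parking put. This is only a sketch under that assumption; expand-chan is a hypothetical helper, not part of core.async, and it is not necessarily the workaround referred to in the question.

```clojure
(require '[clojure.core.async :as async])

(defn expand-chan
  "Hypothetical helper: reads collections from `in` and puts their items
  one at a time onto a new unbuffered channel, which it returns.
  Closes the returned channel when `in` closes.
  Items must be non-nil, since nil cannot be put on a channel."
  [in]
  (let [out (async/chan)]
    (async/go-loop []
      (if-some [coll (async/<! in)]
        (do
          ;; Each >! parks until a consumer takes the item, so the next
          ;; message is not read from `in` until this one is fully delivered.
          (doseq [item coll]
            (async/>! out item))
          (recur))
        (async/close! out)))
    out))

;; Usage: three messages in, five items out, in order.
(def result
  (let [in (async/chan 3)]
    (async/>!! in [1 2])
    (async/>!! in [3])
    (async/>!! in [4 5])
    (async/close! in)
    (async/<!! (async/into [] (expand-chan in)))))

(println result)
```

The trade-off is an extra go block per expansion step, in exchange for a producer that can only run ahead by the messages buffered in the input channel.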
UPDATE 2016-09-17: there is now an issue for this: http://dev.clojure.org/jira/browse/ASYNC-178
UPDATE 2020-01-14: this is now fixed as of: https://clojure.atlassian.net/browse/ASYNC-210 (although the earlier ticket was closed as 'Declined')