Tags: clojure, transducer

Aggregating transducers with intermediate values


I am still trying to get a better understanding of how to work with transducers in Clojure. Here, I am interested in applying aggregating transducers, such as the ones in https://github.com/cgrand/xforms, while reporting the intermediate values of the computation at each step.

For instance, the following expression

(sequence (x/into #{}) [1 2 3])

yields (#{1 2 3}), which is only the final value of the reduction. Now, I would be interested in a transducer xf-incremental that, given something like

(sequence (comp xf-incremental (x/into #{})) [1 2 3])

yields (#{1} #{1 2} #{1 2 3}).

The reason why I am interested in this is that I want to report intermediate values of a metric that aggregates over the history of processed values.

Any idea how I can do something like this in a generic way?

EDIT: Think of (x/into #{}) as an arbitrary transducer that aggregates results. Better examples could be x/avg or (x/reduce +) where I would expect

(sequence (comp xf-incremental x/avg) [1 2 3])
(sequence (comp xf-incremental (x/reduce +)) [1 2 3])

to return (1 3/2 2) and (1 3 6) respectively.

EDIT 2: Another way of phrasing this is that I want a transducer that applies a reducing function and returns the accumulator at each step, and that can also reuse all the available transducers so that I do not need to rewrite basic functionality.


Solution

  • Solution using clojure.core/reductions

    You don't need a transducer to perform the computation you are asking for. The function that returns all the intermediate results of reduce is called reductions; pass it conj and an empty set as arguments:

    (rest (reductions conj #{} [1 2 3]))
    ;; => (#{1} #{1 2} #{1 3 2})
    

    reductions returns the initial value (here the empty set) as its first element; rest drops it so that the output matches what you requested in the original question.

    The function that builds up the result here is conj; let's refer to it as a step function. A transducer is a function that takes a step function as input and returns a new step function as output. So if we want to combine reductions with a transducer, we can just apply the transducer to conj:

    (def my-transducer (comp (filter odd?)
                             (take 4)))
    
    (dedupe (reductions (my-transducer conj) #{} (range)))
    ;; => (#{} #{1} #{1 3} #{1 3 5} #{7 1 3 5})
    

    dedupe is there just to remove consecutive duplicates. You can leave it out if you don't want that. In that case you get the following, because the filtering step returns the accumulator unchanged for every element it rejects:

    (reductions (my-transducer conj) #{} (range))
    ;; => (#{} #{} #{1} #{1} #{1 3} #{1 3} #{1 3 5} #{1 3 5} #{7 1 3 5})
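
    To make the "step function in, step function out" idea concrete, here is a minimal sketch using only clojure.core. The names odd-only and step are made up for illustration; they show what the filtering transducer does to conj and why rejected elements leave repeated accumulators in the reductions output.

    ```clojure
    ;; A transducer maps one step function to another step function.
    (def odd-only (filter odd?))   ; the transducer (hypothetical name)
    (def step (odd-only conj))     ; new step function built from conj (hypothetical name)

    (step #{} 1)
    ;; => #{1}   an odd input is passed on to conj

    (step #{} 2)
    ;; => #{}    an even input is skipped; the accumulator comes back unchanged

    (reduce step #{} [1 2 3 4 5])
    ;; => #{1 3 5}
    ```

    Because step has the same two-argument shape as conj, it can be handed to reduce or reductions directly.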
    

  • Transducer-based solution using net.cgrand.xforms/reductions

    Apparently, there is also a transducer version of reductions in the xforms library, which is closer to your initial code:

    (require '[net.cgrand.xforms :as xforms])
    
    (rest (sequence (xforms/reductions conj #{}) [1 2 3]))
    ;; => (#{1} #{1 2} #{1 3 2})
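
    If you would rather not add a dependency, a stateful transducer in the same spirit can be sketched with clojure.core alone. This is only an illustration, not how xforms implements it: running-reductions is a hypothetical name, and unlike xforms/reductions above, this sketch does not emit the initial value, so no rest is needed.

    ```clojure
    (defn running-reductions
      "Hypothetical helper (not part of clojure.core or xforms): returns a
      transducer that emits the accumulator of f after every input,
      starting from init. The initial value itself is not emitted."
      [f init]
      (fn [rf]
        (let [acc (volatile! init)]          ; state private to one reduction
          (fn
            ([] (rf))
            ([result] (rf result))
            ([result x]
             (let [a (vswap! acc f x)]       ; fold x into the running accumulator
               (if (reduced? a)
                 ;; f asked to stop: emit the final value and terminate
                 (ensure-reduced (rf result @a))
                 (rf result a))))))))

    (sequence (running-reductions conj #{}) [1 2 3])
    ;; => (#{1} #{1 2} #{1 3 2})

    (sequence (running-reductions + 0) [1 2 3])
    ;; => (1 3 6)
    ```

    Since the state lives in a volatile created per reduction, the same transducer value can be reused across independent calls to sequence or into.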
    

    This xforms/reductions transducer can be composed with other transducers using comp, for example to filter odd numbers and take the first four of them:

    (sequence (comp (filter odd?)
                    (take 4)
                    (xforms/reductions conj #{}))
              (range))
    ;; => (#{} #{1} #{1 3} #{1 3 5} #{7 1 3 5})
    

    In this case, you don't need dedupe, because the filter runs before xforms/reductions and rejected elements never reach it. It is also possible to use other step functions with xforms/reductions, e.g. +:

    (sequence (comp (filter odd?)
                    (take 10)
                    (xforms/reductions + 0)
                    (filter #(< 7 %)))
              (range))
    ;; => (9 16 25 36 49 64 81 100)
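
    For the running average from the question's edit, one option is to carry both the sum and the count in the accumulator and divide afterwards. The sketch below uses clojure.core/reductions so that it runs without extra dependencies; avg-step and running-avgs are made-up names for illustration.

    ```clojure
    ;; The accumulator is a [sum count] pair (avg-step is a hypothetical helper).
    (defn avg-step [[sum n] x]
      [(+ sum x) (inc n)])

    (def running-avgs
      (->> (reductions avg-step [0 0] [1 2 3])
           rest                               ; drop the initial [0 0]
           (map (fn [[sum n]] (/ sum n)))))   ; turn each pair into an average

    running-avgs
    ;; => (1 3/2 2)
    ```

    The same step function and initial pair should also work with xforms/reductions, as long as the initial [0 0] is dropped before dividing (for example with (drop 1)), since dividing 0 by 0 throws.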