Tags: groovy, gpars

Collecting a GPars loop to a Map


I need to iterate over a List, run a time-consuming operation for every item, and then collect the results into a map, something like this:

List<String> strings = ['foo', 'bar', 'baz']
Map<String, Object> result = strings.collectEntries { key ->
    [key, expensiveOperation(key)]
}

so that my result is something like:

[foo: <an object>, bar: <another object>, baz: <another object>]

Since the operations I need to run are pretty long and don't depend on each other, I'd like to use GPars to run the loop in parallel.

However, GPars has a collectParallel method that iterates over a collection in parallel and collects the results into a List, but no collectEntriesParallel that collects them into a Map. What's the correct way to do this with GPars?


Solution

  • There is no collectEntriesParallel because it would have to produce the same result as:

    collectParallel {}.collectEntries {}
    

    as Tim mentioned in the comment. It is hard to reduce a list of values into a map (or any other mutable container) in parallel and still get a deterministic result; the reliable approach is to collect the results into a list in parallel and then build the map entries sequentially at the end. Consider the following sequential example:

    import groovyx.gpars.GParsPool

    // Simulates a slow, independent computation: sleeps 1 s, then returns the reversed key
    static def expensiveOperation(String key) {
        Thread.sleep(1000)
        return key.reverse()
    }
    
    List<String> strings = ['foo', 'bar', 'baz']
    
    GParsPool.withPool {
        // Plain inject is a sequential left fold, even inside withPool
        def result = strings.inject([:]) { seed, key ->
            println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
            seed + [(key): expensiveOperation(key.toString())]
        }
    
        println result
    }
    
    

    In this example we use Collection.inject(initialValue, closure), which is the equivalent of the classic "fold left" operation: it starts with the initial value [:], iterates over all elements in order and adds each key together with its computed value to the accumulated map. Sequential execution in this case takes approximately 3 seconds (each expensiveOperation() call sleeps for 1 second).

    Console output:

    [main] (1519925046610) seed = [:], key = foo
    [main] (1519925047773) seed = [foo:oof], key = bar
    [main] (1519925048774) seed = [foo:oof, bar:rab], key = baz
    [foo:oof, bar:rab, baz:zab]
    

    And this is basically what collectEntries() does: it is a reduction operation whose initial value is an empty map.
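To make that equivalence concrete, here is a minimal plain-Groovy sketch (no GPars needed) that builds the same map once with collectEntries and once with an explicit inject([:]) fold. The expensiveOperation here is a simplified stand-in that only reverses the string, without the sleep.

```groovy
// Simplified stand-in for the expensive call: just reverses the string
String expensiveOperation(String key) {
    key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

// collectEntries: the closure returns a [key, value] pair for each element
Map<String, String> viaCollectEntries = strings.collectEntries { key ->
    [key, expensiveOperation(key)]
}

// The same result as an explicit left fold: start from [:] and
// add one entry per element, always in list order
Map<String, String> viaInject = strings.inject([:]) { Map acc, String key ->
    acc + [(key): expensiveOperation(key)]
}

println viaCollectEntries   // [foo:oof, bar:rab, baz:zab]
println viaInject           // [foo:oof, bar:rab, baz:zab]
```

Both versions rely on the left argument always being the map built so far, which is exactly the assumption that breaks once the fold runs in parallel.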

    Now let's see what happens if we try to parallelize it by using the injectParallel method instead of inject:

    GParsPool.withPool {
        def result = strings.injectParallel([:]) { seed, key ->
            println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
            seed + [(key): expensiveOperation(key.toString())]
        }
    
        println result
    }
    

    Here is the result:

    [ForkJoinPool-1-worker-1] (1519925323803) seed = foo, key = bar
    [ForkJoinPool-1-worker-2] (1519925323811) seed = baz, key = [:]
    [ForkJoinPool-1-worker-1] (1519925324822) seed = foo[bar:rab], key = baz[[:]:]:[]
    foo[bar:rab][baz[[:]:]:[]:][:]:]:[[zab]
    

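    As you can see, the parallel version of inject does not respect the order of the elements (which is expected), and its first argument is not always the accumulated map: one worker received foo as seed and bar as key, while another received baz as seed and the initial [:] as key, just like any other element. A parallel reduction splits the collection into chunks and combines partial results, so its closure must be able to combine any two elements or partial results; seed + [(key): ...] assumes the left argument is always the map built so far, which only holds for a sequential fold. This is what can happen when a reduction to a map (or any mutable object) is performed in parallel without a specific order.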
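For contrast, a parallel reduction into a map can work when both the step function and the combining function are written for it. The sketch below swaps in Java's Stream.reduce(identity, accumulator, combiner) instead of GPars: each step returns a new map rather than mutating a shared one, and partial maps from different threads are merged with +, which is associative and has [:] as its identity. expensiveOperation is again a simplified stand-in.

```groovy
import java.util.function.BiFunction
import java.util.function.BinaryOperator

// Simplified stand-in for the expensive call: just reverses the string
String expensiveOperation(String key) {
    key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

Map<String, String> result = strings.parallelStream().reduce(
        // identity: an empty map; it is never mutated because + returns a new map
        [:],
        // accumulator: folds one element into a partial map
        { Map acc, String key -> acc + [(key): expensiveOperation(key)] } as BiFunction,
        // combiner: merges two partial maps produced by different threads
        { Map left, Map right -> left + right } as BinaryOperator
)

println result
```

The difference from the failing injectParallel run is that the accumulator only ever receives a map and a key, while a separate combiner handles map-plus-map.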

    Solution

    There are two ways to parallelize the process:

    1. collectParallel + collectEntries combination

    As Tim Yates mentioned in the comment, you can run the expensive operations in parallel and then collect the results into a map sequentially at the end:

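    import groovyx.gpars.GParsPool

    static def expensiveOperation(String key) {
        Thread.sleep(1000)
        return key.reverse()
    }
    
    List<String> strings = ['foo', 'bar', 'baz']
    
    GParsPool.withPool {
        // 1. run expensiveOperation for all keys in parallel, producing a list of [key, value] pairs
        // 2. turn that list into a map sequentially
        def result = strings.collectParallel { [it, expensiveOperation(it)] }.collectEntries { [(it[0]): it[1]] }
    
        println result
    }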
    

    This example executes in approximately 1 second and produces the following output:

    [foo:oof, bar:rab, baz:zab]
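The same "compute in parallel, collect sequentially" pattern can be sketched without GPars, using a plain java.util.concurrent thread pool. collectEntriesParallel below is a hypothetical helper (it is not a GPars method): it submits every task first, then waits for the futures in list order, so the resulting map keeps the order of the input list. The sleep is shortened to 100 ms to keep the example quick.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

String expensiveOperation(String key) {
    Thread.sleep(100)   // simulate slow work (shorter than the 1 s used above)
    key.reverse()
}

// Hypothetical helper (not part of GPars): runs valueFn for every key on a
// thread pool, then builds the map sequentially in the original list order
Map collectEntriesParallel(List keys, int threads, Closure valueFn) {
    ExecutorService pool = Executors.newFixedThreadPool(threads)
    try {
        // Phase 1: submit every task; they run concurrently
        List<Future> futures = keys.collect { k -> pool.submit({ valueFn(k) } as Callable) }
        // Phase 2: wait for each result and add the entries one by one, in order
        Map result = [:]
        keys.eachWithIndex { k, i -> result[k] = futures[i].get() }
        return result
    } finally {
        pool.shutdown()
    }
}

List<String> strings = ['foo', 'bar', 'baz']
long start = System.currentTimeMillis()
Map<String, String> result = collectEntriesParallel(strings, strings.size()) { String k -> expensiveOperation(k) }
long elapsed = System.currentTimeMillis() - start
println "${result} in ${elapsed} ms"
```

Because the map is filled only on the calling thread, no concurrent map or extra synchronization is needed; the parallelism is limited to the calls to expensiveOperation.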
    

    2. Java's parallel stream

    Alternatively, you can use Java's parallel stream with the Collectors.toMap() collector:

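    import java.util.function.Function
    import java.util.stream.Collectors

    static def expensiveOperation(String key) {
        Thread.sleep(1000)
        return key.reverse()
    }
    
    List<String> strings = ['foo', 'bar', 'baz']
    
    // Runs on the common ForkJoinPool; toMap gives no guarantee about the map type or key order
    def result = strings.parallelStream()
            .collect(Collectors.toMap(Function.identity(), { str -> expensiveOperation(str) }))
    
    println result 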
    

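    This example also executes in approximately 1 second and produces output like the following (Collectors.toMap() makes no guarantee about the type of map it returns, so the key order may differ from the input list):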

    [bar:rab, foo:oof, baz:zab]
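If the map should keep the order of the input list, one option is the four-argument Collectors.toMap() overload with a LinkedHashMap supplier. For an ordered stream such as one created from a List, a non-concurrent collector merges partial results in encounter order, so the keys come out as foo, bar, baz. This is a sketch; expensiveOperation is again a stand-in that only reverses the string.

```groovy
import java.util.function.BinaryOperator
import java.util.function.Function
import java.util.function.Supplier
import java.util.stream.Collectors

// Simplified stand-in for the expensive call: just reverses the string
String expensiveOperation(String key) {
    key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

Map<String, String> result = strings.parallelStream().collect(
        Collectors.toMap(
                Function.identity(),                                   // key: the string itself
                { String s -> expensiveOperation(s) } as Function,     // value: the expensive result
                { a, b -> a } as BinaryOperator,                       // merge: keep the first value for duplicate keys
                { new LinkedHashMap<String, String>() } as Supplier))  // map factory: insertion-ordered map

println result   // [foo:oof, bar:rab, baz:zab]
```

The expensive calls still run in parallel; only the final merge of the per-thread LinkedHashMaps follows the list order.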
    

    Hope it helps.