asynchronous groovy dataflow gpars

Waiting on multiple async gpars dataflow tasks with timeout


I'm struggling to run multiple async tasks under one overall timeout. The catch is that I need to process whatever results arrived before the timeout expired.

For example, the code below returns the values of both tasks when the timeout is longer than two seconds. But once the timeout is reduced (or the tasks take longer), only a TimeoutException is thrown and none of the task results are received.

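import static groovyx.gpars.dataflow.Dataflow.task
import static groovyx.gpars.dataflow.Dataflow.whenAllBound
import static java.util.concurrent.TimeUnit.SECONDS

def timeout = 3  // value in seconds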

def t1 = task {
    Thread.sleep(1000)
    println 't1 done'
    't1'
}

def t2 = task {
    Thread.sleep(2000)
    println 't2 done'
    't2'
}

def results = whenAllBound( [t1, t2] ) { List l ->
    println 'all done ' + l
    l.join(', ')
}.get( timeout, SECONDS )

println "results $results"

Using join() instead of get() does not throw the TimeoutException, but it does not return the final results either; execution simply continues with the code after the call once the timeout expires.

Either I don't understand the Dataflow structures fully/enough/correctly, I'm trying to use them incorrectly, or both.

Basically, what I need is a synchronous block that triggers several async jobs with a common timeout and returns whatever responses were available when the timeout occurred. A timeout is an exceptional case, but it does happen occasionally for each of the tasks and should not affect overall processing.


Solution

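  • Perhaps this could work for you: wait with join(), which does not throw when the timeout expires, and then collect only the values that are already bound: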

    whenAllBound( [t1, t2] ) { List l ->
        println 'all done ' + l
        l.join(', ')
    }.join( timeout, java.util.concurrent.TimeUnit.SECONDS )
    
    def results = [t1, t2].collect {it.bound ? it.get() : null}
    println "results $results"
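
To see the partial-result behaviour end to end, here is a minimal self-contained sketch of the same idea. The `@Grab` coordinates (GPars 1.2.1), the sleep durations, and the 2-second timeout are assumptions chosen for illustration; they are not taken from the question. The second task is deliberately slower than the timeout, so it should still be unbound when the results are collected.

```groovy
// Assumption: GPars 1.2.1 is resolvable via Grape; adjust to the version you use.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.Dataflow
import java.util.concurrent.TimeUnit

def timeout = 2  // seconds; illustrative value, shorter than the slow task

// Fast task: finishes well within the timeout.
def t1 = Dataflow.task {
    Thread.sleep(500)
    't1'
}

// Slow task: still running when the timeout expires.
def t2 = Dataflow.task {
    Thread.sleep(5000)
    't2'
}

// Block until every task is bound or the timeout elapses, whichever is first.
// join() returns quietly on timeout instead of throwing TimeoutException.
Dataflow.whenAllBound([t1, t2]) { List l -> l }.join(timeout, TimeUnit.SECONDS)

// Keep the values that have arrived; unfinished tasks are reported as null.
def results = [t1, t2].collect { it.bound ? it.get() : null }
println "results $results"
```

Note that a task that failed with an exception also counts as bound, and get() would then rethrow that exception, so real code may want to guard the get() call.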