I am using Groovy and GPars to make asynchronous calls. My API receives a huge JSON document as a request, which I split using the JSONPath expression $.${jsonobjstpath}..[i][j], where i and j are values in the range 0:20, looping over the document. The split works correctly, and I send the resulting JSON batches to my API using GParsExecutorsPool.withPool. However, GPars waits for each response: if the API takes 10 seconds to process one request, GPars waits 10 seconds before returning control to the loop. My code is below.
import groovyx.gpars.GParsExecutorsPool;
import groovyx.gpars.GParsPool;
import jsr166y.ForkJoinPool;
import jsr166y.RecursiveTask;
def invoke(mega) {
    def size = mega.get("size");           // get size of actual JSON objects
    def body = mega.get("content.body");   // load JSON body
    int counter = Math.ceil(size/20);      // get number of loops to run
    def Path = "/AsyncInmp";               // path to call function
    def Name = "SplitJsonObject";          // name of function
    int i = 0;
    int j = 19;
    while (j <= size) {
        msg.put("i", i);                   // send i value to function
        msg.put("j", j);                   // send j value to function
        callPolicy(Path, Name, body);      // call the JSON path function to get the split JSON for i and j
        def split_body = resp.body;        // response from split JSON
        def Path2 = "/AsyncInmp";          // path to connection function
        def Name2 = "connect";             // name of connection function
        GParsExecutorsPool.withPool {
            (0..<1).eachParallel { k ->
                callPolicy(Path2, Name2, split_body) // call function to connect using GPars; this is not working
            }
        }
        j = j + 20;
        i = i + 20;
    }
    return true;
}
- How can I make an async call using GPars as soon as each split JSON request is ready?
- How can I collect the responses from all the async calls?
You are calling withPool inside your while loop and also using a range of size 1 in your eachParallel. I would guess that these two things combined essentially make your code behave in a single-threaded manner.
Try changing it to something like this:
import java.util.concurrent.CopyOnWriteArrayList
def futures = [] as CopyOnWriteArrayList
GParsExecutorsPool.withPool {
    while(...) {
        ...
        futures << {
            callPolicy(Path2, Name2, split_body)
        }.async().call()
    }
}
// wait for all requests to complete
def results = futures*.get() // or futures.collect { it.get() } if this breaks
// results is now a list of return values from callPolicy
I have not tested or run this code, but it should give you an idea of how you could move forward.
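To make the "submit inside the loop, wait once at the end" idea concrete without any GPars dependency, here is a minimal sketch that uses a plain java.util.concurrent executor instead. The sendBatch closure, the size of 95 and the pool size of 5 are all hypothetical stand-ins for your real callPolicy(Path2, Name2, split_body) call and data. Note that, unlike the loop in the question, this sketch also sends the final partial batch, which is an assumption about what you want.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for callPolicy(Path2, Name2, split_body)
def sendBatch = { int start, int end ->
    Thread.sleep(100)                  // simulate a slow API call
    [start: start, end: end]
}

int size = 95                          // assumed number of JSON objects
int batchSize = 20
def pool = Executors.newFixedThreadPool(5)
def futures = []
def results

try {
    0.step(size, batchSize) { int start ->
        int end = Math.min(start + batchSize - 1, size - 1)
        // submit() returns a Future immediately, so the loop moves on
        futures << pool.submit({ sendBatch(start, end) } as Callable)
    }
    // block only here, after every batch has been submitted
    results = futures.collect { it.get() }
} finally {
    pool.shutdown()
}

results.each { println it }
```

The results list is in submission order, because each get() is called on the futures in the order they were created, regardless of which call finished first.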
<-- edit after comments -->
A working example:
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import static groovyx.gpars.dataflow.Dataflow.task
random = new Random()
sequence = new AtomicInteger(-1)
def promises = [] as CopyOnWriteArrayList
GParsPool.withPool(25) { pool ->
    10.times { index ->
        promises << task {
            callPolicy(index)
        }
    }
}
def results = promises*.get()
results.each { map ->
    println map
}
def callPolicy(index) {
    Thread.sleep(random.nextInt(100) % 100)
    [index: index, sequence: sequence.incrementAndGet(), time: System.currentTimeMillis()]
}
which produces the following type of output:
~> groovy solution.groovy
[index:0, sequence:9, time:1558893973348]
[index:1, sequence:1, time:1558893973305]
[index:2, sequence:8, time:1558893973337]
[index:3, sequence:5, time:1558893973322]
[index:4, sequence:7, time:1558893973337]
[index:5, sequence:4, time:1558893973320]
[index:6, sequence:3, time:1558893973308]
[index:7, sequence:6, time:1558893973332]
[index:8, sequence:0, time:1558893973282]
[index:9, sequence:2, time:1558893973308]
~>
where we can see that the results are returned and that the calls are made in a multi-threaded manner, since the sequence values are not in the same order as the index values.
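One thing to keep in mind when collecting the results: if any single call throws, get() on its future throws as well, and collecting everything in one expression stops at that point. Below is a minimal sketch of collecting successes and failures separately, again using a plain java.util.concurrent executor rather than GPars. The fakeCallPolicy closure and its deliberate failure for index 3 are hypothetical, purely for illustration.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors

// Hypothetical stand-in for callPolicy; fails for one index to show the error path
def fakeCallPolicy = { int index ->
    if (index == 3) {
        throw new IllegalStateException("call ${index} failed")
    }
    [index: index]
}

def pool = Executors.newFixedThreadPool(4)
def futures = (0..<6).collect { index ->
    pool.submit({ fakeCallPolicy(index) } as Callable)
}

def successes = []
def failures = []
futures.eachWithIndex { future, index ->
    try {
        successes << future.get()
    } catch (ExecutionException e) {
        // the exception thrown inside the task is available as the cause
        failures << [index: index, error: e.cause.message]
    }
}
pool.shutdown()

println "ok: ${successes}"
println "failed: ${failures}"
```

This way one failing batch does not hide the responses from the batches that succeeded, and you know which index to retry.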