groovy gpars

GParsExecutorsPool.withPool is not making Async connections


I am using Groovy and GPars to make asynchronous connections. My API receives a huge JSON document as a request, which I split using JSONPath: $.${jsonobjstpath}..[i][j], where i and j are values in the range 0:20, and I loop over it. The split JSON comes out correctly, and I send these JSON batches to my API using GParsExecutorsPool.withPool. But GPars waits for the response. Say the API takes 10 seconds to process one request: GPars then waits 10 seconds before returning control to the loop. My code is below.

    import groovyx.gpars.GParsExecutorsPool;
    import groovyx.gpars.GParsPool;
    import jsr166y.ForkJoinPool;
    import jsr166y.RecursiveTask;

    def  invoke(mega) {   
        def size=mega.get("size"); //get size of actual JSON objects
        def body=mega.get("content.body"); // Load JSON body
        int counter=Math.ceil(size/20); //Get Number of loops to run
        def Path="/AsyncInmp"; // Path to call function 
        def Name="SplitJsonObject"; //Name of Function
        int i=0;
        int j=19;
        while(j<=size) {
            msg.put("i",i); //Send i value to function
            msg.put("j",j); // Send j value to function
            callPolicy(Path,Name,body); //Call function json path to get split json, receiving JSON with i and j values
            def split_body=resp.body;//response from split json
            def Path2="/AsyncInmp"; //path to connection function
            def Name2="connect"; //name of connection function
            GParsExecutorsPool.withPool {
              (0..<1).eachParallel { k -> 
                callPolicy(Path2, Name2,split_body) //Call function to connect using gpars, This is not working
              }
            }
            j=j+20;
            i=i+20;
        }
        return true;
    }

  1. How can I make an async call using GPars as soon as each split JSON request is ready?
  2. How can I collect the responses from all the async calls?

Solution

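  • You are calling withPool inside your while loop and also using a range of size 1 in your eachParallel. I would guess that these two things combined make your code behave in an essentially single-threaded manner: eachParallel returns only once every element has been processed, so each loop iteration waits for its single call to finish.

    Try changing it to something like this: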

    import groovyx.gpars.GParsExecutorsPool
    import java.util.concurrent.CopyOnWriteArrayList
    
    def futures = [] as CopyOnWriteArrayList
    GParsExecutorsPool.withPool {
      while(...) {
        ...
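        // async() wraps the closure; call() starts it on the pool
        // and returns a Future without waiting for the result
        futures << {
          callPolicy(Path2, Name2,split_body)
        }.async().call()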
      }
    }
    
    // wait for all requests to complete
    def results = futures*.get() // or futures.collect { it.get() } if this breaks
    
    // results is now a list of return values from callPolicy
    

    I have not tested or run this code, but it should give you an idea of how you could move forward.
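
    To make the difference between the two patterns concrete, here is a minimal sketch that times both. It assumes GPars 1.0.0 is available via @Grab; slowCall is a hypothetical stand-in for callPolicy that just sleeps, and the batch count and sleep time are arbitrary.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.GParsExecutorsPool

// Hypothetical stand-in for callPolicy: sleeps to simulate a slow remote call.
def slowCall(int batch) {
    Thread.sleep(200)
    "response-${batch}".toString()
}

// Pattern from the question: a new pool per iteration and a one-element range,
// so every iteration blocks until its single call has finished.
def start = System.currentTimeMillis()
def blockingResults = []
5.times { batch ->
    GParsExecutorsPool.withPool {
        (0..<1).eachParallel { blockingResults << slowCall(batch) }
    }
}
def blockingMillis = System.currentTimeMillis() - start

// Pattern from the answer: one pool around the whole loop.
// async().call() hands the closure to the pool and returns a Future at once.
start = System.currentTimeMillis()
def futures = []
GParsExecutorsPool.withPool(5) {
    5.times { batch ->
        futures << { slowCall(batch) }.async().call()
    }
}
// get() blocks per future; the list keeps submission order.
def asyncResults = futures*.get()
def asyncMillis = System.currentTimeMillis() - start

println "blocking: ${blockingMillis} ms, async: ${asyncMillis} ms"
```

    With five calls of about 200 ms each, the first pattern should take roughly the sum of the calls, while the second should take roughly as long as one call, since all five run at the same time.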

    <-- edit after comments -->

    A working example:

    @Grab('org.codehaus.gpars:gpars:1.0.0')
    
    import groovyx.gpars.*
    import java.util.concurrent.*
    import java.util.concurrent.atomic.*
    import static groovyx.gpars.dataflow.Dataflow.task
    
    random    = new Random()
    sequence  = new AtomicInteger(-1)
    
    def promises = [] as CopyOnWriteArrayList
    
    GParsPool.withPool(25) { pool -> 
      10.times { index ->
        promises << task { 
          callPolicy(index)
        }
      }
    }
    
    def results = promises*.get()
    
    results.each { map -> 
      println map
    }
    
    // Simulates a slow call, then records the order in which the calls finished.
    def callPolicy(index) {
      Thread.sleep(random.nextInt(100)) // sleep for 0-99 ms
      [index: index, sequence: sequence.incrementAndGet(), time: System.currentTimeMillis()]
    }
    

    which produces the following type of output:

    ~> groovy solution.groovy
    [index:0, sequence:9, time:1558893973348]
    [index:1, sequence:1, time:1558893973305]
    [index:2, sequence:8, time:1558893973337]
    [index:3, sequence:5, time:1558893973322]
    [index:4, sequence:7, time:1558893973337]
    [index:5, sequence:4, time:1558893973320]
    [index:6, sequence:3, time:1558893973308]
    [index:7, sequence:6, time:1558893973332]
    [index:8, sequence:0, time:1558893973282]
    [index:9, sequence:2, time:1558893973308]
    
    ~>
    

    where we can see that the results are returned and that the calls run in a multi-threaded manner: the results are listed in index order (the order in which the tasks were started), but the sequence values, which record the order in which the calls finished, are not.
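
    Applied to the batching loop from the question, the same idea could look like the sketch below. It is only an illustration under assumptions: connect is a hypothetical stand-in for callPolicy(Path2, Name2, split_body), the items list stands in for the parsed JSON objects, and Groovy's collate replaces the JSONPath split with i and j.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import static groovyx.gpars.dataflow.Dataflow.task

// Hypothetical stand-in for callPolicy(Path2, Name2, split_body).
def connect(List batch) {
    Thread.sleep(50) // simulate a slow remote call
    [first: batch.first(), last: batch.last(), count: batch.size()]
}

def items = (0..<45).toList()    // assumed: the already parsed JSON objects
def batches = items.collate(20)  // sublists of 20, 20 and 5 elements

// Start one task per batch as soon as the batch is ready;
// task returns a promise immediately instead of waiting for the call.
def promises = batches.collect { batch -> task { connect(batch) } }

// get() blocks until each promise is bound; results keep submission order.
def results = promises*.get()
results.each { println it }
```

    The loop itself never waits for a response. The only blocking happens in the final get() calls, after every batch has been submitted.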