java, java-8, executorservice, completable-future, callable

Need help choosing between CompletableFuture and Callable/Future


I am having trouble deciding which approach is best for the scenario below. I want to call one API from multiple threads at the same time and merge the responses into a list.

First, I created a thread pool sized to the list, and I pass each record to a Callable that returns the API call response.

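    ExecutorService executorService = Executors.newFixedThreadPool(savedAssetparamList.size());
    List<Callable<DatapointLinkDTO>> allCalls = new ArrayList<>(savedAssetparamList.size());
    String finalToken = token; // effectively final copy for use inside the lambda
    savedAssetparamList.forEach(assetParam ->
        // A lambda defers the call until the executor runs it; Executors.callable(...) expects a Runnable
        allCalls.add(() -> datapointLinkingHelper.postDataPoint(assetParam, mdspSubssationAssetId, finalToken, tenantId)));
    // invokeAll blocks until every task has finished and returns one Future per task
    List<Future<DatapointLinkDTO>> futures = executorService.invokeAll(allCalls);
    executorService.shutdown();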
    
   

The API call method:

    public DatapointLinkDTO postDataPoint(AssetParameter assetParameter, String mdspSubssationAssetId, String token, Long tenantId) {
        DatapointLinkDTO datapointLinkDTO = new DatapointLinkDTO();

        // API call logic

        return datapointLinkDTO; // return the instance, not the class name
    }

Solution

  • CompletableFuture already wraps an asynchronous computation that runs on another thread; you can wait on it to get the result once it is ready. By default, supplyAsync runs the work on the common pool (ForkJoinPool.commonPool()). To run it on a different executor, pass that executor as the second argument:

    Executor executor = ...
    
    CompletableFuture.supplyAsync(() -> { /* Your code to call the api */}, executor);
    

    To wait for several futures, use CompletableFuture.allOf(), which accepts any number of CompletableFutures and returns a CompletableFuture<Void> that completes once all of them have completed. Because its result is Void, it does not carry the return values, so you have to read them from the individual futures. A rough sketch:

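    Executor executor = Executors.newFixedThreadPool(10);
    
    List<CompletableFuture<Response>> results = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        results.add(CompletableFuture.supplyAsync(() -> { /* API Call */}, executor));
    }
    
    // allOf takes a CompletableFuture<?>[]; join() blocks until every future is done
    CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
    
    // All futures are complete here, so each join() returns immediately
    List<Response> responses = results.stream().map(CompletableFuture::join).collect(Collectors.toList());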
    

    PS. When the tasks are mostly I/O and do little computation, it is better to use a cached thread pool (Executors.newCachedThreadPool(), which is unbounded) or a pool bounded by a high number such as 50 or more. Such a pool runs more threads while calls are waiting on I/O, and a cached pool reuses idle threads instead of creating new ones.
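
For reference, here is a minimal self-contained sketch of the approach described above: one supplyAsync per input, allOf to wait for all of them, then join to collect the results into a list. The class name ParallelCalls, the methods callApi and fetchAll, the String request/response types, and the cap of 50 threads are all assumptions made for illustration; callApi is a hypothetical stand-in for the real call (postDataPoint in the question).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelCalls {

    // Hypothetical stand-in for the real API call (e.g. postDataPoint).
    static String callApi(String id) {
        return "response-for-" + id;
    }

    // Runs one call per input on a dedicated pool and returns the results in input order.
    static List<String> fetchAll(List<String> ids) {
        // Assumed cap of 50 threads so a large input list cannot create one thread per record.
        int poolSize = Math.max(1, Math.min(ids.size(), 50));
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit every call up front so they run concurrently.
            List<CompletableFuture<String>> futures = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> callApi(id), executor))
                    .collect(Collectors.toList());

            // allOf completes once every future has completed. The thenApply stage
            // runs only after that, so the inner join() calls return immediately.
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            // Let the worker threads exit once the submitted tasks are done.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("a", "b", "c")));
    }
}
```

If any call throws, the future returned by allOf completes exceptionally and the final join() rethrows the failure wrapped in a CompletionException, so real code would likely want to handle that case per request.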