Tags: java, multithreading, java-stream

Java stream: start the next map before the previous one finishes


I have 2 executors:

ExecutorService executorDownload = Executors.newFixedThreadPool(n);
ExecutorService executorCalculate = Executors.newFixedThreadPool(m);

First I need to submit the tasks to executorDownload. Once they complete, I need to submit their results to executorCalculate and finally collect the results. I wrote the following stream:

long count = IntStream.range(0, TASK_NUMBER)
            .boxed()
            .parallel()
            .map(i -> executorDownload.submit(new Download(i)))
            .map(future -> calculateResultFuture(executorCalculate, future))
            .filter(Objects::nonNull)
            .filter(Main::isFutureCalculated)
            .count();

public static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate, Future<DownloadResult> future) {
    try {
        return executorCalculate.submit(new Calculate(future.get()));
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return null;
}

public static boolean isFutureCalculated(Future<CalculateResult> future) {
    try {
        return future.get().found;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return false;
}

Is it somehow possible to start

.map(future -> calculateResultFuture(executorCalculate, future))

before

.map(i -> executorDownload.submit(new Download(i)))

has finished? I need the second map to start as soon as the first map has started.


Solution

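  • You just need to understand that streams process elements one at a time. Each element traverses the whole pipeline before the next one enters the first intermediate step, rather than all elements passing through each step together. The exception is stateful intermediate operations such as sorted(), which need all elements before they can emit anything. In a parallel stream the same holds for each worker thread: a thread pushes one element all the way through the pipeline before taking its next one.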

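    In your case, the future.get() call inside calculateResultFuture blocks on the first element's download. As a result, the second element is not even submitted to executorDownload until that download has finished. Using .parallel() only softens this, because each worker thread of the stream still blocks in get() before it can submit its next download.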
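    The following minimal sketch uses only the JDK to show this ordering. StreamOrderDemo and traceSequential are illustrative names. The sketch replaces the question's two map stages with steps that only record which element they touched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamOrderDemo {

    // Runs a sequential stream with two map stages and records, in order,
    // which stage touched which element.
    static List<String> traceSequential(int n) {
        List<String> log = new ArrayList<>();
        IntStream.range(0, n)
                .boxed()
                .map(i -> { log.add("first " + i); return i; })  // stands in for the download submit
                .map(i -> { log.add("second " + i); return i; }) // stands in for the blocking calculate step
                .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        // prints [first 0, second 0, first 1, second 1, first 2, second 2]
        System.out.println(traceSequential(3));
    }
}
```

    Element 0 reaches the second map before element 1 enters the first map. If the second stage blocked, as future.get() does in the question, element 1 would not enter the first stage until that call returned.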

    To get every element through the first submit, you need the stream to submit all tasks before anything starts blocking. The same applies to the second submit. One way is to collect the futures into a list:

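    // .parallel() removed: submit() returns immediately, so a sequential
    // stream already hands every task to executorDownload without waiting
    List<Future<DownloadResult>> downloadTasks = IntStream.range(0, TASK_NUMBER)
            .mapToObj(i -> executorDownload.submit(new Download(i)))
            .collect(Collectors.toList());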
    

    Collecting into a list submits every download task up front, and each task runs as soon as executorDownload has a free thread. After that, you can do the same with the second batch:

    List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
            .map(future -> calculateResultFuture(executorCalculate, future))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    

    This also submits every task to the second executor. From here, you can call .get() without unnecessary waits:

    long count = calculateResults.stream()
            .filter(Main::isFutureCalculated)
            .count();
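    The three steps above can be combined into one end-to-end sketch. Download, Calculate, DownloadResult and CalculateResult are hypothetical stand-ins for the question's classes. In this sketch, a "download" of item i sleeps briefly and returns i, and a "calculation" marks even ids as found. The helper methods mirror the ones in the question.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class TwoPhaseSubmitDemo {

    // Hypothetical stand-ins for the question's result types.
    static final class DownloadResult { final int id; DownloadResult(int id) { this.id = id; } }
    static final class CalculateResult { final boolean found; CalculateResult(boolean found) { this.found = found; } }

    static final class Download implements Callable<DownloadResult> {
        private final int i;
        Download(int i) { this.i = i; }
        @Override public DownloadResult call() throws InterruptedException {
            Thread.sleep(10); // simulate I/O
            return new DownloadResult(i);
        }
    }

    static final class Calculate implements Callable<CalculateResult> {
        private final DownloadResult input;
        Calculate(DownloadResult input) { this.input = input; }
        @Override public CalculateResult call() {
            return new CalculateResult(input.id % 2 == 0); // even ids count as "found"
        }
    }

    static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate,
                                                         Future<DownloadResult> future) {
        try {
            return executorCalculate.submit(new Calculate(future.get()));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    static boolean isFutureCalculated(Future<CalculateResult> future) {
        try {
            return future.get().found;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return false;
        }
    }

    static long run(int taskNumber, int n, int m) {
        ExecutorService executorDownload = Executors.newFixedThreadPool(n);
        ExecutorService executorCalculate = Executors.newFixedThreadPool(m);
        try {
            // Step 1: submit every download; collect() forces the whole batch out
            List<Future<DownloadResult>> downloadTasks = IntStream.range(0, taskNumber)
                    .mapToObj(i -> executorDownload.submit(new Download(i)))
                    .collect(Collectors.toList());
            // Step 2: in index order, wait for each download and submit its calculation
            List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
                    .map(f -> calculateResultFuture(executorCalculate, f))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            // Step 3: only now block on the calculation results
            return calculateResults.stream()
                    .filter(TwoPhaseSubmitDemo::isFutureCalculated)
                    .count();
        } finally {
            executorDownload.shutdown();
            executorCalculate.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, 2)); // ids 0, 2, 4, 6, 8 are found, so this prints 5
    }
}
```

    Shutting down both executors in a finally block lets the JVM exit once the count is known. In the question's code the executors are probably shared, so they would be shut down elsewhere.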
    

    This ensures that all downloads run concurrently, but the second step can still wait unnecessarily. The second stream calls get() on the download futures in index order, so a calculation is only submitted after every earlier download has also finished. For example, if download 5 completes first, its calculation still waits for downloads 0 to 4. To avoid that, you need a different implementation. A chain of CompletableFutures, one per element, is designed for exactly this:

    // One download -> calculate chain per element
    List<CompletableFuture<CalculateResult>> calculateResult = IntStream.range(0, TASK_NUMBER)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
                    .thenApplyAsync(downloadResult -> calculateResultFuture(downloadResult), executorCalculate))
            .collect(Collectors.toList());

    // filter (not map) so that only results with found == true are counted
    long count = calculateResult.stream().filter(Main::isFutureCalculated).count();
    

    thenApplyAsync schedules each element's calculation on executorCalculate as soon as that element's own download completes, independently of the other elements.

    Of course, this requires changing your API slightly so that the download logic can be called directly. I used callDownload(i) to run the same logic as new Download(i).call(), because a Supplier cannot throw the checked exception that call() declares. calculateResultFuture() would likewise change to take a DownloadResult instead of a Future and return the CalculateResult directly.
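    The following self-contained sketch shows one way the CompletableFuture chain might look. callDownload and calculate are hypothetical. They stand in for the reworked download logic and the reworked calculateResultFuture() described above, with made-up bodies: uneven sleeps make the downloads finish out of order, and even ids count as found. The sketch waits with join() rather than isFutureCalculated, so it needs no checked-exception handling.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CompletableChainDemo {

    // Hypothetical stand-ins for the question's result types.
    static final class DownloadResult { final int id; DownloadResult(int id) { this.id = id; } }
    static final class CalculateResult { final boolean found; CalculateResult(boolean found) { this.found = found; } }

    // Hypothetical callDownload(i): the body of Download.call() without the checked exception.
    static DownloadResult callDownload(int i) {
        try {
            Thread.sleep(10L * (i % 3)); // uneven durations, so downloads finish out of order
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status visible to the pool
        }
        return new DownloadResult(i);
    }

    // Hypothetical reworked calculate step: takes the download result and computes directly.
    static CalculateResult calculate(DownloadResult d) {
        return new CalculateResult(d.id % 2 == 0);
    }

    static long run(int taskNumber, int n, int m) {
        ExecutorService executorDownload = Executors.newFixedThreadPool(n);
        ExecutorService executorCalculate = Executors.newFixedThreadPool(m);
        try {
            // Each element's calculation is scheduled as soon as its own download completes
            List<CompletableFuture<CalculateResult>> results = IntStream.range(0, taskNumber)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
                            .thenApplyAsync(CompletableChainDemo::calculate, executorCalculate))
                    .collect(Collectors.toList());
            // join() waits for each chain; filter keeps only the "found" results
            return results.stream()
                    .map(CompletableFuture::join)
                    .filter(r -> r.found)
                    .count();
        } finally {
            executorDownload.shutdown();
            executorCalculate.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, 2)); // ids 0, 2, 4, 6, 8 are found, so this prints 5
    }
}
```

    Because every chain is created before the first join() call, the per-element waits overlap. Joining in index order only affects when the count is assembled, not when the calculations start.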