
Can I submit to the same Single Thread Executor multiple times?


I have a block of code that should run in parallel on different threads using CompletableFuture and ExecutorService. The block makes network requests (using Retrofit) and waits for each request to finish before making the next one. Can I call submit() multiple times on the same executor created by Executors.newSingleThreadExecutor(), passing that executor around?

I want to ensure all parallel runs of this block of code get their own thread and run all their operations on that thread.

Basically, the block of code should behave like synchronous code, but with multiple instances of the block running in parallel, each instance on its own thread.

Currently, the program just hangs when we make a call from the service. I suspect it has to do with passing around the same single-thread executor, but I am a bit stuck.

ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < SIZE; i++) {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        Future<List<String>> stringListFuture = myStringService.getStringList(executor);
        // We begin stalling once the next line gets hit.
        List<String> stringList = stringListFuture.get();
        
        Future<Integer> integerDataFuture = myIntegerService.getInteger(executor);
        Integer integerData = integerDataFuture.get();

        executor.shutdown();
    }, executor);

    futures.add(future);
}

CompletableFuture<Void> completableFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
completableFutures.join();

// Do something after all blocks of code have finished running in parallel.

class MyStringFuture {
    public Future<List<String>> getStringList(ExecutorService executor) {

        // Could these nested uses of the same executor be the problem?
        Future<Double> doubleDataFuture = doubleDataService.getFuture(executor);
        Double doubleData = doubleDataFuture.get();

        Call<List<Dividend>> data = retrofit.create(Api.class).getData(doubleData);

        return executor.submit(() -> data.execute().body());
    }
}

Solution

  • Your code runs a task on a single-thread executor, and that task submits another task to the same single-thread executor and then waits for the sub-task to finish. It is equivalent to this example, which prints "ONE" and "THREE" but never prints "TWO", "cf" or "FOUR":

    ExecutorService executor = Executors.newSingleThreadExecutor();
    
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        log("ONE");
        // This subtask can never start while current task is still running:
        Future<?> cf = executor.submit(() -> log("TWO"));
        log("THREE");
        try
        {
            // Blocks forever if run from single thread executor:
            log("cf"+cf.get());
        }
        catch (Exception e)
        {
            throw new RuntimeException("It failed", e);
        }
        log("FOUR");
    }, executor);
    

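    The executor has only one thread, so the subtask can only start after the main task exits (that is, after "FOUR" is printed). The main task, however, is stuck waiting in cf.get() for the subtask to finish, so neither can ever make progress.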

    The fix is straightforward: run the initial task on a separate thread or executor from the one used by the sub-tasks, or chain the sub-tasks with .thenRun(...) instead of blocking on them:

    CompletableFuture<Void> future = CompletableFuture.runAsync(subtask1, executor)
                                                      .thenRun(subtask2);
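
To make both options concrete, here is a minimal, self-contained sketch. The class name SeparateExecutorDemo, the method names, and the event strings are all made up for illustration, and the sub-tasks are stand-ins for your Retrofit calls. runWithSeparateExecutors() runs the outer task on one single-thread executor and its sub-task on another, so the blocking get() can complete. runChained() keeps a single executor but chains the steps, so no task ever blocks waiting for work queued behind it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are illustrative only.
class SeparateExecutorDemo {

    // Option 1: the outer task and its sub-task use different executors.
    static List<String> runWithSeparateExecutors() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService outer = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                events.add("ONE");
                // The sub-task goes to the worker thread, which is idle.
                Future<String> sub = worker.submit(() -> {
                    events.add("TWO");
                    return "done";
                });
                try {
                    // Safe to block: the worker thread is free to run the sub-task.
                    events.add("cf=" + sub.get());
                } catch (Exception e) {
                    throw new RuntimeException("Sub-task failed", e);
                }
                events.add("FOUR");
            }, outer);
            future.join();
        } finally {
            outer.shutdown();
            worker.shutdown();
        }
        return events;
    }

    // Option 2: one executor, but the steps are chained instead of blocking.
    static List<String> runChained() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> chain = CompletableFuture
                    .supplyAsync(() -> { events.add("step1"); return 21; }, executor)
                    // Queued on the same executor only after step1 has completed.
                    .thenApplyAsync(n -> { events.add("step2"); return n * 2; }, executor)
                    .thenAccept(n -> events.add("result=" + n));
            chain.join();
        } finally {
            executor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runWithSeparateExecutors()); // [ONE, TWO, cf=done, FOUR]
        System.out.println(runChained());               // [step1, step2, result=42]
    }
}
```

For your loop, this would mean creating the per-block worker executor as you do now, but passing a different executor (for example a shared pool sized to SIZE) as the second argument of runAsync.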