java concurrency completable-future completion-stage

Task scheduling using acceptEither synchronously on CompletableFuture


I am learning concurrency through the CompletableFuture API. Let's say I have two tasks: one takes 250ms and another takes 2500ms. In the following code:

        // sleep(ms) is assumed to be a small helper that wraps Thread.sleep.
        Supplier<List<Long>> supplyIds = () -> {
            sleep(200);
            return Arrays.asList(1L, 2L, 3L);
        };

        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
            sleep(250);
            System.out.println("User2" + Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(userSupplier);
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
            sleep(2500);
            System.out.println("User2" + Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return CompletableFuture.supplyAsync(userSupplier);
        };
        Consumer<List<User>> displayer = users -> users.forEach(System.out::println);

        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
        CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
        CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);

        users1.thenRun(() -> System.out.println("User 1"));
        users2.thenRun(() -> System.out.println("User 2"));
        users1.acceptEither(users2, displayer);

        // Keep the JVM alive long enough for the daemon pool threads to finish.
        sleep(6000);

I get the following result:

User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1

I understand that the code is running synchronously, since the same common fork-join pool thread is being used and we don't specify the thread. I am confused as to why the fetchUsers2 task is executed first and then the fetchUsers1 task (this seems to be consistent across runs). I assumed that, since thenCompose is called with fetchUsers1 first in the code, it would be 'queued up' first.


Solution

  • Nothing in the documentation says that the order of invocation matters for thenCompose.

    Since you define two independent stages, both depending only on completableFuture, there is no defined order between users1 and users2, and the resulting order is simply implementation-dependent.

    You may reproducibly get a particular order in one environment, but a different order in another. Even in your environment, some runs may produce a different order. If the initiating thread loses the CPU for 200 milliseconds after calling supplyAsync(supplyIds), the future may already be complete when thenCompose(fetchUsers1) is called; in that case the action is executed immediately, during that call, and therefore before thenCompose(fetchUsers2) is even invoked.
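The "executed immediately at the invocation" case can be observed in isolation. The following is a minimal sketch, not part of the original code: the class name ImmediateExecution and the method runsOnCaller are made up for illustration, and an already completed future stands in for a supplyAsync(supplyIds) call that finished before thenCompose was reached. The documentation only says that the action may run on the calling thread; that it actually does so here is behavior of current OpenJDK implementations, not a guarantee.

```java
import java.util.concurrent.CompletableFuture;

public class ImmediateExecution {

    // Returns true if the function passed to thenCompose ran on the calling thread.
    static boolean runsOnCaller() {
        // Stands in for supplyAsync(supplyIds) having finished already.
        CompletableFuture<String> alreadyDone = CompletableFuture.completedFuture("ids");

        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];

        // The source is complete, so the function has no reason to be deferred.
        alreadyDone.thenCompose(ids -> {
            ranOn[0] = Thread.currentThread();
            return CompletableFuture.completedFuture(ids);
        });

        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("ran on calling thread: " + runsOnCaller());
    }
}
```

If the same happens in the question's code, fetchUsers1 runs to completion on the main thread before fetchUsers2 is even registered, which reverses the order that was observed.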

    When the order between two actions matters, you must model a dependency between them.
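One possible way to model such a dependency is sketched below, assuming the second fetch should start only after the first one has finished. The class name OrderedStages, the log list, and the toNames helper are hypothetical, and strings stand in for the User objects of the question. The ids are routed through users1 with thenCombine, so fetchUsers2 cannot be invoked before users1 has completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class OrderedStages {

    // Hypothetical stand-in for mapping ids to User objects.
    static List<String> toNames(List<Long> ids) {
        return ids.stream().map(id -> "user-" + id).collect(Collectors.toList());
    }

    // Returns the order in which the two fetch functions were invoked.
    static List<String> runInOrder() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        Function<List<Long>, CompletableFuture<List<String>>> fetchUsers1 = ids -> {
            log.add("fetchUsers1");
            return CompletableFuture.supplyAsync(() -> toNames(ids));
        };
        Function<List<Long>, CompletableFuture<List<String>>> fetchUsers2 = ids -> {
            log.add("fetchUsers2");
            return CompletableFuture.supplyAsync(() -> toNames(ids));
        };

        CompletableFuture<List<Long>> idsFuture =
                CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L));

        CompletableFuture<List<String>> users1 = idsFuture.thenCompose(fetchUsers1);

        // Completes with the ids, but only once users1 has completed as well.
        CompletableFuture<List<Long>> idsAfterUsers1 =
                users1.thenCombine(idsFuture, (ignored, ids) -> ids);

        CompletableFuture<List<String>> users2 = idsAfterUsers1.thenCompose(fetchUsers2);

        users2.join(); // wait for the whole chain
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
    }
}
```

The price of the guaranteed order is that the two fetches no longer overlap, so this only makes sense when the order really matters.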

    Note that likewise the code

    users1.thenRun(() -> System.out.println("User 1"));
    users2.thenRun(() -> System.out.println("User 2"));
    users1.acceptEither(users2, displayer);


    defines entirely independent actions. Since acceptEither is applied to users1 and users2, rather than the completion stages returned by the thenRun calls, it is not dependent on the completion of the print statements. These three actions may be performed in any order.
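If the display should happen only after the corresponding print statement, the acceptEither call has to be applied to stages that include the printing. The sketch below is one way to do that, under assumptions: the class name DependentEither and the log list are made up, log entries replace System.out.println, and plain id lists stand in for the user lists. thenApply is used instead of thenRun because it passes the value through, so the resulting stage still carries the list that acceptEither needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DependentEither {

    // Returns the recorded events; a "User" entry always precedes the display entry.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<List<Long>> users1 =
                CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L));
        CompletableFuture<List<Long>> users2 =
                CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L));

        // Each stage completes only after its log entry has been written.
        CompletableFuture<List<Long>> printed1 = users1.thenApply(u -> {
            log.add("User 1");
            return u;
        });
        CompletableFuture<List<Long>> printed2 = users2.thenApply(u -> {
            log.add("User 2");
            return u;
        });

        // Depends on printed1/printed2 rather than on users1/users2.
        printed1.acceptEither(printed2, u -> log.add("display " + u)).join();

        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Which of the two "User" entries comes first is still unspecified, since printed1 and printed2 remain independent of each other.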