I have some problems while using ThreadPoolExecutor
and I cannot figure it out.
I try to use two List<CompletableFuture<Void>> taskList
lists to store tasks and then submit to a threadpool but all threads are blocked and can not handle new requests. The code like this:
There is a threadpool.
public class AsyncInvoker {
public class AsyncInvoker {
private ExecutorService excutePool;
public AsyncInvoker(int thread, int queue) {
excutePool= new ThreadPoolExecutor(thread, thread,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queue), new ThreadPoolExecutor.AbortPolicy());
}
public InvokeCollection newCollection() {
return new InvokeCollection();
}
public class InvokeCollection {
private List<CompletableFuture<Void>> taskList = new ArrayList<>();
public void add(Runnable task) {
CompletableFuture<Void> future = CompletableFuture.runAsync(task, excutePool);
taskList.add(future);
}
public void invokeAll() {
try {
CompletableFuture<Void> future = CompletableFuture.allOf(taskList.toArray(new CompletableFuture[0]));
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
And I submit jobs like this:
@RestController
public class TestController {
AsyncInvoker asyncInvoker = new AsyncInvoker(10,1000000);
@GetMapping("test")
public void test(){
System.out.println("request=======================");
AsyncInvoker.InvokeCollection outer = asyncInvoker.newCollection();
for (int i = 0; i < 10; i++) {
outer.add(()->{
AsyncInvoker.InvokeCollection inner = asyncInvoker.newCollection();
for (int j = 0; j < 100; j++) {
inner.add(()->{
try {
Thread.sleep(100);
System.out.println("===");
}catch (Exception e) {
e.printStackTrace();
}
});
}
inner.invokeAll();
});
System.out.println("i " + i);
}
outer.invokeAll();
}
}
When I request the TestController
multiple times, there is several ===
print in idea's console, and as I request repeatly, there is no more ===
prints. I think the thread is blocked. The thread dump like this:
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076c9923a0> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3334)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.aa.AsyncInvoker$InvokeCollection.invokeAll(AsyncInvoker.java:59)
And then, if there is just one AsyncInvoker.InvokeCollection
instead of outer
and inner
,and I add all tasks into one collection, it works well.
and also, if I use private ForkJoinPool forkJoinPool = new ForkJoinPool(thread,new SafeForkJoinWorkerThreadFactory(), null, false);
to handle tasks, it also looks good.
So I don't know why the code write above doesn't work, is the thread blocked,why? and why ForkJoinPool
is better than ThreadPoolExecutor
in this case.
Can someone give me some help? Thanks a lot!!
The problem is that there are at maximum 10 threads available to the thread pool executor in your asyncInvoker
instance. Unless a thread pool executor thread completes its task it is unavailable for any other task execution. In order to complete a task of outer
you need 1 thread for executing that task and 100 threads to complete all the tasks of inner
. In order to complete a task for inner
all of the 100 tasks in that instance must have completed. But the 10th task of inner
won't get executed anymore because all the threads in the executor already are working on a task.
The behaviour differs when using ForkJoinPool
because this uses work stealing. Instead of waiting the thread executing future.get();
will search for other work to do unless that future
completed.