Having this Threadpool:
package xxx;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A thread pool that hold an amount of work. During execution the amount of
* work may grow.
*
* @param <A> The work
*/
public class DepleatingFiFoThreadPool<A> {
public static final Logger LOG = Logger.getLogger(DepleatingFiFoThreadPool.class.getCanonicalName());
public record Socket<A>(A r, String threadPostfix) {
}
private final Thread[] threadsRunning;
private final ArrayList<Socket<A>> unstartedRunnables = new ArrayList<>();
private int nextFreeSocketIndex = 0;
private final Consumer<Throwable> errorHandler;
private final Object lock = new Object();
private boolean waitingForLock = false;
private final String prefix;
private final Consumer<A> invoker;
/**
* Create the threadpool.
*
* @param threadsRunningMax The maximum of Threads running at the same time,
* must be higher than <code>0</code>
* @param errorHandler The handler in case of problems, never
* <code>null</code>
* @param prefix The prefix for the pool, required for debugging
* purposes, never <code>null</code>
* @param invoker The invoker who encapsule the work, never
* <code>null</code>
*/
public DepleatingFiFoThreadPool(final int threadsRunningMax, final Consumer<Throwable> errorHandler,
final String prefix, final Consumer<A> invoker) {
this.errorHandler = errorHandler;
this.prefix = prefix;
this.invoker = invoker;
this.threadsRunning = new Thread[threadsRunningMax];
}
/**
* Add and start a thread.
*
* Work after returning from {@link #executeUntilDeplated(long)} will not be
* executed.
*
* @param notRunningThread The not running work that is designated to be done
* @param threadPostfix The postfix of the thread, never <code>null</code>
*/
public void addAndStartThread(final A notRunningThread, final String threadPostfix) {
var mustCallLater = false;
if (nextFreeSocketIndex < threadsRunning.length) {
synchronized (this) {
if (nextFreeSocketIndex < threadsRunning.length) {
start(notRunningThread, threadPostfix);
} else {
mustCallLater = true;
}
}
} else {
mustCallLater = true;
}
if (mustCallLater) {
unstartedRunnables.add(new Socket<A>(notRunningThread, threadPostfix));
}
}
private void finished() {
synchronized (this) {
threadsRunning[--nextFreeSocketIndex] = null;
if (nextFreeSocketIndex == 0) {
synchronized (lock) {
if (waitingForLock) {
lock.notify();
}
}
} else {
Socket<A> e = null;
if (!unstartedRunnables.isEmpty()) {
synchronized (unstartedRunnables) {
if (!unstartedRunnables.isEmpty()) {
e = unstartedRunnables.removeFirst();
}
}
}
if (e != null) {
start(e.r, e.threadPostfix);
}
}
}
}
private void start(final A notRunningThread, final String name) {
var socket = nextFreeSocketIndex++;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
invoker.accept(notRunningThread);
} catch (Throwable e) { // fault barrier!
try {
errorHandler.accept(e);
} catch (Throwable ta) {
ta.addSuppressed(e);
LOG.log(Level.SEVERE, ta.getMessage(), ta);
}
}
finished();
}
}, prefix + name);
threadsRunning[socket] = t;
t.start();
}
/**
* Execute the threads and every subsequent thread that is added during the
* progress.
*
* @param timeoutMs The timeout in miliseconds, should be an positive value
* @return <code>false</code> if the timeout has reached, <code>true</code>
* otherwise
* @throws InterruptedException If the thread has been interrupted (maybe
* shutdown of application)
*/
public boolean executeUntilDeplated(final long timeoutMs) throws InterruptedException {
AtomicBoolean resultHolder = new AtomicBoolean(true);
if (nextFreeSocketIndex > 0) {
Thread timeout = new Thread(() -> {
try {
Thread.sleep(timeoutMs);
resultHolder.set(false);
synchronized (lock) {
lock.notify();
}
} catch (InterruptedException e) {
LOG.log(Level.FINE, "Not a bug, timeout not reached, watchdog not required.", e);
}
});
timeout.start();
if (nextFreeSocketIndex > 0) {
waitingForLock = true;
synchronized (lock) {
try {
lock.wait(timeoutMs);
} finally {
timeout.interrupt();
}
}
}
}
return resultHolder.get();
}
}
A exception is thrown sometimes having this Stacktrace:
Exception in thread "NN/SQL-caller" java.lang.ArrayIndexOutOfBoundsException: arraycopy: last source index 50 out of bounds for object array[49]
at java.base/java.lang.System.arraycopy(Native Method)
at java.base/java.util.ArrayList.fastRemove(ArrayList.java:724)
at java.base/java.util.ArrayList.removeFirst(ArrayList.java:573)
at xxx.DepleatingFiFoThreadPool.finished(DepleatingFiFoThreadPool.java:74)
at xxx.DepleatingFiFoThreadPool$1.run(DepleatingFiFoThreadPool.java:100)
at java.base/java.lang.Thread.run(Thread.java:1570)
I am not able to debug this because it appears only once every three to five weeks.
I use this test but I could not find the problem:
public void testMassiveQueuedFunction() throws InterruptedException {
var dtp = new DepleatingFiFoThreadPool<Runnable>(100, x -> System.out.println(x), "testQueueFunction",
Runnable::run);
var l = new CountDownLatch(0);
Runnable wait = () -> {
try {
Thread.sleep(2);
l.countDown();
} catch (InterruptedException e) {
}
};
for (int i = 0; i < 9999; i++) {
dtp.addAndStartThread(wait, i + "");
}
long start = System.currentTimeMillis();
dtp.executeUntilDeplated(3000);
long took = System.currentTimeMillis() - start;
assert l.getCount() == 0 : "Await a count of 0 but has: " + l.getCount();
assert took < 2900 : "Should execute faster but took: " + took;
}
Can someone suggest a better unit-test?
Yes, I synchronized removeFirst
but I missed to synchronized unstartedRunnables.add
.
TLDR;
If I run the removeFirst
, there is a small chance that add
adds a new element asynchroniously. We have two arrays having differrent sizes. That is why the ArrayIndexOutOfBoundsException
is thrown. In order to solve the problem, I need to synchronize the unstartedRunnables.add
call.
A good improvement suggestion however came from @rzwitserloot in order to not synchonize against the this-object.