javamultithreadingunit-testing

ArrayList.removeFirst() throw ArrayIndexOutOfBoundsException but was synchronized


Having this Threadpool:

package xxx;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A thread pool that hold an amount of work. During execution the amount of
 * work may grow.
 * 
 * @param <A> The work
 */
public class DepleatingFiFoThreadPool<A> {

    public static final Logger LOG = Logger.getLogger(DepleatingFiFoThreadPool.class.getCanonicalName());

    public record Socket<A>(A r, String threadPostfix) {
    }

    private final Thread[] threadsRunning;
    private final ArrayList<Socket<A>> unstartedRunnables = new ArrayList<>();
    private int nextFreeSocketIndex = 0;
    private final Consumer<Throwable> errorHandler;
    private final Object lock = new Object();
    private boolean waitingForLock = false;

    private final String prefix;
    private final Consumer<A> invoker;

    /**
     * Create the threadpool.
     * 
     * @param threadsRunningMax The maximum of Threads running at the same time,
     *                          must be higher than <code>0</code>
     * @param errorHandler      The handler in case of problems, never
     *                          <code>null</code>
     * @param prefix            The prefix for the pool, required for debugging
     *                          purposes, never <code>null</code>
     * @param invoker           The invoker who encapsule the work, never
     *                          <code>null</code>
     */
    public DepleatingFiFoThreadPool(final int threadsRunningMax, final Consumer<Throwable> errorHandler,
            final String prefix, final Consumer<A> invoker) {
        this.errorHandler = errorHandler;
        this.prefix = prefix;
        this.invoker = invoker;
        this.threadsRunning = new Thread[threadsRunningMax];
    }

    /**
     * Add and start a thread.
     * 
     * Work after returning from {@link #executeUntilDeplated(long)} will not be
     * executed.
     * 
     * @param notRunningThread The not running work that is designated to be done
     * @param threadPostfix    The postfix of the thread, never <code>null</code>
     */
    public void addAndStartThread(final A notRunningThread, final String threadPostfix) {
        var mustCallLater = false;
        if (nextFreeSocketIndex < threadsRunning.length) {
            synchronized (this) {
                if (nextFreeSocketIndex < threadsRunning.length) {
                    start(notRunningThread, threadPostfix);
                } else {
                    mustCallLater = true;
                }
            }
        } else {
            mustCallLater = true;
        }
        if (mustCallLater) {
            unstartedRunnables.add(new Socket<A>(notRunningThread, threadPostfix));
        }
    }

    private void finished() {
        synchronized (this) {
            threadsRunning[--nextFreeSocketIndex] = null;
            if (nextFreeSocketIndex == 0) {
                synchronized (lock) {
                    if (waitingForLock) {
                        lock.notify();
                    }
                }
            } else {
                Socket<A> e = null;
                if (!unstartedRunnables.isEmpty()) {
                    synchronized (unstartedRunnables) {
                        if (!unstartedRunnables.isEmpty()) {
                            e = unstartedRunnables.removeFirst();
                        }
                    }
                }
                if (e != null) {
                    start(e.r, e.threadPostfix);
                }
            }
        }
    }

    private void start(final A notRunningThread, final String name) {
        var socket = nextFreeSocketIndex++;
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    invoker.accept(notRunningThread);
                } catch (Throwable e) { // fault barrier!
                    try {
                        errorHandler.accept(e);
                    } catch (Throwable ta) {
                        ta.addSuppressed(e);
                        LOG.log(Level.SEVERE, ta.getMessage(), ta);
                    }
                }
                finished();
            }
        }, prefix + name);
        threadsRunning[socket] = t;
        t.start();
    }

    /**
     * Execute the threads and every subsequent thread that is added during the
     * progress.
     * 
     * @param timeoutMs The timeout in miliseconds, should be an positive value
     * @return <code>false</code> if the timeout has reached, <code>true</code>
     *         otherwise
     * @throws InterruptedException If the thread has been interrupted (maybe
     *                              shutdown of application)
     */
    public boolean executeUntilDeplated(final long timeoutMs) throws InterruptedException {
        AtomicBoolean resultHolder = new AtomicBoolean(true);
        if (nextFreeSocketIndex > 0) {
            Thread timeout = new Thread(() -> {
                try {
                    Thread.sleep(timeoutMs);
                    resultHolder.set(false);
                    synchronized (lock) {
                        lock.notify();
                    }
                } catch (InterruptedException e) {
                    LOG.log(Level.FINE, "Not a bug, timeout not reached, watchdog not required.", e);
                }
            });
            timeout.start();
            if (nextFreeSocketIndex > 0) {
                waitingForLock = true;
                synchronized (lock) {
                    try {
                        lock.wait(timeoutMs);
                    } finally {
                        timeout.interrupt();
                    }
                }
            }
        }
        return resultHolder.get();
    }
}

A exception is thrown sometimes having this Stacktrace:

Exception in thread "NN/SQL-caller" java.lang.ArrayIndexOutOfBoundsException: arraycopy: last source index 50 out of bounds for object array[49]
  at java.base/java.lang.System.arraycopy(Native Method)
  at java.base/java.util.ArrayList.fastRemove(ArrayList.java:724)
  at java.base/java.util.ArrayList.removeFirst(ArrayList.java:573)
  at xxx.DepleatingFiFoThreadPool.finished(DepleatingFiFoThreadPool.java:74)
  at xxx.DepleatingFiFoThreadPool$1.run(DepleatingFiFoThreadPool.java:100)
  at java.base/java.lang.Thread.run(Thread.java:1570)

I am not able to debug this because it appears only once every three to five weeks.

I use this test but I could not find the problem:

public void testMassiveQueuedFunction() throws InterruptedException {
    var dtp = new DepleatingFiFoThreadPool<Runnable>(100, x -> System.out.println(x), "testQueueFunction",
            Runnable::run);
    var l = new CountDownLatch(0);
    Runnable wait = () -> {
        try {
            Thread.sleep(2);
            l.countDown();
        } catch (InterruptedException e) {
        }
    };
    for (int i = 0; i < 9999; i++) {
        dtp.addAndStartThread(wait, i + "");
    }
    long start = System.currentTimeMillis();
    dtp.executeUntilDeplated(3000);
    long took = System.currentTimeMillis() - start;
    assert l.getCount() == 0 : "Await a count of 0 but has: " + l.getCount();
    assert took < 2900 : "Should execute faster but took: " + took;
}

Can someone suggest a better unit-test?


Solution

  • Yes, I synchronized removeFirst but I missed to synchronized unstartedRunnables.add.

    TLDR;

    If I run the removeFirst, there is a small chance that add adds a new element asynchroniously. We have two arrays having differrent sizes. That is why the ArrayIndexOutOfBoundsException is thrown. In order to solve the problem, I need to synchronize the unstartedRunnables.add call.

    A good improvement suggestion however came from @rzwitserloot in order to not synchonize against the this-object.