I am using an ArrayBlockingQueue
to transport work items from a Supplier to multiple workers.
I am also using Thread.interrupt
to signal the Supplier to stop supplying new work items.
However, my process of interrupting the Supplier and then clearing the queue doesn't reliably clear my work queue.
The function I use to clear the queue looks as follows:
private static void clear(Thread supplier) {
    supplier.interrupt();
    queue.clear();
}
while my Supplier looks like this:
try {
    while (true) {
        Runnable r = () -> { /* Ignored for now */};
        queue.put(r);
    }
} catch (InterruptedException iex) {
}
Regarding the interaction between Thread.interrupt
and ArrayBlockingQueue::put
, the Javadoc says the following:
public void put(E e)
throws InterruptedException
Inserts the specified element at the tail of this queue,
waiting for space to become available if the queue is full.
(...)
Throws:
InterruptedException - if interrupted while waiting
Well, that leaves a lot of questions. Sure, if the Supplier is waiting because the queue is full, and gets interrupted while waiting, it will throw the Exception. But what if the queue is full and the Supplier is already carrying the 'interrupted' flag? Will it wait? Will it throw the Exception? What if the queue isn't full, and the Supplier gets interrupted while in this function? Will it throw the Exception? Will it modify the queue? What if the queue isn't full and the Supplier already carries the 'interrupted' flag?
Compare this with ReentrantLock::lockInterruptibly
: there the documentation says:
public void lockInterruptibly()
throws InterruptedException
Acquires the lock unless the current thread is interrupted.
(...)
If the current thread:
has its interrupted status set on entry to this method; or
is interrupted while acquiring the lock,
then InterruptedException is thrown and the current thread's interrupted status is cleared.
In this implementation, as this method is an explicit interruption point,
preference is given to responding to the interrupt over normal or reentrant acquisition of the lock.
Not wanting to give up, I started to look into the implementation of ArrayBlockingQueue. This is interesting to me because ArrayBlockingQueue
uses a ReentrantLock
internally, and we have already seen that the documentation for ReentrantLock is much more detailed. Even better, ArrayBlockingQueue::put
uses the already cited ReentrantLock::lockInterruptibly
- which is perfect because I am going to interrupt the Thread that is calling put
. And ArrayBlockingQueue::clear
calls ReentrantLock::lock
on the same lock.
So if I am not mistaken, with the lock guarding the ArrayBlockingQueue
, there is only one way how there could still be an item in the queue after calling clear
: if the Supplier somehow entered queue.put
(and thereby ReentrantLock::lockInterruptibly
) after the Worker has finished its queue.clear
. But the worker first calls supplier.interrupt()
, so the supplier should already have its interrupted flag set, and according to the documentation in ReentrantLock
that should give me the InterruptedException instead of acquiring the lock, making it impossible to modify the queue and instead ending the supply function.
To test this, I wrote the following example class:
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
public class ABQ {
    private static void work() {
        while(true) {
            try {
                queue.take().run();
            } catch (InterruptedException iex) {
                // Top level interrupt -> Do nothing
            }
        }
    }
    private static void clear(Thread caller, int level) {
        caller.interrupt();
        queue.clear();
        tripped.set(level);
    }
    private static ArrayBlockingQueue<Runnable> queue;
    private static ThreadLocal<Integer> tripped = ThreadLocal.withInitial(() -> -1);
    public static void main(String[] args) {
        queue = new ArrayBlockingQueue<Runnable>(5);
        Thread[] workers = new Thread[5];
        for (int i=0; i < 5; ++i) {
            workers[i] = new Thread(ABQ::work);
            workers[i].start();
        }
        final Thread caller = Thread.currentThread();
        final Random rdm = new Random();
        for (int i=0; ;i++) {
            final int finalI = i;
            try {
                Thread.sleep(2000);
                System.err.println("2s sleep over");
            } catch (InterruptedException iex) {
                continue;
            }
            try {
                while (true) {
                    Runnable r = () -> {
                        int waitTime;
                        boolean success;
                        if (tripped.get() >= finalI) {
                            throw new RuntimeException("Assertion failed, work for "+finalI+" but already tripped "+tripped.get());
                        }
                        synchronized (rdm) {
                            waitTime = rdm.nextInt(1000) + 200;
                            success = rdm.nextBoolean();
                        }
                        try {
                            Thread.sleep(waitTime);
                            if (success) {
                                clear(caller, finalI);
                            }
                        } catch (InterruptedException iex) {
                            // Inner interrupt -> also ignore
                        }
                    };
                    queue.put(r);
                }
            } catch (InterruptedException iex) {
                System.err.println("EOL");
            }
        }
    }
}
This repeatedly fills the queue with work items until it gets interrupted, and then proceeds to the next batch. Each worker is aware of the highest batch it has ended and will complain if it finds another item for that batch. And on my machine, all of the workers will fail on this assertion.
So my process of clearing the queue is not guaranteed to work. Why? And how do I make it work?
When a thread is going to wait for a condition, it releases the lock and reacquires it after being signaled. This is the only way another thread could change the condition to “fulfilled”, as that other thread must be able to acquire the lock to change the condition.
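Reacquiring the lock after waiting for a condition is different from initially acquiring the lock via lockInterruptibly(): the reacquisition itself is not interruptible, and a thread that was signaled before the interrupt arrived returns from the wait normally, with only its interrupted status set.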
We can easily demonstrate both scenarios:
Interrupted before entering put
:
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
System.out.println("adding a non-full queue with interrupted state");
Thread.currentThread().interrupt();
try {
    queue.put("first string");
} catch(InterruptedException ex) {
    System.out.println("got " + ex);
}
queue.put("second string");
System.out.println("adding a full queue with interrupted state");
Thread.currentThread().interrupt();
try {
    queue.put("third string");
} catch(InterruptedException ex) {
    System.out.println("got " + ex);
}
System.out.println("queue: " + queue);
adding a non-full queue with interrupted state
got java.lang.InterruptedException
adding a full queue with interrupted state
got java.lang.InterruptedException
queue: [second string]
Interrupted while waiting for a full queue to become “not full”:
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
Thread main = Thread.currentThread();
new Thread(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
    for(StackTraceElement ste: main.getStackTrace())
        System.out.println("\tat "+ste);
    main.interrupt();
    queue.clear();
}).start();
for(int i = 0; i < 6; i++) {
    queue.put("item " + i);
}
System.out.println("queue: " + queue);
System.out.println(Thread.currentThread().isInterrupted()?
    "interrupted": "not interrupted");
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at ABQ.main(ABQ.java:38)
queue: [item 5]
interrupted
Sometimes, though, I get:
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at ABQ.main(ABQ.java:38)
Exception in thread "main" java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at ABQ.main(ABQ.java:38)
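To isolate why an item can still end up in the queue after the interrupt and the clear, here is a minimal sketch that forces the signal to happen before the interrupt. It does not use ArrayBlockingQueue itself: the class AwaitReacquireDemo, its one-slot buffer and the method names are hypothetical stand-ins that only follow the same pattern as put (lockInterruptibly, then await on a “not full” condition in a loop).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitReacquireDemo {
    static final ReentrantLock lock = new ReentrantLock();
    static final Condition notFull = lock.newCondition();
    static final Deque<String> items = new ArrayDeque<>();
    static final int CAPACITY = 1; // hypothetical one-slot buffer

    // Same shape as a blocking put: interruptible lock, then wait for space.
    static void put(String s) throws InterruptedException {
        lock.lockInterruptibly();          // checks the interrupted status on entry
        try {
            while (items.size() == CAPACITY) {
                notFull.await();           // releases the lock, reacquires it before returning
            }
            items.addLast(s);
        } finally {
            lock.unlock();
        }
    }

    static String run() {
        final String[] outcome = new String[1];
        lock.lock();
        try {
            items.clear();
            items.add("old");              // buffer starts full
        } finally {
            lock.unlock();
        }
        Thread producer = new Thread(() -> {
            try {
                put("late");
                outcome[0] = "inserted, interrupted=" + Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            }
        });
        producer.start();
        try {
            // Retry until the producer is registered as a waiter; we leave the loop holding the lock.
            while (true) {
                lock.lock();
                if (lock.hasWaiters(notFull)) break;
                lock.unlock();
                Thread.sleep(1);
            }
            try {
                notFull.signal();          // what a consumer freeing a slot would do
                producer.interrupt();      // the interrupt arrives after the signal
                items.clear();             // the "clear" happens before the producer gets the lock back
            } finally {
                lock.unlock();
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0] + ", items=" + items;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With OpenJDK's AbstractQueuedSynchronizer-based conditions I would expect this to print "inserted, interrupted=true, items=[late]": the producer was already signaled when the interrupt arrived, so await returns normally, only re-asserts the interrupted status, and the item is added after the clear.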
When you eliminate the wait for a condition, e.g. replace queue.put(r);
with while(!queue.offer(r)) Thread.sleep(10);
you get the intended ordering of interrupt detection and queue processing. But I don’t recommend it.
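As a rough sketch of that polling variant, wrapped into a helper: the class name PollingPut and the method putPolling are hypothetical, and the 10 ms pause is just the value from the snippet above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PollingPut {
    // Hypothetical helper: never waits on the queue's internal condition.
    static <T> void putPolling(BlockingQueue<T> queue, T item) throws InterruptedException {
        while (!queue.offer(item)) {
            // Thread.sleep throws InterruptedException if the interrupted status
            // is already set or gets set during the pause.
            Thread.sleep(10);
        }
    }

    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");                 // queue is now full
        Thread.currentThread().interrupt();   // simulate an interrupt that arrived earlier
        try {
            putPolling(queue, "second");
            return "inserted " + queue;
        } catch (InterruptedException e) {
            return "interrupted " + queue;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "interrupted [first]"
    }
}
```

Note that offer itself does not look at the interrupted status, so on a non-full queue an already interrupted thread would still insert its item; the interrupt is only noticed in the sleep. That, together with the busy polling, is part of why this is shown only to illustrate the ordering.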
Your overall design is suspicious as the described logic only works for one consumer thread. In your example code, worker threads processing a smaller i
may clear the queue while the producer is already in a later iteration. Maybe a Phaser
or CyclicBarrier
might be a better fit for your actual task than dealing with interruption manually.