I am using channel.basicConsume (just like in the very basic RabbitMQ tutorial):
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
Now I want to make the tasks handled in DeliverCallback cooperatively cancellable (for example, by another queue consumer).
The first thing that comes to mind is Thread.interrupt(), which often helps to interrupt blocking calls.
For example, the code might look like this:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    // Let's assume very dumb registry for now
    someTaskRegistry.setRunningTask(Thread.currentThread());
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        Thread.sleep(60000); // Library code that allows interruption
        intensiveComputationPart1();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }
        intensiveComputationPart2();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }
        intensiveComputationPart3();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }
        System.out.println(" [x] Handled '" + message + "'");
    } catch (InterruptedException ie) {
        // What should I do here?
        // throw ie ?
    } finally {
        someTaskRegistry.unsetCurrentTask();
    }
};
and the cancelling part:
DeliverCallback cancelCallback = (consumerTag, delivery) -> {
    someTaskRegistry.getCurrentTask().interrupt();
};
channel.basicConsume(CANCEL_QUEUE_NAME, true, cancelCallback, consumerTag -> { });
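To make the "Flag is unset now" comments above concrete, here is a small standalone sketch of the interrupt-flag semantics as documented for java.lang.Thread. It does not touch RabbitMQ at all; the class and method names (InterruptFlagDemo and its helpers) are made up for illustration. It only shows what the flag does, not whether clearing it on a consumer thread is safe.

```java
final class InterruptFlagDemo {

    /** Thread.interrupted() reports the flag and clears it as a side effect. */
    static boolean[] interruptedClearsFlag() {
        Thread.currentThread().interrupt();      // set the flag on ourselves
        boolean first = Thread.interrupted();    // reads the flag and clears it
        boolean second = Thread.interrupted();   // flag was already cleared
        return new boolean[] { first, second };
    }

    /** isInterrupted() only reads the flag and leaves it set. */
    static boolean[] isInterruptedKeepsFlag() {
        Thread.currentThread().interrupt();
        boolean first = Thread.currentThread().isInterrupted();
        boolean second = Thread.currentThread().isInterrupted();
        Thread.interrupted();                    // clean up so the caller is unaffected
        return new boolean[] { first, second };
    }

    /** A blocking call that throws InterruptedException also clears the flag. */
    static boolean sleepClearsFlag() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(1000);                  // throws at once because the flag is set
            return false;                        // not reached when the flag was set
        } catch (InterruptedException e) {
            // The flag is cleared when the exception is thrown. Code that wants
            // callers to see the interrupt would call
            // Thread.currentThread().interrupt() here to restore it.
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(interruptedClearsFlag()));
        System.out.println(java.util.Arrays.toString(isInterruptedKeepsFlag()));
        System.out.println(sleepClearsFlag());
    }
}
```

So both the explicit Thread.interrupted() checks and the catch (InterruptedException ie) branch leave the thread with a cleared flag unless the handler sets it again.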
The question is how safe this is, specifically:
Thread.interrupted())?
Thread.interrupted())?
I ended up delegating the cancellable parts to an ExecutorService, since inside the DeliverCallback there are some interruptible operations which I don't want to be interrupted: the only option was to separate the interruptible and non-interruptible parts of the handler into different threads.
With ExecutorService you can still call .get() on the Future returned by submit and block in the DeliverCallback code. This allows you to issue acks (and nacks) properly and in a timely manner.
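A minimal sketch of that delegation, using only java.util.concurrent so it runs without a broker. The names CancellableHandler, Outcome, handle and cancelRunning are hypothetical, and the places where a real handler would call channel.basicAck or channel.basicNack are only marked in comments. It also assumes the cancel request arrives on a different thread from the one blocked in handle (for example, from a consumer on a separate channel).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancellableHandler {
    enum Outcome { ACK, NACK }

    // Worker thread that runs only the interruptible part of a task.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Very dumb "registry": the Future of the task currently running, if any.
    private volatile Future<?> running;

    /** Called on the consumer thread; blocks until the task ends or is cancelled. */
    Outcome handle(Runnable interruptiblePart) {
        Future<?> future = worker.submit(interruptiblePart);
        running = future;
        try {
            future.get();                       // the consumer thread just waits here
            return Outcome.ACK;                 // real code: channel.basicAck(...)
        } catch (CancellationException | ExecutionException e) {
            return Outcome.NACK;                // cancelled or failed: channel.basicNack(...)
        } catch (InterruptedException e) {
            future.cancel(true);                // the consumer thread itself was interrupted
            Thread.currentThread().interrupt(); // keep the flag visible to the caller
            return Outcome.NACK;
        } finally {
            running = null;
        }
    }

    /** Called from the cancelling side; interrupts the worker, not the consumer thread. */
    boolean cancelRunning() {
        Future<?> f = running;
        return f != null && f.cancel(true);
    }

    void shutdown() {
        worker.shutdownNow();
    }

    public static void main(String[] args) {
        CancellableHandler handler = new CancellableHandler();
        System.out.println(handler.handle(() -> { })); // a task that finishes normally
        handler.shutdown();
    }
}
```

One thing to keep in mind with this shape: after cancel(true), get() returns with a CancellationException right away, so the nack may be sent while the worker thread is still winding down.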
While this solves my initial problem, the questions I've raised are still open.