javamultithreadingrabbitmq

Java RabbitMQ basicConsume + thread interruption


I am using channel.basicConsume (just like in very basic Rabbit tutorial):

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

Now I want to make tasks, handled in DeliverCallback cooperatively-cancellable (for example, by other queue consumer).

First thing that comes to mind is Thread.interrupt(), which will often help to interrupt blocking calls. For example, the code might look like this:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    // Let's assume very dumb registry for now
    someTaskRegistry.setRunningTask(Thread.currentThread());
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        Thread.sleep(60000); // Library code that allows interruption
        
        intensiveComputationPart1();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }
        intensiveComputationPart2();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }
        inteisiveComputationPart3();
        if (Thread.interrupted()) {
            // Flag is unset now, is that ok?
            return;
        }

        System.out.println(" [x] Handled '" + message + "'");
    } catch (InterruptedException ie) {
        // What should I do here?
        // throw ie ?
    } finally {
        someTaskRegistry.unsetCurrentTask();
    }
};

and the cancellign part:

DeliverCallback cancelCallback = (consumerTag, delivery) -> {
    someThreadRegistry.getCurrentTask().interrupt();
};
channel.basicConsume(CANCEL_QUEUE_NAME, true, cancelCallback, consumerTag -> { });

The question raised is how safe it is, specifically:


Solution

  • I ended up with delegating cancellable parts to ExecutorService, since inside the DeliveryCallback there are some interruptible operations which I don't want to be interrupted: the only option was to separate interruptible and non-interruptible parts of the handler into different threads.

    With ExecutorService you can still .get() on the Future, returned by execute and block, in the DeliveryCallback code. This will allow you to properly and timely issue acks (and nacks).

    While it solves my initial problem, the questions I've raised are still on.