I have an Azure Function made in Java, which is triggered by RabbitMQ messages on a specific queue.
My problem is that I need to output two different messages, one in the middle of the function's code and one at the end. I saw that OutputBinding.setValue doesn't immediately send a message but only sets the value to be used as the output of the function.
Here's my code structure:
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.rabbitmq.annotation.RabbitMQOutput;
import com.microsoft.azure.functions.rabbitmq.annotation.RabbitMQTrigger;

public class MyFunction {
    @FunctionName("MyFunction")
    public void run(
            @RabbitMQTrigger(connectionStringSetting = "RABBIT_URL", queueName = "input-queue")
            InputMessage message,
            @RabbitMQOutput(connectionStringSetting = "RABBIT_URL", queueName = "output-queue")
            OutputBinding<OutputMessage> output,
            final ExecutionContext context) {
        // some code...
        output.setValue(new OutputMessage(OutputMessage.Status.PROCESSING, "some other info 1"));
        // other code...
        output.setValue(new OutputMessage(OutputMessage.Status.DONE, "some other info 2"));
    }
}

public record OutputMessage(Status status, String info) {
    public enum Status {
        PROCESSING,
        DONE
    }
}
The message with status "PROCESSING" needs to be sent before the function has done its work, but when I test the function and check the RabbitMQ management interface, I see only 1 message in the output queue, not 2, and it's the one with status "DONE".
Is there a way to send 2 messages from 1 function using the tools provided by the Azure Functions Java libraries? Or do I have to import the RabbitMQ library, connect to the queue, and send the message manually?
The issue occurred because @RabbitMQOutput combined with OutputBinding.setValue() only sends a single message when the function finishes. It doesn't support sending multiple messages at different points during execution.
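The last-value-wins behaviour described above can be illustrated with a toy model. This is only a sketch, not the Azure Functions implementation: `SingleValueBinding` and `LastValueWinsDemo` are hypothetical names, and the sketch assumes (as observed in the question) that the host reads the binding once, after the function body has returned.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for an output binding: it remembers only the most recent value. */
final class SingleValueBinding<T> {
    private T value;

    void setValue(T newValue) {
        this.value = newValue; // overwrites whatever was set before
    }

    T getValue() {
        return value;
    }
}

final class LastValueWinsDemo {
    /** Simulates a host that flushes the binding once, after the function body ends. */
    static List<String> runFunctionAndFlush() {
        SingleValueBinding<String> output = new SingleValueBinding<>();

        // "Function body": two calls to setValue, nothing is sent yet.
        output.setValue("PROCESSING");
        output.setValue("DONE");

        // "Host": reads the binding a single time and enqueues that one value.
        List<String> queue = new ArrayList<>();
        queue.add(output.getValue());
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(runFunctionAndFlush()); // prints [DONE]
    }
}
```

Under that assumption, the first value is never observable from outside the function, which matches the single "DONE" message seen in the output queue.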
To fix the issue, I used the native RabbitMQ Java client (com.rabbitmq.client) inside the RabbitMQ trigger function. This allows me to manually publish messages to the output-queue exactly when needed: one during processing and another after completion.
RabbitMQTriggerJava.java :
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.rabbitmq.annotation.RabbitMQTrigger;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class RabbitMQTriggerJava {
    private static final String RABBIT_URL = System.getenv("RABBIT_URL");
    private static final String OUTPUT_QUEUE = "output-queue";

    @FunctionName("RabbitMQTriggerJava")
    public void run(
            @RabbitMQTrigger(connectionStringSetting = "RABBIT_URL", queueName = "input-queue")
            InputMessage message,
            final ExecutionContext context) {
        context.getLogger().info("Processing message: " + message);
        // A connection is opened per invocation; try-with-resources closes it afterwards.
        try (Connection connection = createRabbitConnection();
             Channel channel = connection.createChannel()) {
            // First message: published right away through the default exchange,
            // with the queue name as the routing key.
            OutputMessage processingMsg = new OutputMessage(OutputMessage.Status.PROCESSING, "Processing started");
            channel.basicPublish("", OUTPUT_QUEUE, null, processingMsg.toString().getBytes(StandardCharsets.UTF_8));

            Thread.sleep(2000); // simulated work

            // Second message: published once the work is finished.
            OutputMessage doneMsg = new OutputMessage(OutputMessage.Status.DONE, "Processing completed");
            channel.basicPublish("", OUTPUT_QUEUE, null, doneMsg.toString().getBytes(StandardCharsets.UTF_8));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            context.getLogger().severe("Interrupted while processing: " + e.getMessage());
        } catch (Exception e) {
            context.getLogger().severe("Error sending message: " + e.getMessage());
        }
    }

    private Connection createRabbitConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(RABBIT_URL);
        return factory.newConnection();
    }
}
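One way to keep the "publish now, publish again later" flow testable without a running broker is to hide the publish call behind a small interface. The sketch below is only an illustration of that idea: `StatusPublisher` and `TwoStepProcessor` are hypothetical names that do not appear in the function above, and the in-memory lambda stands in for a real implementation that would call `channel.basicPublish(...)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstraction over "send one status message right now". */
interface StatusPublisher {
    void publish(String status, String info);
}

final class TwoStepProcessor {
    /** Publishes PROCESSING before the work starts and DONE after it ends. */
    static String handle(String input, StatusPublisher publisher) {
        publisher.publish("PROCESSING", "started " + input);
        String result = input.toUpperCase(); // placeholder for the real work
        publisher.publish("DONE", "finished " + input);
        return result;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // In-memory publisher for local testing. A broker-backed one would call
        // channel.basicPublish(...) and wrap its IOException in an UncheckedIOException.
        handle("order-42", (status, info) -> sent.add(status + ": " + info));
        System.out.println(sent); // prints [PROCESSING: started order-42, DONE: finished order-42]
    }
}
```

With this shape, the trigger function would only create the channel and pass a publisher into `handle`, while the ordering of the two messages can be checked in a plain unit test.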
InputMessage.java :
public class InputMessage {
    private String content;

    public InputMessage() {
    }

    public InputMessage(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "InputMessage{content='" + content + "'}";
    }
}
OutputMessage.java :
public class OutputMessage {
    private Status status;
    private String info;

    public OutputMessage(Status status, String info) {
        this.status = status;
        this.info = info;
    }

    public Status getStatus() {
        return status;
    }

    public String getInfo() {
        return info;
    }

    @Override
    public String toString() {
        return "OutputMessage{" +
                "status=" + status +
                ", info='" + info + '\'' +
                '}';
    }

    public enum Status {
        PROCESSING,
        DONE
    }
}
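Note that the trigger function publishes the result of `toString()`, so the message body looks like `OutputMessage{status=PROCESSING, info='Processing started'}` rather than JSON. If the consumers of output-queue expect JSON, the body could be built along the following lines. This is a hand-rolled sketch with no external dependencies; `OutputMessageJson`, `toJson`, and `toBody` are hypothetical names, and in a real project a JSON library such as Jackson or Gson would be the more usual choice.

```java
import java.nio.charset.StandardCharsets;

final class OutputMessageJson {
    /** Escapes the characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds a JSON object with the same two fields as OutputMessage. */
    static String toJson(String status, String info) {
        return "{\"status\":\"" + escape(status) + "\",\"info\":\"" + escape(info) + "\"}";
    }

    /** UTF-8 bytes suitable for use as a message body. */
    static byte[] toBody(String status, String info) {
        return toJson(status, info).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toJson("PROCESSING", "Processing started"));
    }
}
```

In the trigger function, the byte array from `toBody(msg.getStatus().name(), msg.getInfo())` would replace `msg.toString().getBytes(...)` as the last argument of `basicPublish`.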
local.settings.json :
"RABBIT_URL": "amqp://guest:guest@localhost:5672",
I sent a message to the input-queue in RabbitMQ.
The RabbitMQ trigger function ran successfully.
I received two messages in the output-queue of RabbitMQ, one with status PROCESSING and one with status DONE.