I have a GCP Pub/Sub implementation using Spring Cloud Stream binders.
I would like to acknowledge messages manually, but only if the ackMode is set to MANUAL. Currently I am using:
@Bean
public Consumer<Message<String>> pubsub1() {
    return message -> {
        log.info("New message received from Pub/Sub: {}", message);
        // For manual acknowledgment
        BasicAcknowledgeablePubsubMessage acknowledgeablePubsubMessage = message.getHeaders()
                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
        try {
            // Process the message payload
            messageProcessor.processMessage("GCP-PubSub", message.getPayload());
            // Acknowledge the message
            acknowledgeablePubsubMessage.ack();
            log.info("Message acknowledged successfully.");
        } catch (Exception e) {
            log.error("Error processing message: {}", e.getMessage(), e);
            // NACK the message
            acknowledgeablePubsubMessage.nack();
            log.info("Message negatively acknowledged.");
        }
    };
}
Is there a better way to do it without GCP-specific code like
message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
Also, I am setting the ack-mode on the consumer binding in the YAML configuration. Is there a way to check whether the consumer is set to AUTO or MANUAL acknowledgement before doing this?
No, there is no common API for such functionality.
Well, there is one in Spring Integration (see org.springframework.integration.acks), but it is not implemented as a wrapper around protocol-specific drivers. For example, even for the Apache Kafka binder we still rely on the org.springframework.kafka.support.Acknowledgment header: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/manual-ack.html.
Checking for the presence of that header should be enough to tell whether we are in AUTO or MANUAL mode. I am not sure, though, how the Pub/Sub binder is implemented.
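The header-presence check described above could look roughly like the sketch below. It uses only the JDK so that it runs on its own: the Ackable interface is a hypothetical stand-in for BasicAcknowledgeablePubsubMessage, a plain Map stands in for the message headers, and the header key string is a placeholder for GcpPubSubHeaders.ORIGINAL_MESSAGE. It also assumes the header is populated only in MANUAL mode, which is exactly the part that is not confirmed for the Pub/Sub binder, so please verify that before relying on it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class AckModeSketch {

    /** Hypothetical stand-in for BasicAcknowledgeablePubsubMessage. */
    public interface Ackable {
        void ack();
        void nack();
    }

    // Placeholder for GcpPubSubHeaders.ORIGINAL_MESSAGE; the literal value is an assumption.
    public static final String ORIGINAL_MESSAGE = "gcp_pubsub_original_message";

    /**
     * Processes the payload and acks/nacks only when an acknowledgeable
     * header is present. Returns "auto", "acked" or "nacked" so the chosen
     * path is visible to the caller.
     */
    public static String handle(Map<String, Object> headers,
                                String payload,
                                Consumer<String> processor) {
        Object header = headers.get(ORIGINAL_MESSAGE);
        Ackable ackable = (header instanceof Ackable) ? (Ackable) header : null;
        try {
            processor.accept(payload);
            if (ackable == null) {
                return "auto"; // no header: leave acknowledgement to the binder
            }
            ackable.ack();
            return "acked";
        } catch (RuntimeException e) {
            if (ackable == null) {
                throw e; // no header: let the binder's own error handling decide
            }
            ackable.nack();
            return "nacked";
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(ORIGINAL_MESSAGE, new Ackable() {
            @Override public void ack() { System.out.println("ack called"); }
            @Override public void nack() { System.out.println("nack called"); }
        });
        System.out.println(handle(headers, "hello", p -> { }));
    }
}
```

In the real consumer, the same null check on the value returned by message.getHeaders().get(...) would replace the Map lookup. Keeping that lookup in one small helper at least confines the GCP-specific types to a single place.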