google-cloud-pubsubspring-cloud-streamspring-cloud-stream-binder

Ho to manually acknowledge a gcp pub sub message when using spring cloud stream only if the ack-mode is set to MANUAL


I have a GCP pub sub implementation using spring cloud stream binders.

I would like to do manual acknowledgement for messages only if the ackMode is set to MANUAL. Currently I am using

    @Bean
    public Consumer<Message<String>> pubsub1() {
        return message -> {
            log.info("New message received from Pub/Sub: {}", message);

            // For manual acknowledgment
            BasicAcknowledgeablePubsubMessage acknowledgeablePubsubMessage = message.getHeaders()
                    .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);

            try {
                // Process the message payload
                messageProcessor.processMessage("GCP-PubSub", message.getPayload());

                // Acknowledge the message
                acknowledgeablePubsubMessage.ack();
                log.info("Message acknowledged successfully.");
            } catch (Exception e) {
                log.error("Error processing message: {}", e.getMessage(), e);
                // NACK the message
                acknowledgeablePubsubMessage.nack();
                log.info("Message negatively acknowledged.");
            }
        };
    }

Is there a better way to do it without GCP specific terms like

message.getHeaders()
                    .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);

Also I am setting the ack-mode in the consumer binding in the yml. Is there a way to check if the consumer is set to AUTO or MANUAL acknowledgement before doing this?


Solution

  • No, there is no a common API for such a functionality. Well, there is in Spring Integration - see org.springframework.integration.acks. But that is not implemented as a wrapped around protocol-specific drivers. For example, even for Apache Kafka binder we still rely on the org.springframework.kafka.support.Acknowledgment header: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/manual-ack.html.

    The presence of that header should be enough to to be sure that we are in AUTO or MANUAL mode. Not sure, though, how Pub/Sub binder is implemented.