Currently I have set up a function to consume Kafka messages as follows:
@Bean(name = "streamSrc")
public java.util.function.Consumer<org.springframework.messaging.Message<byte[]>> consumeStream() {
    return message -> {
        byte[] rawMessage = message.getPayload();
        byte[] rawKey = (byte[]) message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        Consumer<?, ?> consumer = (Consumer<?, ?>) message.getHeaders().get(KafkaHeaders.CONSUMER);
        String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        int partitionId = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
        log.debug("processing message {}, key {}, using consumer {}, for topic {}, partition {}", rawMessage, rawKey, consumer, topic, partitionId);
        consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
        // do processing of message
    };
}
Then, in properties:
spring.cloud.function.definition=streamSrc
spring.cloud.stream.function.bindings.streamSrc-in-0=source1
spring.cloud.stream.bindings.source1.content-type=application/json
spring.cloud.stream.bindings.source1.destination=my-kafka-topic
spring.cloud.stream.bindings.source1.consumer.header-mode=headers
spring.cloud.stream.bindings.source1.group=group1
spring.cloud.stream.bindings.source1.consumer.partitioned=true
spring.cloud.stream.bindings.source1.consumer.concurrency=2
spring.cloud.stream.kafka.bindings.source1.consumer.idleEventInterval=5000
spring.cloud.stream.kafka.bindings.source1.consumer.configuration.max.poll.records=100
This effectively sets a 5-second interval in which a batch of up to 100 records is read. Is there a better, perhaps more declarative, way to schedule these intervals? Newer versions of Spring Cloud Stream support "batch mode", e.g. ....consumer.batch-mode=true,
but I'm not aware of any way, property-driven or annotation-driven, to enforce a schedule for consuming messages. A bonus would be skipping an interval if not all messages had finished processing. Any alternative ideas are welcome.
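For context, my understanding is that enabling batch mode would change the consumer signature so the whole poll arrives as a list. A sketch (assuming the binder delivers byte[] record values as List<byte[]>; names are mine):

```java
import java.util.List;
import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

@Bean(name = "streamSrc")
public Consumer<Message<List<byte[]>>> consumeBatch() {
    return message -> {
        // With ...consumer.batch-mode=true, one Message carries the
        // entire poll: up to max.poll.records (100 here) records.
        List<byte[]> records = message.getPayload();
        // process the batch
    };
}
```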
P.S. To un-pause, I'm using:
@Bean
public ApplicationListener<ListenerContainerIdleEvent> tryUnpause() {
    return event -> {
        if (!event.getConsumer().paused().isEmpty()) {
            event.getConsumer().resume(event.getConsumer().paused());
        }
    };
}
idleEventInterval
That's not what that property means; it means "publish a container idle event if no records have been received within that interval."
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#idleEventInterval
and https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers
Listener containers are message-driven so records will be processed whenever they are available.
You can use a ListenerContainerCustomizer bean to set idleBetweenPolls, which delays the next poll after the processing of the records from the previous poll completes.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#idleBetweenPolls
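A minimal sketch of such a customizer (assumes the Spring Cloud Stream Kafka binder is on the classpath; the 5-second delay mirrors the interval from the question and is an illustrative value):

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer() {
    return (container, destinationName, group) ->
            // Wait 5 seconds between the end of one poll's processing
            // and the start of the next poll.
            container.getContainerProperties().setIdleBetweenPolls(5_000L);
}
```

Because the delay is applied between polls rather than on a fixed clock, a batch that takes longer than expected naturally pushes back the next poll, which also covers the "skip an interval if processing hasn't finished" requirement.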