I'm using the following config:
I created a custom Kafka source module for Spring XD. I set up my consumer logic and my message-driven-channel-adapter (which I'm using in conjunction with a control-bus to stop the channel adapter). So far so good. I'm also setting the Kafka property max.poll.records=10 to fetch 10 records per poll.
I would like to make sure that I stop my channel adapter right after all records (in this case 10 records) have been successfully fetched.
For example, I would like to AVOID stopping the adapter when not all records have been successfully fetched and processed (that is, when the records have not yet been sent to the output channel).
Is there a way to tell that?
This is my xml config, just in case:
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:channel id="input_to_control_bus" />
<int:channel id="output" />
<context:component-scan base-package="com.kafka.source.logic" />
<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />
<int-kafka:message-driven-channel-adapter
    id="kafkaInboundChannelAdapterTesting" listener-container="container1"
    auto-startup="false" phase="100" send-timeout="5000" channel="output"
    mode="record" message-converter="messageConverter" />
<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!--Consumer -->
<bean id="container1"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="max.poll.records" value="3" />
                    <entry key="group.id" value="bridge-stream-testing" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="testing-topic" />
        </bean>
    </constructor-arg>
</bean>
[UPDATE N°1] Why do I want to do this? These are the details:
I'm using max.poll.records to ensure that I'm fetching at most X messages per poll.
Those are some details about this scenario. There are more scenarios, but I don't want to mix them into the same SO question.
[UPDATE N°2]
Some thoughts after Artem's answer.
max.poll.records and just wait until Y minutes have passed and X messages have been counted, and then stop the channel?
start the channel again?
I want to avoid keeping messages in memory; that is the reason I was using message-driven-channel-adapter + max.poll.records.
What I can suggest is something like an AtomicInteger bean, which is incremented on each processed record; when you reach the threshold, you call stop() on your kafkaInboundChannelAdapterTesting.
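The counter idea above could be sketched roughly as follows. This is only a minimal illustration in plain Java, not Spring Integration or spring-kafka API: the class name RecordCountingStopper, its methods, and the Runnable stop action are all hypothetical. In the real module the stop action would presumably call stop() on kafkaInboundChannelAdapterTesting, or send a message such as @kafkaInboundChannelAdapterTesting.stop() to input_to_control_bus; that wiring is an assumption and is not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Spring Integration or spring-kafka):
 * counts processed records and runs a stop action once a threshold is reached.
 */
final class RecordCountingStopper {

    private final int threshold;
    private final Runnable stopAction;
    private final AtomicInteger processed = new AtomicInteger();

    public RecordCountingStopper(int threshold, Runnable stopAction) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.stopAction = stopAction;
    }

    /**
     * Call once per record, after it has been handed to the output channel.
     * Returns true on the call that triggered the stop action.
     */
    public boolean recordProcessed() {
        // == (not >=) so the stop action runs exactly once per counting cycle.
        if (processed.incrementAndGet() == threshold) {
            stopAction.run();
            return true;
        }
        return false;
    }

    /** Reset the counter before the adapter is started again. */
    public void reset() {
        processed.set(0);
    }

    public int processedCount() {
        return processed.get();
    }

    public static void main(String[] args) {
        // Assumption: the real stop action would stop the channel adapter.
        RecordCountingStopper stopper = new RecordCountingStopper(
                3, () -> System.out.println("threshold reached: stop the adapter here"));
        for (int i = 1; i <= 3; i++) {
            System.out.println("record " + i + " -> stop requested: " + stopper.recordProcessed());
        }
    }
}
```

The threshold would normally match max.poll.records, so that the stop is requested only after the last record of a poll has been counted. Calling reset() before the adapter is started again begins a new counting cycle.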