java spring spring-integration spring-xd spring-kafka

Spring Integration & Kafka Consumer: Stop message-driven-channel-adapter right after records are successfully fetched


I'm using the following config:

I created a custom Kafka source module for Spring XD. I set up my consumer logic and my message-driven-channel-adapter (which I'm using in conjunction with a control-bus to stop the channel adapter). So far so good. I'm also setting the Kafka property max.poll.records=10 to fetch 10 records per poll.

I would like to make sure that I'm stopping my channel right after all records (in this case 10 records) have been successfully fetched.

So for example: I would like to AVOID stopping the reading when not all records have been successfully fetched and processed (that is, when the records have not yet been sent to the output channel).

Is there a way to tell that?

This is my xml config, just in case:

xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">


<int:channel id="input_to_control_bus" />
<int:channel id="output" />

<context:component-scan base-package="com.kafka.source.logic" />


<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />

<int-kafka:message-driven-channel-adapter
    id="kafkaInboundChannelAdapterTesting" listener-container="container1"
    auto-startup="false" phase="100" send-timeout="5000" channel="output"
    mode="record" message-converter="messageConverter" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

<!--Consumer -->
<bean id="container1"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="max.poll.records" value="3" />
                    <entry key="group.id" value="bridge-stream-testing" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>

    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="testing-topic" />
        </bean>
    </constructor-arg>
</bean>

[UPDATE N°1] Why do I want to do this? These are the details:

Those are some details about this scenario. There are more scenarios, but I don't want to mix them into the same SO question.

[UPDATE N°2]

Some thoughts after Artem's answer.

I want to avoid keeping messages in memory; that is why I was using message-driven-channel-adapter + max.poll.records.


Solution

  • What I can suggest is something like an AtomicInteger bean that is incremented on each processed record; when it reaches the threshold, you call stop() on your kafkaInboundChannelAdapterTesting.
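
The counter idea above could be sketched roughly as follows. This is only a minimal, framework-free illustration: the class name RecordCountingStopper, its methods, and the Runnable stop action are hypothetical names introduced here, not part of Spring Integration or Spring Kafka. It assumes the threshold matches max.poll.records and that recordProcessed() is invoked only after a record has actually been sent to the output channel.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: counts processed records and runs a stop action
 * exactly when the configured threshold is reached.
 */
final class RecordCountingStopper {

    private final int threshold;
    private final Runnable stopAction;
    private final AtomicInteger processed = new AtomicInteger();

    RecordCountingStopper(int threshold, Runnable stopAction) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.stopAction = stopAction;
    }

    /**
     * Call once per record, after it has been sent downstream.
     * Returns true only for the call that reaches the threshold.
     */
    boolean recordProcessed() {
        // incrementAndGet is atomic, so only one caller can observe
        // the exact threshold value and trigger the stop action.
        if (processed.incrementAndGet() == threshold) {
            stopAction.run();
            return true;
        }
        return false;
    }

    /** Resets the counter, e.g. before the adapter is started again. */
    void reset() {
        processed.set(0);
    }

    int processedCount() {
        return processed.get();
    }
}
```

In the module from the question, the stop action would presumably either call stop() on the kafkaInboundChannelAdapterTesting bean directly or send a control-bus command such as "@kafkaInboundChannelAdapterTesting.stop()" to input_to_control_bus; which of the two fits better depends on how the rest of the flow is wired.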