I'm using Spring XD, and I have the following config in my XML file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="input" />
    <int:channel id="output" />

    <int:control-bus input-channel="input" />

    <int-kafka:message-driven-channel-adapter
        id="kafka-inbound-channel-adapter-testing" listener-container="container1"
        auto-startup="false" phase="100" send-timeout="5000"
        channel="output" mode="record"
        message-converter="messageConverter" />

    <bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

    <!--Consumer -->
    <bean id="container1"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="enable.auto.commit" value="false" />
                        <entry key="auto.commit.interval.ms" value="100" />
                        <entry key="session.timeout.ms" value="15000" />
                        <entry key="max.poll.records" value="3" />
                        <entry key="group.id" value="bridge-stream-testing" />
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="testing-topic" />
            </bean>
        </constructor-arg>
    </bean>
</beans>
This is the Java class that I'm using to start/stop the channel:
package com.kafka.source.logic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
@ImportResource("classpath:/config/kafka-source-context.xml")
public class KafkaSourceRetry {

    @Autowired
    MessageChannel input;

    @Scheduled(cron="*/50 * * * * *")
    void startAdapter(){
        // Code commented out to make sure the adapter is not being started.
        // Even if I uncomment the code, the 50 seconds defined in the cron are not respected.
        // That is, if I send a message to the topic, it is immediately consumed.
        //input.send(new GenericMessage<String>("@kafka-inbound-channel-adapter-testing.start()"));
    }
}
Then I created a basic stream to check whether the messages I send to the topic come through:
stream create --name bridgeStream --definition "kafkaSourceLatestApi_v2|bridge|file" --deploy
I checked the file that was created, and it contained all the messages that I sent to the Kafka topic:
hola_que_tal
que_bonito
bridgeStream.out (END)
Also in the logs I found this:
2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Starting beans in phase 0
2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Starting bean 'container1' of type [class org.springframework.kafka.listener.KafkaMessageListenerContainer]
2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Successfully started bean 'container1'
2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Starting beans in phase 100
2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Starting bean 'kafka-inbound-channel-adapter-testing' of type [class org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter]
2017-04-10T22:37:06-0300 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.KafkaMessageDrivenChannelAdapter - started kafka-inbound-channel-adapter-testing
2017-04-10T22:37:06-0300 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 support.DefaultLifecycleProcessor - Successfully started bean 'kafka-inbound-channel-adapter-testing'
My question is: Why does the channel start automatically?
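It's designed that way. All modules have auto-startup set to false so that they don't start out of order; when you deploy a stream, the individual modules are deployed and started right-to-left (sink first, source last). That is why the adapter starts as soon as the stream is deployed, even though you declared auto-startup="false".
Deploy/undeploy is the way to start/stop streams.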
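To make the ordering concrete, here is a minimal plain-Java sketch of what "right-to-left" means for your stream definition. This is only an illustration, not Spring XD's deployer code; the class name StreamStartOrder and the method startOrder are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: this is NOT Spring XD's deployer code.
class StreamStartOrder {

    // Splits a pipe-delimited stream definition into module names and
    // returns them right-to-left, i.e. sink first and source last.
    static List<String> startOrder(String definition) {
        List<String> modules = new ArrayList<>();
        for (String part : definition.split("\\|")) {
            modules.add(part.trim());
        }
        Collections.reverse(modules);
        return modules;
    }

    public static void main(String[] args) {
        // Stream definition taken from the question.
        for (String module : startOrder("kafkaSourceLatestApi_v2|bridge|file")) {
            System.out.println("start " + module);
        }
    }
}
```

For the definition in the question, this lists file, then bridge, then kafkaSourceLatestApi_v2, so the source (and with it the Kafka adapter) is started last, once everything downstream is ready to receive messages.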