I want to develop control-bus example using spring integration. As an example I took following sample: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus
I decided to do the same but using java DSL.
I also read following topic: https://stackoverflow.com/a/27849287/2674303 but it didn't help me.
For now I have following source code:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> inboundAdapter() {
return new MessageSource<String>() {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
};
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
}
And application:
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
ConfigurableApplicationContext ctx = new SpringApplication(MyApplication.class).run(args);
MessageChannel controlChannel = ctx.getBean("operationChannel", MessageChannel.class);
PollableChannel adapterOutputChanel = ctx.getBean("adapterOutputChanel", PollableChannel.class);
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
adapterOutputChanel.receive(1000);
}
}
But when I start the application I see following log:
2019-08-26 16:09:30.901 INFO 10532 --- [ main] control_bus.MyApplication : Started MyApplication in 1.248 seconds (JVM running for 2.401)
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@7351a16e] (controlBusFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)]; nested exception is org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute., failedMessage=GenericMessage [payload=@'inboundAdapter'.start(), headers={id=aef8f0dc-c3f5-5f7b-1a6d-7c9041f2d000, timestamp=1566824970903}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at control_bus.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:114)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:95)
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:205)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:114)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:365)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:151)
at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:76)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
... 7 more
What do I wrong ?
I did several steps forward and my configudation looks like this:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String>, Lifecycle {
private volatile boolean started;
@Override
public Message receive() {
if (!isRunning()) {
return null;
}
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return null;
}
};
}
@Override
public void start() {
started = true;
}
@Override
public void stop() {
started = false;
}
@Override
public boolean isRunning() {
return started;
}
}
}
When I execute:
controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
MyMessageSource#start is invoked
And the same for stop
but MyMessageSource#receive
is not invoked so
adapterOutputChanel.receive(1000)
always returns null
After Artem Bilan answer I have following configuration:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("operationChannel")
.controlBus()
.get();
}
@Bean
public AbstractMessageChannel adapterOutputChanel() {
return new QueueChannel();
}
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource inboundAdapter() {
return new MyMessageSource();
}
public static class MyMessageSource implements MessageSource<String> {
@Override
public Message receive() {
return new Message() {
@Override
public String getPayload() {
return "some_output_message";
}
@Override
public MessageHeaders getHeaders() {
return new MessageHeaders(new HashMap());
}
@Override
public String toString() {
return getPayload() + ", " + getHeaders();
}
};
}
}
}
application output:
2019-08-26 17:20:54.087 INFO 11792 --- [ main] control_bus.MyApplication : Started MyApplication in 1.526 seconds (JVM running for 2.843)
Before start:null
2019-08-26 17:20:55.093 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started myInboundAdapter
After start:some_output_message, {id=857f4320-5158-6daa-8a03-3a0182436a78, timestamp=1566829255098}
2019-08-26 17:20:55.098 INFO 11792 --- [ main] o.s.i.e.SourcePollingChannelAdapter : stopped myInboundAdapter
After stop:null
According Java & Annotation configuration in Spring, there inboundAdapter
bean name (which is essentially a bean method name) is assigned exactly to what you declare as a bean. In your case it is a MessageSource
implementaiton. You indeed need to deal in your Control Bus command with the SourcePollingChannelAdapter
bean assigned to your MessageSource
via that @InboundChannelAdapter
. Only the problem that we need to figure out a right bean name to refer to it from the command:
The AbstractEndpoint bean name is generated with the following pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example, the SourcePollingChannelAdapter endpoint for the consoleSource() definition shown earlier gets a bean name of myFlowConfiguration.consoleSource.inboundChannelAdapter. See also Endpoint Bean Names.
From the Docs here: https://docs.spring.io/spring-integration/docs/5.2.0.BUILD-SNAPSHOT/reference/html/configuration.html#annotations_on_beans
So, I would suggest you to go Endpoint Bean Names recommendations and use an @EndpointId
alongside with that @InboundChannelAdapter
:
@Bean
@InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
@EndpointId("myInboundAdapter")
public MessageSource<String> inboundAdapter() {
So, your Control Bus command is going to be like this: "@myInboundAdapter.start()"
UPDATE
The Java DSL variant for MessageSource
wiring:
@Bean
public IntegrationFlow channelAdapterFlow() {
return IntegrationFlows.from(new MyMessageSource(),
e -> e.id("myInboundAdapter").autoStartup(false).poller(p -> p.fixedDelay(100)))
.channel(adapterOutputChanel())
.get();
}