javaspring-integrationspring-integration-dslcontrol-bus

EvaluationException: The method 'start' is not supported by this command processor


I want to develop control-bus example using spring integration. As an example I took following sample: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/control-bus

I decided to do the same but using java DSL.

I also read following topic: https://stackoverflow.com/a/27849287/2674303 but it didn't help me.

For now I have following source code:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }

    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<String> inboundAdapter() {
        return new MessageSource<String>() {

            @Override
            public Message receive() {
                return new Message() {
                    @Override
                    public String getPayload() {
                        return "some_output_message";
                    }

                    @Override
                    public MessageHeaders getHeaders() {
                        return null;
                    }
                };
            }
        };
    }

    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }

}

And application:

@SpringBootApplication
public class MyApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = new SpringApplication(MyApplication.class).run(args);
        MessageChannel controlChannel = ctx.getBean("operationChannel", MessageChannel.class);
        PollableChannel adapterOutputChanel = ctx.getBean("adapterOutputChanel", PollableChannel.class);
        controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
        adapterOutputChanel.receive(1000);
    }
}

But when I start the application I see following log:

2019-08-26 16:09:30.901  INFO 10532 --- [           main] control_bus.MyApplication                : Started MyApplication in 1.248 seconds (JVM running for 2.401)
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@7351a16e] (controlBusFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)]; nested exception is org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute., failedMessage=GenericMessage [payload=@'inboundAdapter'.start(), headers={id=aef8f0dc-c3f5-5f7b-1a6d-7c9041f2d000, timestamp=1566824970903}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at control_bus.MyApplication.main(MyApplication.java:18)
Caused by: org.springframework.expression.EvaluationException: The method 'start' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:114)
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:95)
    at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:205)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:390)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:90)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:114)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:365)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:151)
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:76)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    ... 7 more

What do I wrong ?

update:

I did several steps forward and my configudation looks like this:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }

    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    public MessageSource inboundAdapter() {
        return new MyMessageSource();
    }


    public static class MyMessageSource implements MessageSource<String>, Lifecycle {
        private volatile boolean started;

        @Override
        public Message receive() {
            if (!isRunning()) {
                return null;
            }
            return new Message() {
                @Override
                public String getPayload() {
                    return "some_output_message";
                }

                @Override
                public MessageHeaders getHeaders() {
                    return null;
                }

            };
        }

        @Override
        public void start() {
            started = true;
        }

        @Override
        public void stop() {
            started = false;
        }

        @Override
        public boolean isRunning() {
            return started;
        }
    }
}

When I execute:

    controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

MyMessageSource#start is invoked

And the same for stop but MyMessageSource#receive is not invoked so

adapterOutputChanel.receive(1000)

always returns null

Solution:

After Artem Bilan answer I have following configuration:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from("operationChannel")
                .controlBus()
                .get();
    }
    @Bean
    public AbstractMessageChannel adapterOutputChanel() {
        return new QueueChannel();
    }
    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    @EndpointId("myInboundAdapter")
    public MessageSource inboundAdapter() {
        return new MyMessageSource();
    }
    public static class MyMessageSource implements MessageSource<String> {
        @Override
        public Message receive() {
            return new Message() {
                @Override
                public String getPayload() {
                    return "some_output_message";
                }
                @Override
                public MessageHeaders getHeaders() {
                    return new MessageHeaders(new HashMap());
                }
                @Override
                public String toString() {
                    return getPayload() + ", " + getHeaders();
                }
            };
        }
    }
}

application output:

2019-08-26 17:20:54.087  INFO 11792 --- [           main] control_bus.MyApplication                : Started MyApplication in 1.526 seconds (JVM running for 2.843)
Before start:null
2019-08-26 17:20:55.093  INFO 11792 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started myInboundAdapter
After start:some_output_message, {id=857f4320-5158-6daa-8a03-3a0182436a78, timestamp=1566829255098}
2019-08-26 17:20:55.098  INFO 11792 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : stopped myInboundAdapter
After stop:null

Solution

  • According Java & Annotation configuration in Spring, there inboundAdapter bean name (which is essentially a bean method name) is assigned exactly to what you declare as a bean. In your case it is a MessageSource implementaiton. You indeed need to deal in your Control Bus command with the SourcePollingChannelAdapter bean assigned to your MessageSource via that @InboundChannelAdapter. Only the problem that we need to figure out a right bean name to refer to it from the command:

    The AbstractEndpoint bean name is generated with the following pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example, the SourcePollingChannelAdapter endpoint for the consoleSource() definition shown earlier gets a bean name of myFlowConfiguration.consoleSource.inboundChannelAdapter. See also Endpoint Bean Names.

    From the Docs here: https://docs.spring.io/spring-integration/docs/5.2.0.BUILD-SNAPSHOT/reference/html/configuration.html#annotations_on_beans

    So, I would suggest you to go Endpoint Bean Names recommendations and use an @EndpointId alongside with that @InboundChannelAdapter:

    @Bean
    @InboundChannelAdapter(channel = "adapterOutputChanel", autoStartup = "false", poller = @Poller(fixedDelay = "1000"))
    @EndpointId("myInboundAdapter")
    public MessageSource<String> inboundAdapter() {
    

    So, your Control Bus command is going to be like this: "@myInboundAdapter.start()"

    UPDATE

    The Java DSL variant for MessageSource wiring:

    @Bean
    public IntegrationFlow channelAdapterFlow() {
        return IntegrationFlows.from(new MyMessageSource(), 
                    e -> e.id("myInboundAdapter").autoStartup(false).poller(p -> p.fixedDelay(100)))
                .channel(adapterOutputChanel())
                .get();
    }