spring-bootspring-integrationcontrol-bus

Spring Integration how to use Control Bus with JavaConfig, no DSL


I'm having a few issues with Spring Integration and the control bus. I need to turn auto-start off on an InboundChannelAdapter. However when I do this I can't get the ControlBus to start the channel adapter.

I've searched for an answer online, but most of the examples use XML configuration.

Here is the entirety of my code:

package com.example.springintegrationdemo;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;

import java.io.File;

@SpringBootApplication
@EnableIntegration
public class SpringIntegrationDemoApplication {

    @Bean
    public MessageChannel fileChannel() {
        return new DirectChannel();
    }

    @Bean(name = "fileMessageSource")
    @InboundChannelAdapter(channel = "fileChannel", poller = @Poller(fixedDelay = "1000"),autoStartup = "false")
    public MessageSource<File> fileMessageSource() {
        FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
        fileReadingMessageSource.setDirectory(new File("lz"));
        return fileReadingMessageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "fileChannel")
    public MessageHandler messageHandler() {
        MessageHandler messageHandler = message -> {
            File f = (File) message.getPayload();
            System.out.println(f.getAbsolutePath());
        };
        return messageHandler;
    }

    @Bean
    MessageChannel controlChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    ExpressionControlBusFactoryBean controlBus() {
        ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
        return expressionControlBusFactoryBean;
    }

    @Bean
    CommandLineRunner commandLineRunner(@Qualifier("controlChannel") MessageChannel controlChannel) {
        return (String[] args)-> {
            System.out.println("Starting incoming file adapter: ");
            boolean sent = controlChannel.send(new GenericMessage<>("@fileMessageSource.start()"));
            System.out.println("Sent control message successfully? " + sent);
            while(System.in.available() == 0) {
                Thread.sleep(50);
            }
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationDemoApplication.class, args);
    }

}

The message is sent to the control bus component successfully, but the inbound channel adapter never starts.

I would appreciate any help.

Thanks, Dave


Solution

  • See here: https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#annotations_on_beans

    The fileMessageSource bean name is exactly for the FileReadingMessageSource. A SourcePollingChannelAdapter created from the InboundChannelAdapter has this bean name: springIntegrationDemoApplication.fileMessageSource.inboundChannelAdapter.

    The @EndpointId can help you to simplify it.

    In other words: everything is OK with your config, only the problem that you don't use the proper endpoint id to start the SourcePollingChannelAdapter.