multithreadingspring-bootspring-integrationspring-integration-dslspring-dsl

Multithreading Spring integration DSL


I want to create a simple IntegrationFlow with Spring integration, and I am having difficulties.

I want to create an integration flow that takes messages from multiple queues in Rabbit Mq and posts the messages to different Rest endpoints.

i want to know if i can parallelize this.

i have two scenarios that i want to check the feasibility :

Scenario 1

Scenario 2

 HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                    HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }

Solution

  • For the first scenario, simply configure a inbound adapter for each and set the output channel to a common channel for the downstream flow.

    For the second scenario, simply set the concurrentConsumers and maxConcurrentConsumers on the listener container and it will scale up/down the threads as needed.

    See the Spring AMQP Documentation.