Tags: java, design-patterns, apache-camel, spring-integration, integration-patterns

Integration pattern: how to synchronize processing of messages received from multiple systems


I am building a system that will receive messages from several other systems via a message broker (currently JMS). Every message, whichever system sent it, carries a deviceId, and messages can arrive in any order. For instance, system A can send a message with deviceId=1 and system B can send a message with deviceId=2.

My goal is to hold off processing the messages for a given deviceId until I have received a message with that deviceId from every sender.

For example, if I have 3 systems A, B and C sending messages to my system:

System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing messageA1, messageB1 and messageC2, because they all have deviceId=1.

Should this problem be solved with some synchronization mechanism in my own system, by the message broker, or by an integration framework such as Spring Integration or Apache Camel?


Solution

  • The Aggregator approach that @Artem Bilan mentioned can also be implemented in Camel, using a custom AggregationStrategy that decides when a group is complete by setting the Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP property.

    The following might be a good starting point. (You can find the sample project with tests here)

    Route:

    from("direct:start")
        .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
        .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
        .log(LoggingLevel.INFO, "Signaled body: ${body}")
        .to("direct:result");
    

    SignalAggregationStrategy.java

    // Imports assume Camel 2.x package names.
    import java.util.List;
    import org.apache.camel.Exchange;
    import org.apache.camel.Predicate;
    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
    
    public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
    
        // Number of distinct sender systems that must report before a group completes
        private final int numberOfSystems;
    
        public SignalAggregationStrategy(int numberOfSystems) {
            this.numberOfSystems = numberOfSystems;
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // The parent strategy collects every exchange of the group into a list
            Exchange exchange = super.aggregate(oldExchange, newExchange);
    
            List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
    
            // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
            // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
            long distinctSystems = aggregatedExchanges.stream()
                    .map(e -> e.getIn().getHeader("system", String.class))
                    .distinct()
                    .count();
            if (distinctSystems == numberOfSystems) {
                exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
            }
    
            return exchange;
        }
    
        @Override
        public boolean matches(Exchange exchange) {
            // Never complete through the predicate; completion is signalled from aggregate() above.
            // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
            return false;
        }
    }
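
To see the completion rule on its own, here is a minimal, framework-free sketch of the same idea: group messages by deviceId and release a group once every expected system has contributed at least one message. The class name DeviceMessageGate, the nested Message type and the accept method are hypothetical names invented for this illustration; they are not part of Camel or of the sample project, and the sketch keeps pending groups in memory only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only: holds messages back per deviceId
// until every expected sender system has delivered at least one message.
public class DeviceMessageGate {

    // Hypothetical message shape: sending system, device id and payload.
    public static final class Message {
        public final String system;
        public final int deviceId;
        public final String body;

        public Message(String system, int deviceId, String body) {
            this.system = system;
            this.deviceId = deviceId;
            this.body = body;
        }
    }

    private final int numberOfSystems;

    // Pending messages per deviceId, kept in arrival order.
    private final Map<Integer, List<Message>> pending = new HashMap<>();

    public DeviceMessageGate(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    // Stores the message; returns the complete group once all systems have
    // reported for that deviceId, otherwise an empty Optional.
    public synchronized Optional<List<Message>> accept(Message message) {
        List<Message> group = pending.computeIfAbsent(message.deviceId, id -> new ArrayList<>());
        group.add(message);

        // Same rule as in the aggregation strategy: count distinct sender systems.
        long distinctSystems = group.stream().map(m -> m.system).distinct().count();
        if (distinctSystems == numberOfSystems) {
            pending.remove(message.deviceId);
            return Optional.of(group);
        }
        return Optional.empty();
    }
}
```

With three systems, feeding in messageA1 (deviceId=1), messageB1 (deviceId=1) and messageC1 (deviceId=3) returns an empty Optional each time. The next call with messageC2 (deviceId=1) returns the group of messageA1, messageB1 and messageC2, while messageC1 stays pending. Like the route above, this sketch never times out incomplete groups.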
    

    Hope it helps!