I am building a system that will receive messages via a Message broker (Currently, JMS) from different systems. All the messages from all the senders systems have a deviceId and there is no order in the reception of the message. For instance, system A can send a message with deviceId=1 and system b be can send a message with deviceId=2.
My goal is not to start processing of the messages concerning the same deviceId unless I got all the message from all the senders with the same deviceId.
For example, if I have 3 systems A, B and C sending messages to my system :
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.
Should this problem be resolved by using some sync mechanism in my system , by the message broker or an integration framework like spring-integration/apache camel ?
A similar solution with the Aggregator (what @Artem Bilan mentioned) can also be implemented in Camel with a custom AggregationStrategy
and with controlling the Aggregator completion by using the Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP
property.
The following might be a good starting point. (You can find the sample project with tests here)
Route:
from("direct:start")
.log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
.aggregate(header("deviceId"), new SignalAggregationStrategy(3))
.log(LoggingLevel.INFO, "Signaled body: ${body}")
.to("direct:result");
SignalAggregationStrategy.java
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
private int numberOfSystems;
public SignalAggregationStrategy(int numberOfSystems) {
this.numberOfSystems = numberOfSystems;
}
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange exchange = super.aggregate(oldExchange, newExchange);
List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
// Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
// https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
}
return exchange;
}
@Override
public boolean matches(Exchange exchange) {
// make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
return false;
}
}
Hope it helps!