I would like to route messages from more routes to the same route but it does not work in the manner as I assumed. I set up the following (I am just puting down the essence):
from("direct:a") [...]
.to("direct:c");
from("direct:b") [...]
.to("direct:c");
from(direct:c) <my aggregator functionality comes here>
.to("direct:someOtherRoute");
However, this works only when exactly one route either "a" or "b" goes to "c" but not both. How should I achieve to route both "a" and "b" to "c"? Thanks.
EDIT1:
I tried the solution of Alexey but using "seda" or "vm" did not solve the problem. Actually, regardless of calling route "c" with seda or vm, the aggregator is invoked only once either from route "a" or from route "b".
However, if I create another route "c2" with the same content and route e.g. "b" to "c2", then it works. Nevertheless, it is not really nice way to solve it.
Do you have any further ideas? I am using the routes within the same CamelContext, so within the same JVM.
I have also found an interesting remark on the link http://camel.apache.org/seda.html It states as Alexey and Sunar also told that seda and vm are asynchronous and direct synchronous but you can also implement asynchronous functionality with direct as follows:
from("direct:stageName").thread(5).process(...)
"[...] Instead, you might wish to configure a Direct endpoint with a thread pool, which can process messages both synchronously and asynchronously. [...]
I also tested it but in my case it did not yield any fruits.
EDIT2:
I add here how I am using the aggregator, i.e. route "c" in this example:
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate( constant("AGG"), new RecordAggregator())
.completionTimeout(AGGREGATOR_TIMEOUT)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv")
.end();
In the log the String "### Process Aggregator" appears only once. I am just wondering whether it cannot depend on the .completionTimeout(AGGREGATOR_TIMEOUT) I am using. In my undestanding, a file should be created for each different AGG value in the header within this time. Is this understanding correct?
I think the using of asynchronous components, such as seda, vm, activemq might solve your problem.
Such behavior direct component because direct is synchronous component, this is also likely related to using the aggregator in the third route.
Example:
from("direct:a") [...]
.to("seda:c");
from("direct:b") [...]
.to("seda:c");
from(seda:c) <your aggregator functionality comes here>
.to("direct:someOtherRoute");
EDIT1:
Now, when I see an aggregator, I think that's the problem, in the completion criteria.
In your case, you have to use expression for correlationExpression
:
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate().simple("${header.AGG}",String.class) // ${property.AGG}
.aggregationStrategy(new RecordAggregator())
.completionInterval(AGGREGATOR_TIMEOUT) //.completionTimeout(AGGREGATOR_TIMEOUT)
.forceCompletionOnStop()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv&fileExist=Override")
.end();
and, maybe completionTimeout
is too low...