Tags: stream, apache-camel, aggregator

Aggregate only consecutive exchanges with same correlation key


I'm using Apache Camel and receive a large input file that I have to process line by line. The content is already sorted, and I have to aggregate all consecutive lines with the same correlation key. When the correlation key changes, the previous aggregate has to be completed. When the file ends, the last aggregate has to be completed, too. I have some constraints:

- Because the incoming file is rather large, we want to process it in a streaming fashion.
- Because the result is passed to a synchronous endpoint, I don't want to use timeout-based completion (completionTimeout). Otherwise I would lose the backpressure that regulates how fast the data source is consumed, and the exchanges would accumulate in the timeout map and the aggregation repository of the AggregateProcessor.
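To pin down the semantics being asked for, here is a minimal plain-Java sketch with no Camel involved. It assumes each line is a String and that a caller-supplied keyOf function (a hypothetical name) extracts the correlation key; completed groups are handed to a callback so that only one group is held in memory at a time, in the spirit of a streaming split.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of the required semantics (not Camel code).
// Only the currently open group is kept in memory.
class ConsecutiveGrouping {

    static void groupConsecutive(Iterator<String> lines,
                                 Function<String, String> keyOf,
                                 Consumer<List<String>> onComplete) {
        List<String> current = new ArrayList<>();
        String currentKey = null;

        while (lines.hasNext()) {
            String line = lines.next();
            String key = keyOf.apply(line);
            // A key change completes the previous group.
            if (!current.isEmpty() && !key.equals(currentKey)) {
                onComplete.accept(current);
                current = new ArrayList<>();
            }
            current.add(line);
            currentKey = key;
        }
        // The end of the input completes the last group.
        if (!current.isEmpty()) {
            onComplete.accept(current);
        }
    }
}
```

With the input a;1, a;2, b;3, a;4 and the key taken before the ";", this yields three groups: [a;1, a;2], [b;3] and [a;4]. The second run of "a" becomes its own group, which is what distinguishes consecutive aggregation from grouping by key alone.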

The PreCompletionAwareAggregationStrategy looks like a promising solution, but it turned out that the last aggregate is not completed until the next file arrives. If I check the CamelSplitComplete property in preComplete, the last aggregate does get completed, but without the last incoming exchange. Instead, this last exchange is added to the content of the next file that arrives.
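The CamelSplitComplete symptom can be reproduced with a small plain-Java model (not Camel code; class and method names are hypothetical). It assumes, as is common with this technique, a single open group (for example, a constant correlation expression) and a preComplete check that fires on a key change or when the split-complete flag is set. Per the PreCompletionAwareAggregationStrategy contract, returning true completes the pending group without the incoming exchange, and the incoming exchange starts a new group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of an aggregator with a single open group and a
// pre-completion check that also looks at a "split complete" flag.
class SplitCompletePreCompletionModel {
    private final Function<String, String> keyOf;
    private final Consumer<List<String>> onComplete;
    private List<String> pending;           // the open group, or null

    SplitCompletePreCompletionModel(Function<String, String> keyOf,
                                    Consumer<List<String>> onComplete) {
        this.keyOf = keyOf;
        this.onComplete = onComplete;
    }

    // Mirrors the idea of preComplete(oldExchange, newExchange): true means
    // "complete the pending group and start a new one with the incoming line".
    private boolean preComplete(String incoming, boolean splitComplete) {
        String pendingKey = keyOf.apply(pending.get(0));
        return splitComplete || !pendingKey.equals(keyOf.apply(incoming));
    }

    void offer(String line, boolean splitComplete) {
        if (pending != null && preComplete(line, splitComplete)) {
            onComplete.accept(pending);     // completed WITHOUT the incoming line
            pending = null;
        }
        if (pending == null) {
            pending = new ArrayList<>();
        }
        pending.add(line);                  // the incoming line joins or starts a group
    }

    List<String> pending() {
        return pending;
    }
}
```

Feeding a;1 and a;2, then a;3 with the split-complete flag set, completes [a;1, a;2] while a;3 is left alone in a new pending group. That matches the behaviour described: the last line is only completed together with the lines of the next file.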

So I'm currently at a loss for a solution that isn't unduly ugly.


Solution

  • In the described scenario, I would send the split messages to a route with an aggregator (let's call it "AggregationRoute") whose aggregation strategy implements PreCompletionAwareAggregationStrategy (the way you are already using it, I guess). Then, when the split ends, set the AGGREGATION_COMPLETE_ALL_GROUPS header to true and send the exchange to the AggregationRoute. This exchange is used only as a signal to complete all aggregation groups; it is not aggregated itself.

    Example:

    
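        ...
        // Stream the file line by line instead of loading it into memory
        .split(body().tokenize("\n")).streaming()
            .to("direct:aggregationRoute")
        .end()
        // After the split: a signal-only exchange that completes all pending groups
        .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, constant(true))
        .to("direct:aggregationRoute");
    
    from("direct:aggregationRoute")
        // myAggregationStrategy implements PreCompletionAwareAggregationStrategy
        .aggregate([your correlation expression], myAggregationStrategy)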
        ...
    

    Another option is to use an AggregateController and force the completion of all groups by calling its forceCompletionOfAllGroups() method:

    
    AggregateController aggregateController = new DefaultAggregateController();
    
    from(...)
        ...
        .split(body().tokenize("\n")).streaming()
            // aggregationStrategy implements PreCompletionAwareAggregationStrategy
            .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
                ...
                // Do what you need to do with the aggregated exchange
                ...
            .end()
        .end()
        // After the split: complete the group that is still pending
        .bean(aggregateController, "forceCompletionOfAllGroups");
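Both variants come down to the same idea: pre-completion closes a group when the key changes, and one explicit signal after the split completes whatever is still pending without adding anything to it. The plain-Java model below sketches that idea; it is not Camel code, the class and method names are hypothetical, and it assumes a single open group (for example, a constant correlation expression with a preComplete that compares keys).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model (not Camel code) of a pre-completion aggregator with a
// single open group plus a "complete all groups" signal sent after the split.
class CompleteAllGroupsModel {
    private final Function<String, String> keyOf;
    private final Consumer<List<String>> onComplete;
    private List<String> pending;           // the open group, or null

    CompleteAllGroupsModel(Function<String, String> keyOf,
                           Consumer<List<String>> onComplete) {
        this.keyOf = keyOf;
        this.onComplete = onComplete;
    }

    // One split line: a key change completes the pending group first.
    void offer(String line) {
        if (pending != null && !keyOf.apply(pending.get(0)).equals(keyOf.apply(line))) {
            onComplete.accept(pending);
            pending = null;
        }
        if (pending == null) {
            pending = new ArrayList<>();
        }
        pending.add(line);
    }

    // The signal after the split (the header or forceCompletionOfAllGroups()):
    // completes the pending group and adds nothing to it.
    void completeAllGroups() {
        if (pending != null) {
            onComplete.accept(pending);
            pending = null;
        }
    }

    boolean hasPending() {
        return pending != null;
    }
}
```

After offering a;1, a;2, b;3 and b;4, only [a;1, a;2] is complete and [b;3, b;4] is still pending; the signal then completes [b;3, b;4] intact, so nothing leaks into the next file.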