apache-camel cxf eip

Apache Camel EIP route - How to stop split()


I have a problem with the following route:

    // from("cxf:....")...
    from("direct:start").process(startRequestProcessor) // STEP 1
            .choice()
                .when(body().isNull())
                    .to("direct:finish")
                .otherwise()
                    .split(body())  // STEP 2
                    .bean(TypeMapper.class) // STEP 3
                    .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}")
                    .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4
                    .ignoreInvalidEndpoints();

    from("direct:endpoint2") // STEP 6
            .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}")
            .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader")
            .ignoreInvalidEndpoints();

    from("direct:endpoint1.1") // STEP 5
            .process(new DateRangeProcessor())
            .to("direct:collections");

    from("direct:endpoint1.2") // STEP 5
            .process(new SingleProcessor())
            .to("direct:collections");


    from("direct:endpoint2.2") // STEP 7
            .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy)
            .completionSize(exchangeProperty("endpoint22"))

            .process(new QueryBuilderProcessor())
            .bean(MyService, "getDbCriteria")

            .setHeader("collection", constant("endpoint2.1"))
            .to("direct:endpoint2.1").end();


    from("direct:endpoint2.1") // STEP 8
            .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy)
            .completionSize(exchangeProperty("CamelSplitSize"))
            .to("direct:finish").end();

    from("direct:finish")
            .process(new QueryBuilderProcessor())
            .bean(MyRepository, "findAll")
            .log("ResponseData: ${body}")
            .marshal().json(JsonLibrary.Gson).end();

The route:

  1. Receives a JSON string and converts it to a list (HashSet) of JSONObjects.
  2. Splits the received list into individual JSON objects.
  3. Sets the corresponding headers according to the object content.
  4. Routes the messages according to the headers to endpoint1.1 or endpoint1.2.
  5. Converts the messages to MongoDB Criteria and sends them to endpoint2.
  6. Endpoint2 routes the messages according to another header to endpoint2.1 or endpoint2.2.
  7. Endpoint2.2 aggregates all received messages, processes them to get MongoDB Criteria and sends the result to endpoint2.1 (completionSize is calculated at step 2 and saved in the property "endpoint22").
  8. Endpoint2.1 aggregates ALL messages (CamelSplitSize), converts the aggregated messages to a Query object and sends it to the Repository to retrieve the data.

I can see a valid response object in the debugger, but I still get this error:

No message body writer has been found for class java.util.HashSet, ContentType: application/json

The problem is not in the response object: it works with other routes and does not contain any HashSets.

My guess is that the route sends to the output the HashSet created at STEP 1...

My questions are:

Any help would be much appreciated! Thanks.


Solution

  • I find it very strange, but the .aggregate() function does not return the aggregated exchange as the reply. It uses your aggregation strategy, but the reply is always the incoming exchange. This is not clear from the documentation, but you have to pass the aggregation strategy to split() to be able to return the aggregated exchange.
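
To make the difference concrete, here is a minimal plain-Java model of what the caller gets back in each case. It has no Camel dependency, and every name in it (SplitReplyModel, splitWithoutStrategy, splitWithStrategy) is a hypothetical stand-in rather than a Camel class. In the Camel Java DSL, the second case corresponds to passing the strategy as the second argument of split(), for example split(body(), myStrategy), where myStrategy is your own AggregationStrategy instance.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of the reply a caller sees after a split.
// Hypothetical stand-in names; this is not Camel code.
class SplitReplyModel {

    // Models split(body()) with no strategy: every part is processed,
    // but the reply is still the original incoming body (the Set).
    static <T, R> Object splitWithoutStrategy(Set<T> body, Function<T, R> process) {
        for (T part : body) {
            process.apply(part); // the result is not carried back to the caller
        }
        return body;
    }

    // Models split(body(), strategy): each processed part is folded into
    // a single result, and that folded result becomes the reply.
    static <T, R> R splitWithStrategy(Set<T> body,
                                      Function<T, R> process,
                                      BinaryOperator<R> strategy) {
        R aggregated = null;
        for (T part : body) {
            R processed = process.apply(part);
            aggregated = (aggregated == null)
                    ? processed
                    : strategy.apply(aggregated, processed);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order so the output is deterministic.
        Set<String> body = new LinkedHashSet<>(Arrays.asList("a", "b", "c"));

        Object plainReply = splitWithoutStrategy(body, s -> s.toUpperCase());
        String aggregatedReply =
                splitWithStrategy(body, s -> s.toUpperCase(), (x, y) -> x + "," + y);

        System.out.println(plainReply.getClass().getSimpleName()); // LinkedHashSet
        System.out.println(aggregatedReply);                       // A,B,C
    }
}
```

The first call mirrors the situation in the question: the reply is still a Set, which the JSON response writer then cannot serialize. The second call returns whatever the strategy produced.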