Tags: java, apache-camel, aggregation, spring-camel, camel-spring-dsl

In Apache Camel with Spring Boot, how do I join many external callbacks (using "from().to()") into a single response?


SOLVED! Scroll down to Solution.

I have a Person entity with some basic data in table A and more specific data (an address, for example) in tables B, C, D, etc.

PersonResponseDTO (summarized):

{
    "id": 1,
    "name": "Test"
}

AddressResponseDTO (summarized):

{
    "person_id": 1,
    "street": "Test St."
}

Both responses come from an external API, called using from("direct:getPersonById").to(getPersonUrl) and from("direct:getAddressByPersonId").to(getAddressUrl) (summarized).

I created a third object called AggregatedPersonResponseDTO:

{
    "person": {
        "id": 1,
        "name": "Test"
    },
    "address": {
        "person_id": 1,
        "street": "Test St."
    }
}

Is there a simple way to join both responses in a single request, returning an object of type AggregatedPersonResponseDTO, using only the Camel API? I want to build the third object from the two response objects, and future use cases will involve more than two "joins".

Solution explanation

  1. There is no need to set streamCaching to either true or false.

  2. There is no need to set HTTP_PATH.

  3. Code in the Camel route:

from("direct:getFullPersonByIdService")
    .toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
    .pollEnrich(
        simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
        5000,
        new PersonAggregationStrategy(),
        false
    )
    .unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class));

The content between double curly braces is a property placeholder, resolved from application.yml or application.properties.

  4. The whole PersonAggregationStrategy class:

@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @SneakyThrows
    @Override
    public Exchange aggregate(final Exchange exchangePerson,
                              final Exchange exchangeAddress) {
        log.info("Aggregating Person and Address...");

        final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
        aggregatedPerson.setPerson(OBJECT_MAPPER.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));

        // The second exchange may be null if the poll returned nothing before the timeout;
        // in that case the address is left unset instead of failing with a NullPointerException.
        if (exchangeAddress != null) {
            aggregatedPerson.setAddress(OBJECT_MAPPER.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));
        }

        // Serialize once and reuse the JSON for both the message body and the log line.
        final String aggregatedJson = OBJECT_MAPPER.writeValueAsString(aggregatedPerson);
        exchangePerson.getIn().setBody(aggregatedJson);
        log.info("Aggregated object => {}", aggregatedJson);

        return exchangePerson;
    }

}

  5. I also had to implement the TypeConverters interface for the resulting object of the aggregation:

@Component
public class AggregatedPersonConverter implements TypeConverters {

    // Injected but not used by the converter method below.
    private final ObjectMapper mapper;

    @Autowired
    public AggregatedPersonConverter(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    // Uses Java serialization: GetAggregatedPersonResponseDTO must implement Serializable,
    // and the resulting stream holds serialized object bytes, not JSON.
    @Converter
    public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        // try-with-resources flushes and closes the stream even if writeObject fails.
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(source);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return new ByteArrayInputStream(baos.toByteArray());
    }

}

  6. I don't know whether this works for more than two callbacks. It may need other implementations of AggregationStrategy. I'll test that use case some day.
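
One way the approach might extend to more than two calls is to chain one enrichment step per extra call, each with a strategy that adds its response to the aggregate built so far. The sketch below models only that folding logic in plain Java, without any Camel types, so it is an illustration of the idea and not a tested Camel route. The class name ChainedAggregationSketch, the merge helper, the map-based aggregate and the "phone" key are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChainedAggregationSketch {

    /**
     * Hypothetical helper: adds one response to the running aggregate under a fixed key.
     * In a Camel route this would be the job of one AggregationStrategy per enrichment step.
     */
    static Map<String, Object> merge(Map<String, Object> aggregate, String key, Object response) {
        // Copy so that each step leaves its input untouched.
        Map<String, Object> result = new LinkedHashMap<>(aggregate);
        // A missing response (for example after a timeout) is skipped rather than stored as null.
        if (response != null) {
            result.put(key, response);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> person = Map.of("id", 1, "name", "Test");
        Map<String, Object> address = Map.of("person_id", 1, "street", "Test St.");

        // Each call folds one more response into the aggregate, in route order.
        Map<String, Object> aggregate = new LinkedHashMap<>();
        aggregate = merge(aggregate, "person", person);
        aggregate = merge(aggregate, "address", address);
        aggregate = merge(aggregate, "phone", null); // a third call that returned nothing

        System.out.println(aggregate.keySet()); // prints [person, address]
    }
}
```

Because every step receives the aggregate produced by the previous one, adding a fourth or fifth call only means adding one more merge step; the earlier steps stay unchanged.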

Solution

  • You have to model a route that enriches the first web service result with the second one. How the two responses are merged is defined in your AggregationStrategy instance.

    See the enrich EIP: https://camel.apache.org/components/3.14.x/eips/enrich-eip.html