Tags: spring-boot, apache-camel, junit5, spring-camel, camel-test

org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile


I'm getting a "Failed to create Consumer for endpoint" exception while asserting against the headers. Can someone help me with this?

I have a file-watcher route that consumes files and validates each one. If the file is valid, the route updates the headers and sends the file to an S3 blob. Otherwise it sends the file to an error directory.

Here are my routes:

    from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
            .process(fileProcessor)
            .toD("direct:validateFile")
            .choice()
                .when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
                    .to("direct:updateMessageHeaders")
                .otherwise()
                    .toD("direct:processErrorFiles")
            .endChoice()
            .end();

    from("direct:validateFile")
            .routeId("validateFile")
            .choice()
                .when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
                    .setHeader("isValid", simple("valid"))
                .otherwise()
                    .setHeader("isValid", simple("invalid"))
            .endChoice()
            .end();

    final Processor fileProcessor = exchange -> {
        String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
        File gdisFile = new File(fileName);
        exchange.getIn().setHeader("fileSize", gdisFile.length());
    };
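
The size check in the validateFile route can also be exercised without a Camel context. The following is a minimal sketch, assuming a hypothetical helper class `FileSizeRule` that is not part of the original code. It applies the same conversion as the route (`Long.parseLong(header.toString()) > 0`) to a plain header value:

```java
class FileSizeRule {

    // Hypothetical helper: mirrors the predicate and the two setHeader calls
    // of the validateFile route, but takes the raw header value directly.
    static String validate(Object fileSizeHeader) {
        // Same conversion as the route, so Integer, Long and String values all work.
        // A null header throws a NullPointerException here, as it would in the route.
        boolean nonEmpty = Long.parseLong(fileSizeHeader.toString()) > 0;
        return nonEmpty ? "valid" : "invalid";
    }

    public static void main(String[] args) {
        System.out.println(validate(100)); // Integer, as in the test case: prints "valid"
        System.out.println(validate(0L));  // Long, as File.length() returns: prints "invalid"
    }
}
```

This shows that the `Integer` value put into the headers by the test and the `long` set by `fileProcessor` both pass through the same check.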

Test case

public class RouteTest extends CamelTestSupport {

    @Override
    public RouteBuilder createRouteBuilder() throws Exception
    {
        return new Route();
    }
    @Test
    public void header_validation() {

        Map<String, Object> headers = new HashMap<>();
        headers.put("fileSize", 100);

        template.sendBodyAndHeaders("direct:validateFile", null, headers);
        assertEquals("valid", consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
    }
}

Exception

org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)


Solution

  • When you call

    template.sendBodyAndHeaders
    

    Camel performs fire-and-forget: it sends your data into the endpoint and does not hand the result back to you.

    You need to use

    template.requestBodyAndHeaders
    

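    This method waits for the data to pass through all the routes and returns the result to you. You can then drop the consumer.receive("direct:validateFile") call: it tries to register a second consumer on a direct endpoint that the route already consumes from, which is what raises the exception.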
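
    A minimal sketch of a test written along these lines, assuming Camel 3.x with camel-test-junit5. The class name `ValidateFileRouteTest` and the inlined copy of the validateFile route are assumptions made to keep the example self-contained. Because `requestBodyAndHeaders` returns only the message body and the assertion here is on a header, the sketch uses `template.request` with a `Processor`, which returns the whole `Exchange`:

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ValidateFileRouteTest extends CamelTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() {
        // Assumption: only the validateFile route is registered here,
        // so the file-watch consumer is not started during the test.
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:validateFile")
                        .routeId("validateFile")
                        .choice()
                            .when(exchange -> Long.parseLong(
                                    exchange.getIn().getHeader("fileSize").toString()) > 0)
                                .setHeader("isValid", simple("valid"))
                            .otherwise()
                                .setHeader("isValid", simple("invalid"))
                        .end();
            }
        };
    }

    @Test
    public void header_validation() {
        // Request-reply: the call returns the Exchange after the route has finished.
        Exchange result = template.request("direct:validateFile",
                exchange -> exchange.getIn().setHeader("fileSize", 100));

        // Assert on the returned exchange instead of consuming from the direct endpoint.
        assertEquals("valid", result.getMessage().getHeader("isValid"));
    }
}
```

    The route under test stays the only consumer of direct:validateFile, so the "Cannot add a 2nd consumer" error does not occur.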