apache-camelcamel-ftp

Apache Camel FTP integration


I have just started working around the Apache Camel. I have a requirement to implement an FTP/FTPS/SFTP client, which would be used to fetch the files from the respective servers. I was looking into the possibility of using Apache Camel to do this but I am still confused after going through the examples and the tutorials.

The requirement is to fetch the files from the FTP/SFTP servers when the request is received from the scheduler.

Following is the route created using EndPoint-DSL

@Component
public class FtpReceiveRoute extends EndpointRouteBuilder {

@Override
public void configure() throws Exception {
    from(
            ftp("localhost:2001/home/admin")
            .account("admin")
            .password("admin12345")
            .recursive(true)
        )
    .routeId("ftpReceive")
    .log("From done!")
    .to("log:ftp-log")
    .log("To done!!");
}

}

I am trying to use the above route by invoking it when the request is made to fetch the file like below.

@Override
protected FtpResponse doMessage(String param, FtpRequest req) {
    FtpResponse response = new FtpResponse ();
    CamelContext ctx = new DefaultCamelContext();
    
    ctx.addRoutes(##route); //FtpReceiveRoute, add the Routebuilder instance as EndpointRouteBuilder is acceptable.
    ctx.start();

    //Might need to induce sleep so that all the files are downloaded

    ctx.stop();

    return response;
}

The confusion is around how to invoke the Camel process with the route. I have used EndpointRouteBuilder to create the route because of the type-safe creation of the endpoint URI. I am not getting an option to add this route to the CamelContext as it expects the RouteBuilder instance which is not type-safe.

Further, the CamelContext is the engine and to invoke the route I would need to start and stop this engine. This I am not able to digest if I need to start and stop the engine to execute a route then I would need to induce some sleep in between so that all files are downloaded. Just to add there are more routes that I need to add with the implementation. Once the engine is started it would load and execute all the added routes which is not the requirement.

Maybe I am not getting how to use this properly. Any resources aiding my situation are welcome. Thanks.


Solution

  • You should not create and start new camel context every time you want to fetch file from server. What you should do instead is start one when your application starts and use that for all your exchanges.

    You can use Spring-boot to initialize CamelContext and add annotated RouteBuilders to it automatically. Check the maven archetype camel-archetype-spring-boot for example.

    If you want to call camel routes from Java you can Inject CamelContext to your bean and use it to create ProducerTemplate. This can be used to invoke Routes defined in the RouteBuilder.

    Using ProducerTemplate.send you can get the resulting exchange.

    Using producer template

    Using File-component which works very similary to ftp-component.

    package com.example;
    
    import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MySpringBootRouter extends EndpointRouteBuilder  {
    
        @Override
        public void configure() {
    
            from(direct("fileFromFTP"))
                .routeId("fileFromFTP")
                // reads files from <project>/input using file consumer endpoint
                .pollEnrich(file("input"), 1000)
                // If file is found, convert body to string. 
                // Which in this case will read contents of the file to string.
                .filter(body().isNotNull())
                    .convertBodyTo(String.class)
                .end()
            ;
        }
    }
    
    package com.example;
    
    import org.apache.camel.CamelContext;
    import org.apache.camel.Exchange;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.support.DefaultExchange;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    
    import static org.apache.camel.builder.endpoint.StaticEndpointBuilders.direct;
    
    @Configuration
    @EnableScheduling
    public class MySpringBean {
    
        @Autowired
        CamelContext camelContext;
    
        @Scheduled(fixedRate = 1000)
        public void scheduledTask() {
           
            System.out.println("Scheduled Task!");
            if(camelContext.isStopped()) {
                System.out.println("Camel context not ready yet!");
                return;
            }
            useProducerTemplate();
        }
    
        public void useProducerTemplate(){
    
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            Exchange inExchange = new DefaultExchange(camelContext);
    
            //synchronous call!
            Exchange result = producerTemplate.send(direct("fileFromFTP").toString(), inExchange);
            String resultBody = result.getMessage().getBody(String.class);
            String fileName = result.getMessage().getHeader(Exchange.FILE_NAME, String.class);
    
            if(resultBody != null){
                System.out.println("Consumed file: "+ fileName + " contents: " + resultBody.toString());
            }
            else{
                System.out.println("No file to consume!");
            }
        }
    }
    

    Depending on what you need to do with the files you could probably do that inside camel route. Then you would only need to call the producerTemplate.sendBody.

     public void useProducerTemplate(){
    
            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            Exchange inExchange = new DefaultExchange(camelContext);
            producerTemplate.sendBody(direct("fileFromFTP").toString(), inExchange);
        }
    

    Starting stopping camel route

    If you want to start polling file consumer only for a short while you can do start the route and use for example aggregation timeout to shutdown the route when no new files have been received in any given duration.

    @Component
    public class MySpringBootRouter extends EndpointRouteBuilder  {
    
        @Override
        public void configure() {
    
            AggregationStrategy aggregateFileNamesStrategy = AggregationStrategies
                .flexible(String.class)
                .accumulateInCollection(ArrayList.class)
                .pick(header(Exchange.FILE_NAME))
            ;
    
            from(file("input"))
                .routeId("moveFilesRoute")
                .autoStartup(false)
                .to(file("output"))
                .to(seda("moveFilesRouteTimeout"));
            ;
    
            from(seda("moveFilesRouteTimeout"))
                .routeId("moveFilesRouteTimeout")
                .aggregate(constant(true), aggregateFileNamesStrategy)
                .completionTimeout(3000)
                    .log("Consumed files: ${body.toString()}")
                    .process(exchange -> {
                        exchange.getContext().getRouteController().stopRoute("moveFilesRoute");
                    })
                .end()
            ;
        }
    }
    
    public void startMoveFilesRoute() {
    
            try {
                System.out.println("Starting moveFilesRoute!");
                camelContext.getRouteController().startRoute("moveFilesRoute");
                //Sending null body moveFilesRouteTimeout to trigger timeout if there are no files to transfer
                camelContext.createProducerTemplate().sendBody(seda("moveFilesRouteTimeout").toString(), null);
            } catch(Exception e) {
                System.out.println("failed to stop route. " + e);
            }
    }