javaapache-camelazure-data-lake

directoryName option not working for AzureDataLake Camel consumer


I'm trying to setup a camel route in java, reading a csv in an Azure Data Lake and processing it. So far, it looks like this :

from("direct:updateValues")
        .pollEnrich()
        .simple(messageRouteConfig.getFrom() + "&fileName=${body}")
        .log(LoggingLevel.INFO, log, "message content (file content): ${body}")
        .log(LoggingLevel.INFO, log, "Starting processing file : ${file:name}")

It then processes and send it somewhere (this part works). I pass the name of the file (testing.csv) in the body and messageRouteConfig.getFrom() returns the following URI :

azure-storage-datalake:account01/test?clientId=clientId&tenantId=tenantId&clientSecret=secret&directoryName=/foo/bar

The issue is that the directoryName option doesn't seem to be taken into account. In the data lake, the file location is : test/foo/bar/testing.csv. But when trying to get it, Camel seems to try to reach : test/testing.csv.

I also tried it this way (removing directoryName from the URI) without any success either :

from("direct:updateThings")
            .setHeader("CamelAzureStorageDataLakeDirectoryName", constant("/foo/bar"))
            .pollEnrich()
            .simple(messageRouteConfig.getFrom() + "&fileName=${body}")
            .log(LoggingLevel.INFO, log, "message content (file content): ${body}")
            .log(LoggingLevel.INFO, log, "Starting processing file : ${file:name}")

But if I remove any directoryName options and pass /foo/bar/testing.csv in the body (as the filename), it works. Any idea as to what to do here ?

Side question : anyway to get the read file name ? I tried logging it (as per the last line), but it doesn't seem to work.

Thanks a lot !


Solution

  • I'm trying to setup a camel route in java, reading a csv in an Azure Data Lake and processing it.

    I followed this Official Camel documentation,

    You can use the below code to read csv file from azure-data lake using camel route.

    Code:

    
    public class App {
    
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();
            DataLakeComponent azureStorageDataLakeComponent = new DataLakeComponent();
            DataLakeConfiguration configuration = new DataLakeConfiguration();
          
            configuration.setClientId("xxxx");
            configuration.setTenantId("xxxx");
            configuration.setClientSecret("Bxxxx");
            azureStorageDataLakeComponent.setConfiguration(configuration);
            context.addComponent("azure-storage-datalake", azureStorageDataLakeComponent);
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:start")
                        .to("azure-storage-datalake:venkat8912/test?operation=getFile&fileName=foo/bar/001.csv")
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                // Retrieve file content as InputStream
                                InputStream inputStream = exchange.getMessage().getBody(InputStream.class);
                                String fileContent = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
                                fileContent = fileContent.replaceAll("(?m)^[ \t]*\r?\n", ""); // Remove empty lines
                                System.out.println("File Content:\n" + fileContent);
                            }
                        })
                        .to("mock:results");
                }
            });
    
            // Start the context
            context.start();
    
            // Send a message to trigger the route
            context.createProducerTemplate().sendBody("direct:start", null);
    
            // Allow the route to run for a while before stopping the context
            Thread.sleep(5000);
            
            // Stop the context
            context.stop();
        }
    }
    

    Output:

    File Content:
    PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
    1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
    2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
    3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
    4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
    5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
    6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
    7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S
    8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S
    9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S
    10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C
    

    enter image description here