java apache-spark amazon-s3 spark-structured-streaming

File Streaming using Spark


I am using Spark file streaming from S3. The code runs fine, but in the first batch every file already present in the directory is considered valid and gets processed. Please help, I am new to Spark. My code is below.

<pre>
Dataset<Row> df = spark.readStream()
    .format("csv")
    .schema(getOLRSchema())
    .load("s3a://filePath/*.csv");

query = df.writeStream()
    .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> ds, Long batchId) throws Exception {
            long timestamp = System.currentTimeMillis();
            ds.write().parquet("hdfs://ha-cluster/testStreaming/");
            System.out.println("Time taken to complete: " +
                (System.currentTimeMillis() - timestamp));
        }
    })
    .option("checkpointLocation", "hdfs://ha-cluster/mnt/vol2/Ajit/checkpoint")
    .option("latestFirst", "true")
    .option("maxFilesPerTrigger", "32")
    .trigger(Trigger.ProcessingTime(120000))
    .outputMode("append")
    .start();

query.awaitTermination();

</pre>

Solution

  • The Spark documentation explains that only newly noticed files will be processed once the stream is running, and existing files will be ignored.
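Also note that latestFirst, maxFilesPerTrigger, and maxFileAge are options of the file source, so they only take effect when set on readStream(), not on the writer. Below is a minimal sketch of that layout, not a drop-in fix: it assumes Spark 2.4+, replaces the question's getOLRSchema() with two placeholder columns, and reuses the question's S3 and HDFS paths. Per the docs, maxFileAge is ignored when latestFirst is true and maxFilesPerTrigger is set, so tune these to your case.

<pre>
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

public class FileStreamOptionsSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("FileStreamOptionsSketch")
                .getOrCreate();

        // Placeholder schema; substitute your real one (e.g. getOLRSchema()).
        StructType schema = new StructType()
                .add("id", "string")
                .add("value", "double");

        // File-source options go on readStream():
        //  - maxFilesPerTrigger caps how many files each micro-batch reads
        //  - latestFirst processes the newest files first when there is a backlog
        //  - maxFileAge ignores files older than this, measured against the
        //    newest file's timestamp (ignored if latestFirst=true AND
        //    maxFilesPerTrigger is set, per the Spark docs)
        Dataset<Row> df = spark.readStream()
                .format("csv")
                .schema(schema)
                .option("maxFilesPerTrigger", 32)
                .option("latestFirst", true)
                .option("maxFileAge", "1h")
                .load("s3a://filePath/*.csv");

        // Sink options (checkpointLocation, output path) stay on writeStream().
        StreamingQuery query = df.writeStream()
                .format("parquet")
                .option("path", "hdfs://ha-cluster/testStreaming/")
                .option("checkpointLocation", "hdfs://ha-cluster/mnt/vol2/Ajit/checkpoint")
                .trigger(Trigger.ProcessingTime("2 minutes"))
                .outputMode("append")
                .start();

        query.awaitTermination();
    }
}
</pre>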