I am using Spark file streaming from S3. The code runs fine, but in the very first batch every file already present in the directory is treated as new and processed. I am new to Spark, so any help is appreciated. My code is below.
<pre>
Dataset<Row> df = spark.readStream()
        .format("csv")
        .schema(getOLRSchema())
        .load("s3a://filePath/*.csv");

query = df.writeStream()
        .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
            @Override
            public void call(Dataset<Row> ds, Long batchId) throws Exception {
                long timestamp = System.currentTimeMillis();
                ds.write().parquet("hdfs://ha-cluster/testStreaming/");
                System.out.println("Time taken to complete: "
                        + (System.currentTimeMillis() - timestamp));
            }
        })
        .option("checkpointLocation", "hdfs://ha-cluster/mnt/vol2/Ajit/checkpoint")
        .option("latestFirst", "true")
        .option("maxFilesPerTrigger", "32")
        .trigger(Trigger.ProcessingTime(120000))
        .outputMode("append")
        .start();

query.awaitTermination();
</pre>
The Spark documentation says that only newly noticed files should be processed and existing ones ignored, but that is not what I see: the first batch picks up everything that was already in the directory. How can I make the stream skip the files that existed before the query started?
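For reference, this is a minimal sketch of what I understood the file-source options to look like when placed on the reader instead of the writer. The option names <code>latestFirst</code>, <code>maxFilesPerTrigger</code>, and <code>maxFileAge</code> come from my reading of the file-source documentation, and <code>maxFileAge</code> with a value of <code>"1h"</code> is only my assumption for ignoring old files; I am not sure this is the right way to skip pre-existing files.
<pre>
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: file-source options moved onto readStream(), where I understand
// they belong, rather than on writeStream().
Dataset<Row> df = spark.readStream()
        .format("csv")
        .schema(getOLRSchema())
        .option("latestFirst", "true")       // process the newest files first
        .option("maxFilesPerTrigger", "32")  // cap the number of files per micro-batch
        .option("maxFileAge", "1h")          // assumption: ignore files older than one hour
        .load("s3a://filePath/*.csv");
</pre>
Is something along these lines the intended way to control which files the first batch sees, or is there a different setting for ignoring files that were already in the directory?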