I'm working with Spark Streaming and I don't want to reprocess the old files when a new file arrives every 10 minutes:
val val1 = spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .csv(path_to_file)
  .cache()

val1.createOrReplaceTempView("test")
After creating the DataFrame I do some transformations and processing. Can checkpointing help me here, and if so, how do I use it in my case?
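For reference, the streaming counterpart of that batch read would look roughly like this. It is only a sketch: a streaming CSV source needs an explicit schema (the id/value columns below are made up), and path_to_file is assumed to be the directory the new files arrive in.

import org.apache.spark.sql.types._

// A streaming file source cannot infer the schema by default,
// so it has to be declared up front (hypothetical columns).
val schema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)

// On every trigger the file source lists the input directory and
// reads only the files it has not processed yet.
val streamDF = spark.readStream
  .option("header", "true")
  .option("sep", ",")
  .schema(schema)
  .csv(path_to_file)

streamDF.createOrReplaceTempView("test")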
***************** The solution *****************
First I create the SparkSession and set the checkpoint directory:

val spark = SparkSession
  .builder
  .appName("test")
  .master("local[*]")
  .getOrCreate()

spark.sparkContext.setCheckpointDir(path_checkpoint)

After that I call the checkpoint function on the DataFrame (sketched after the writeStream call below), and I specify a trigger so the job executes on a schedule:
.writeStream
.format("csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("checkpointLocation", CheckPoint)
.trigger(Trigger.ProcessingTime("180 seconds"))
.option("path", Path)
.option("header", true)
.outputMode("append")
.queryName("test")
.start()
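For completeness, this is roughly what "calling the checkpoint function on the DataFrame" looks like once the checkpoint directory is set. It is only a sketch: df stands for whatever DataFrame the transformations produce and is not a name from the original code.

// Requires spark.sparkContext.setCheckpointDir(path_checkpoint) to have been called first.
// checkpoint() materializes the data under the checkpoint directory and truncates the lineage,
// so downstream stages are not recomputed from the original source files.
val checkpointedDF = df.checkpoint()

Note that this DataFrame checkpoint is separate from the checkpointLocation option passed to writeStream: the streaming checkpoint is where Structured Streaming records which input files it has already processed, and that bookkeeping is what stops the old files from being read again when the query restarts.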