I have already gone through the basics, a working example, and the checkpoint directory contents.
Say I am reading from a streaming source, e.g. Kafka (but not specific to it alone), and writing it to a CSV, with checkpoints enabled on both the read and the write stream.
Say I have read 1000 records from the source and, while writing to the CSV, I encounter a failure after writing 250 records. I want to understand how checkpoints would behave. Assuming the dataframe is lost from memory, ideally the read checkpoint would be at 1000 and the write checkpoint at 250, so on restart there would be nothing left to read and records 251 to 1000 would be lost?
How do you handle such a scenario and ensure the code is restartable?
Is it necessary to have a checkpoint on both the read and the write stream?
If I have to set the offset to a specific number such as 250, how do I do that dynamically?
Sample code:
# Reading from Kafka
# (please assume any source really - not specific to Kafka)
read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "/path/to/read/checkpoint") \
    .load()
# Writing to CSV
write_stream = read_stream \
    .writeStream \
    .format("csv") \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/path/to/write/checkpoint") \
    .start()
Not sure about your use case, but in general only the checkpoint on the write stream needs to be set, as it already saves the full state of your query. With a checkpoint set on the write stream, when your query/application stops for whatever reason and you restart it, the read stream will resume from the offsets recorded in that checkpoint's metadata, and you DON'T need to set anything in the read stream options. Adding a checkpointLocation on the read stream is redundant (Structured Streaming only uses the checkpoint configured on the query, i.e. via writeStream) unless you have some special use case.
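To make that concrete, here is a minimal restartable sketch of the question's pipeline with the checkpoint set only on the write side (the hosts, topic, and paths are the placeholders from the question; the casts are needed because the CSV sink cannot write Kafka's binary key/value columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-csv").getOrCreate()

# Read side: no checkpointLocation here. startingOffsets only applies
# on the very first run, before any checkpoint exists.
source = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                "topic", "partition", "offset")

# Write side: the single checkpointLocation that makes the query
# restartable. Reuse the same path on every restart.
query = source.writeStream \
    .format("csv") \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/path/to/write/checkpoint") \
    .start()

query.awaitTermination()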
To answer your question: you only need to set the checkpoint on the WRITE stream. When you restart the whole query, the read stream will resume from the offsets saved in that checkpoint. There should also be no duplication, because the restarted query only processes and saves data that had not yet been committed to the sink.
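If you want to see where the engine will resume, look at the offsets/ directory inside the write checkpoint: each micro-batch writes a file named after its batch id, and the last line of that file is the per-source offset JSON. The exact file layout can vary slightly across Spark versions, so treat this as an inspection sketch, not an API:

from pathlib import Path

def latest_committed_offsets(checkpoint_dir: str) -> str:
    """Return the offset JSON of the most recent micro-batch,
    i.e. where the query will resume after a restart."""
    offsets_dir = Path(checkpoint_dir) / "offsets"
    # One file per micro-batch, named 0, 1, 2, ...
    batch_ids = sorted(int(p.name) for p in offsets_dir.iterdir()
                       if p.name.isdigit())
    latest = offsets_dir / str(batch_ids[-1])
    # Line 1 is a version marker ("v1"), line 2 is batch metadata,
    # the last line is the source offsets, e.g. {"topic_name":{"0":250}}.
    return latest.read_text().splitlines()[-1]

print(latest_committed_offsets("/path/to/write/checkpoint"))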
If you really do need a checkpoint on the read stream for your use case, then based on the context you provided: yes, when you restart your query, records 251 to 1000 won't be written to your sink, because the read stream will not re-read the data between those offsets.
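On the question of starting at an arbitrary offset such as 250: the Kafka source accepts a JSON string for startingOffsets, so you can build it dynamically from your own bookkeeping. Keep in mind it only takes effect when the query starts without an existing checkpoint; once a checkpoint exists, its recorded offsets take precedence. A sketch, assuming the topic and single-partition layout from the question:

import json

# Build per-partition starting offsets dynamically. In this JSON,
# -2 means "earliest" and -1 means "latest" for a partition.
resume_from = {"topic_name": {"0": 250}}

read_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host:port") \
    .option("subscribe", "topic_name") \
    .option("startingOffsets", json.dumps(resume_from)) \
    .load()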