apache-spark, spark-structured-streaming, checkpointing

How to set the number of documents processed in a batch?


With Spark 2.2.0, checkpointing works a little differently than in previous versions. A commits folder is created inside the checkpoint directory, and after each batch completes a file is written to that folder.

I am facing a scenario where I have about 10k records and my streaming job fails roughly in the middle, after processing about 5k records. At that point no file has been written to the commits folder in the checkpoint directory, so when I restart the job it starts from the beginning and the 5k already-processed records are duplicated.

From what I understand, when a commit file has been written, a restart picks up from the latest offset; otherwise it reprocesses the data from the last batch.

How to set the number of documents to be processed in the batch?
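For reference, a minimal sketch of the kind of streaming query described above; the JSON file source, console sink, schema, and paths are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("checkpoint-example")
  .getOrCreate()

// Streaming file sources require an explicit schema; this one is a placeholder.
val schema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)

val input = spark.readStream
  .format("json")
  .schema(schema)
  .load("/data/incoming")

val query = input.writeStream
  .format("console")
  // offsets/ and commits/ are maintained under this checkpoint directory
  .option("checkpointLocation", "/checkpoints/my-job")
  .start()

query.awaitTermination()
```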


Solution

  • Is there a way I could set the number of documents to be processed in the batch?

Use maxFilesPerTrigger for the file source and maxOffsetsPerTrigger for the kafka source, as sketched below.
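A minimal sketch of how each option is set on the source; the paths, schema, topic name, and bootstrap servers are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("rate-limit-example").getOrCreate()

val schema = new StructType().add("id", StringType).add("value", DoubleType)

// File source: cap each micro-batch at a fixed number of new files.
val fileStream = spark.readStream
  .format("json")
  .schema(schema)
  .option("maxFilesPerTrigger", 100)      // at most 100 new files per batch
  .load("/data/incoming")

// Kafka source: cap each micro-batch at a fixed number of offsets,
// spread across all topic partitions.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", 5000)   // roughly 5000 records per batch
  .load()
```

Because these options bound the size of each micro-batch, a failure mid-stream means at most one batch worth of data is reprocessed on restart, rather than everything since the last completed run.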