I'm running PySpark using a Spark cluster in local mode and I'm trying to write a streaming DataFrame to a Kafka topic.
When I run the query, I get the following message:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
This is my code:
from time import sleep

query = (
    output_stream
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "ratings-cleaned")
    .option("checkpointLocation", "checkpoints-folder")
    .start()
)
sleep(2)  # give the query a moment to start before checking it
print(query.status)
This error message typically shows up when some messages (offsets) have been removed from the source topic since the last run of the query. The removal is usually caused by the topic's cleanup policy, such as its retention time.
Imagine your topic has messages with offsets 0, 1, 2, all of which have been processed by the application. The checkpoint files store the last processed offset, 2, so that the query knows to continue with offset 3 the next time it starts.
After some time, messages with offsets 3, 4, 5 are produced to the topic, but the messages with offsets 0, 1, 2, 3 are removed from the topic because of its retention settings.
Now, when you restart your Spark Structured Streaming job, it tries to fetch offset 3 based on its checkpoint files but finds that only offsets 4 and 5 are still available. In exactly that case it throws this exception.
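To make the offset arithmetic concrete, here is a small pure-Python model of that check. It is only an illustration of the reasoning above, not Spark's actual implementation: the names resume_offset and DataLossError are made up for this sketch, and the fallback to the earliest retained offset is a simplification of what happens when the check is disabled.

```python
class DataLossError(Exception):
    """Hypothetical stand-in for the IllegalStateException Spark raises."""


def resume_offset(last_processed, earliest_retained, fail_on_data_loss=True):
    """Return the offset a restarted query would continue from.

    last_processed:    last offset recorded in the checkpoint
    earliest_retained: oldest offset still present in the topic
    """
    expected = last_processed + 1
    if expected >= earliest_retained:
        # The next offset is still in the topic, so nothing was missed.
        return expected
    if fail_on_data_loss:
        raise DataLossError(
            f"expected offset {expected}, but the earliest retained offset "
            f"is {earliest_retained}"
        )
    # Simplified model of failOnDataLoss=false: skip the gap and carry on.
    return earliest_retained


# The scenario above: checkpoint says 2, retention removed offsets 0 to 3.
try:
    resume_offset(last_processed=2, earliest_retained=4)
except DataLossError as err:
    print("query fails:", err)

# With the check disabled, offset 3 is skipped and processing resumes at 4.
print(resume_offset(last_processed=2, earliest_retained=4, fail_on_data_loss=False))
```

Note that in the second call offset 3 is never processed, which is the data loss the exception is warning about.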
You can solve this by setting
.option("failOnDataLoss", "false")
in your readStream operation.
The Structured Streaming + Kafka Integration Guide describes the failOnDataLoss option as:
"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected. Batch queries will always fail if it fails to read any data from the provided offsets due to lost data."
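As a rough sketch of where the option goes: the question does not show the readStream part, so the topic name "ratings" and the helper names kafka_source_options and read_ratings below are assumptions for illustration only. The bootstrap server is taken from the code in the question, and spark is assumed to be an existing SparkSession.

```python
def kafka_source_options(bootstrap_servers, topic, fail_on_data_loss=False):
    """Build the option map for a Kafka streaming source (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Option values are passed as strings, hence "false" rather than False.
        "failOnDataLoss": str(fail_on_data_loss).lower(),
    }


def read_ratings(spark, topic="ratings"):
    """Create the streaming DataFrame; `topic` is an assumed source topic name."""
    options = kafka_source_options("localhost:9092", topic)
    return spark.readStream.format("kafka").options(**options).load()


print(kafka_source_options("localhost:9092", "ratings"))
```

Keep in mind that with failOnDataLoss set to "false" the query no longer stops when offsets are missing, so messages that were removed before being processed are skipped.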