I want to take a streaming PySpark DataFrame, modify it for each micro-batch, and then write the updated DataFrame back out using foreachBatch. What's the simplest way to accomplish this?
I am using a checkpoint for the stream.
My Code Structure:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.streaming import StreamingQuery
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
checkpoint_dir = "/path/to/checkpoint"
input_df = spark.readStream.format("your_input_source").load()
def process_batch(df, batch_id):
    pass

query = (
    input_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)
query.awaitTermination()
The foreachBatch operator is really a type of sink that lets you write the resulting DataFrame at the end of each micro-batch to a place of your choosing (one that isn't natively supported by any built-in connector). For example, this could be an OLTP database that doesn't have a Spark connector yet, or you might need to write to multiple sinks. Those should really be the only reasons to use foreachBatch.
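For illustration, a foreachBatch handler that fans one micro-batch out to two sinks could look roughly like this. The Parquet path, JDBC URL, and table name are placeholders, not anything from your setup:

def process_batch(batch_df, batch_id):
    # Cache the micro-batch so it isn't recomputed once per sink
    batch_df.persist()

    # Sink 1: append the micro-batch to a Parquet table
    batch_df.write.mode("append").parquet("/path/to/output_table")

    # Sink 2: push the same rows to an external database over JDBC
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/mydb")
        .option("dbtable", "events")
        .mode("append")
        .save())

    batch_df.unpersist()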
You should not try to do data processing in foreachBatch if you can use streaming operators instead, such as select/filter, deduplication, windowed aggregation, or arbitrary stateful operations in Python. (Disclaimer: that blog is from Databricks, where I work, but it doesn't have any Databricks-specific content.)
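For example, a windowed aggregation with a watermark stays entirely within the built-in streaming operators (the column names event_time and user_id are made up for illustration):

from pyspark.sql.functions import window, count

processed_df = (
    input_df
    # Bound state for late data; adjust the delay to your latency needs
    .withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "user_id")
    .agg(count("*").alias("events_per_window"))
)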
If you are absolutely sure none of those solutions work for you, you can transform your DataFrame in process_batch and write it somewhere.
So, your code should end up looking something like:
query = (
    input_df
    # Apply built-in operators, such as a filter or deduplication
    .filter("foo > 10")
    # Fill out the parameters if you're deduplicating
    .dropDuplicatesWithinWatermark(...)
    .writeStream
    # process_batch should write to a non-built-in sink
    # If the sink is built-in, use .format to specify it instead
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)
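For completeness, process_batch itself could be shaped roughly like this, assuming the target system only offers a plain Python client. The external_client module, its connect and insert_rows calls, and the column names are hypothetical placeholders for whatever your sink actually provides:

def process_batch(df, batch_id):
    # Only last-mile logic that truly can't be expressed as a streaming
    # operator should live here
    out = df.select("foo", "timestamp")  # example columns

    def write_partition(rows):
        # Hypothetical Python client for a sink with no Spark connector
        import external_client
        client = external_client.connect("https://my-sink.example.com")
        client.insert_rows([row.asDict() for row in rows])

    # Write partition by partition so the driver never collects the batch
    out.foreachPartition(write_partition)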