python, pyspark, spark-structured-streaming

Save a DataFrame in PySpark Structured Streaming


I'm looking to store a PySpark DataFrame during streaming, modify it for each batch, and then save the updated DataFrame using foreachBatch. What's the simplest way to accomplish this?

I am using a checkpoint for the stream.

My Code Structure:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

checkpoint_dir = "/path/to/checkpoint"

# Read from the streaming source (placeholder format)
input_df = spark.readStream.format("your_input_source").load()

def process_batch(df, batch_id):
    pass


query = (
    input_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

query.awaitTermination()


Solution

  • The foreachBatch operator is really a kind of sink: it lets you write the resulting DataFrame at the end of each micro-batch to a destination of your choosing, one that isn't covered by any built-in connector. For example, this could be an OLTP database that doesn't have a Spark connector yet, or you might need to write to multiple sinks. Those should really be the only reasons to use foreachBatch.

    You should not do data processing in foreachBatch if you can use streaming operators instead, such as select/filter, deduplication, windowed aggregation, or arbitrary stateful operations in Python. (Disclaimer: that blog post is from Databricks, where I work, but it contains nothing Databricks-specific.)

    If you are absolutely sure none of those options works for you, you can transform your DataFrame in process_batch and write it from there; a sketch of such a function closes this answer.

    So, your code should end up looking something like:

    query = (
        input_df
        # Apply built-in operators, such as a filter or deduplication
        .filter("foo > 10")
        # Fill out the parameters if you're deduplicating; note that
        # dropDuplicatesWithinWatermark also requires a watermark set
        # upstream with withWatermark()
        .dropDuplicatesWithinWatermark(...)
        .writeStream
        # process_batch should write to a non-built-in sink
        # If the sink is built-in, use .format to specify it instead
        .foreachBatch(process_batch)
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )