I am just trying to use the Rate source with Structured Streaming so as to write to multiple table names per micro-batch, i.e. just refreshing the multiple-sinks logic in PySpark in preparation for some certification.
There are no errors, but no persistence is occurring. It has been a while since I looked at this; it must be something basic.
Coding as follows on Databricks Community Edition, with no Hive catalog. Basic stuff.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

spark = SparkSession.builder \
    .appName("SimulateKAFKAandMultipleSinks") \
    .getOrCreate()

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

message_stream = rate_stream.select(
    (rate_stream["value"] + 1000).alias("message_id"),
    rate_stream["timestamp"].alias("event_time"),
    (concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")
)

def append_to_parquet(df, table_id):
    table_path = f"/mnt/parquet/{table_id}"
    df.write \
        .format("parquet") \
        .mode("append") \
        .option("path", table_path) \
        .save()

def process_batch(df, batch_id):
    partitioned_df = df.repartition("table_id")

    def process_partition(iterator):
        for partition in iterator:
            first_row_value = df.first()
            table_id_value = first_row_value['table_id']
            print(f"Writing partition for table_id: {table_id_value}")
            partition_df = partition.filter(col("table_id") == table_id_value)
            append_to_parquet(partition_df, table_id_value)

    partitioned_df.rdd.mapPartitions(process_partition)

query = message_stream.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
    .start()

query.awaitTermination()
Update: an oversight on my side, but educational enough. I can do this with a batch Kafka read, not via Structured Streaming the way I had it.
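For reference, a batch (non-streaming) Kafka read looks roughly like the sketch below; the broker address and topic name are placeholders. Because the result is an ordinary static DataFrame, the per-table write loop can then be applied to it directly on the driver.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BatchKafkaSketch").getOrCreate()

# Batch read of whatever is currently in the topic.
# "localhost:9092" and "events" are placeholder values for this sketch.
kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Kafka exposes key/value as binary; cast value to string before further parsing.
messages = kafka_df.selectExpr("CAST(value AS STRING) AS value", "timestamp")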
The reason your code is not persisting data comes down to two fundamental problems with how the micro-batch is processed. First, mapPartitions is a lazy transformation: partitioned_df.rdd.mapPartitions(process_partition) only builds a new RDD, and since no action is ever called on it, process_partition never executes. Second, even if it did execute, it would run on the executors, where the iterator yields plain Row objects; calls such as df.first(), partition.filter(...) and the df.write inside append_to_parquet are DataFrame operations that can only be issued from the driver, so they cannot work there.
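You can see the first problem in isolation with a plain RDD (a minimal sketch, unrelated to streaming):

rdd = spark.sparkContext.parallelize(range(4), 2)

def touch(iterator):
    print("partition touched")  # runs on the executors, and only once an action fires
    return iterator

mapped = rdd.mapPartitions(touch)  # lazy: nothing has executed yet
mapped.count()                     # the action finally forces touch() to run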
Try this instead. It keeps everything on the driver inside foreachBatch, collects the distinct table_id values, and writes one filtered DataFrame per table:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

# Create Spark session
spark = SparkSession.builder \
    .appName("SimulateKAFKAandMultipleSinks") \
    .getOrCreate()

# Create a streaming DataFrame
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

# Transform the data
message_stream = rate_stream.select(
    (rate_stream["value"] + 1000).alias("message_id"),
    rate_stream["timestamp"].alias("event_time"),
    concat(lit("T"), (rate_stream["value"] % 5).cast("string")).alias("table_id")
)

# Write to multiple sinks based on table_id
def write_to_multiple_sinks(batch_df, batch_id):
    unique_table_ids = batch_df.select("table_id").distinct().collect()
    for row in unique_table_ids:
        table_id = row["table_id"]
        table_path = f"/mnt/parquet/{table_id}"  # Replace with actual storage path
        table_df = batch_df.filter(col("table_id") == table_id)
        table_df.write.mode("append").parquet(table_path)

query = message_stream.writeStream \
    .foreachBatch(write_to_multiple_sinks) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
    .start()

query.awaitTermination()
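As a side note, if separate top-level directories per table are not a hard requirement, you can skip the distinct/loop entirely and let Spark split each batch with partitionBy. This is only a sketch; the root path /mnt/parquet/all_tables and the separate checkpoint directory are made-up names, and the layout changes to subdirectories like table_id=T0 under one root rather than one path per table.

# Alternative foreachBatch handler: a single write per batch, partitioned by table_id.
def write_partitioned(batch_df, batch_id):
    batch_df.write \
        .mode("append") \
        .partitionBy("table_id") \
        .parquet("/mnt/parquet/all_tables")  # hypothetical root path

query = message_stream.writeStream \
    .foreachBatch(write_partitioned) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints_partitioned/") \
    .start()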