I'm using Apache Spark Structured Streaming to read data from Kafka topic, and do some processing. I'm using watermark to account for late-coming records and the code works fine.
Here is the working(sample) code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, max,expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType,IntegerType
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Sliding Window Demo") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", 1) \
.getOrCreate()
stock_schema = StructType([
StructField("LogType", StringType()),
StructField("CreatedTime", StringType()),
StructField("Type", StringType()),
StructField("Amount", IntegerType()),
StructField("BrokerCode", StringType())
])
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "trades") \
.option("startingOffsets", "earliest") \
.load()
value_df = kafka_df.select(from_json(col("value").cast("string"), stock_schema).alias("value"))
trade_df = value_df.select("value.*") \
.withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
.withColumn("Buy", expr("case when Type == 'BUY' then Amount else 0 end")) \
.withColumn("Sell", expr("case when Type == 'SELL' then Amount else 0 end"))
window_agg_df = trade_df \
.withWatermark("CreatedTime", "10 minute") \
.groupBy(window(col("CreatedTime"), "10 minute")) \
.agg({"Buy":"sum", "Sell":"sum"}).withColumnRenamed("sum(Buy)", "TotalBuy").withColumnRenamed("sum(Sell)", "TotalSell")
output_df = window_agg_df.select("window.start", "window.end", "TotalBuy", "TotalSell")
window_query = output_df.writeStream \
.format("console") \
.outputMode("append") \
.option("checkpointLocation", "chk-point-dir-mar28") \
.trigger(processingTime="30 second") \
.start()
window_query.awaitTermination()
Currently, I'm processing a single LogType, the requirement is to process multiple LogTypes in the same flow .. LogTypes will be config driven, and can change based on client. Objective is to have generic code that can process all logTypes (config-driven)
As an example, for LogType X, I will need to get groupby columns col1, col2 and get the sum of values 'sent' & 'received'. for LogType Y, the groupBy columns will remain the same but the sum will be on column col3 instead.
w/o the watermark, I can look at the LogType and do the processing in batch mode (using foreachbatch). However, with watermark - i'm unable to figure out how to process based on LogType.
Any inputs on this ?
tia!
You add filter function to compare rows of a dataset
value_df = kafka_df.select(from_json(col("value").cast("string"), stock_schema).alias("value"))
filtered = value_df.filter(value_df.LogType == "X")
Regarding "config driven", import sys
or argparse
modules to accept "log-type" flag when you run the script, and run multiple times for each type
spark-submit ... app.py --log-type=X
spark-submit ... app.py --log-type=Y
If you need to perform different logic based on the type, use a regular if-else statement