Using PySpark, I implemented a Spark Streaming + Kafka integration. Every run it reads offsets from 0.
Need to solve 2 issues:
- Reading stream for last 15 mins
- Read from last committed offset for each partition

Help in solving this.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# create spark session
spark = SparkSession.builder \
    .appName(appName) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()
# Define schema for data in value field
schema = StructType([
    StructField("col1", StringType()),
    StructField("col2", StringType()),
    StructField("col3", TimestampType()),
    StructField("col4", DoubleType())
])
# Spark streaming
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", broker) \
    .option("subscribe", topic) \
    .option("kafka.group.id", appName) \
    .option("enable.auto.commit", True) \
    .load()
value_df = df.select(col("topic"), col("partition"), col("offset"), from_json(col("value").cast("STRING"), schema).alias("values"))
See the Structured Streaming Programming Guide and the Kafka Structured Streaming docs.

spark.read is batch processing; spark.readStream is Structured Streaming.
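For reference, a minimal sketch of the same Kafka source defined as a streaming read (it reuses the broker, topic, and schema variables from the question, and drops the consumer-group options since offsets are tracked via the checkpoint, as described below):

# Structured Streaming source: readStream instead of read
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", broker) \
    .option("subscribe", topic) \
    .load()

# Parse the Kafka value payload with the schema defined above
value_stream_df = stream_df.select(
    col("topic"), col("partition"), col("offset"),
    from_json(col("value").cast("STRING"), schema).alias("values")
)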
- Reading stream for last 15 mins
- Read from last committed offset for each partition
One basic thing to understand is that (just like in Kafka) both of these things (checkpoint and trigger frequency) are specified per consumer. So you specify them when you writeStream, not when you readStream.

Think of the streaming DataFrame returned by spark.readStream as a Kafka topic: it can have multiple consumers, and each consumer defines its own checkpoint, frequency, etc. Likewise, you can start multiple streaming queries from a single streaming DataFrame and give each one its own checkpoint and trigger. E.g.
# Create streaming dataframe
sdf = spark.readStream.format('kafka').option(...).load()

# start() each DataStreamWriter to create the streaming queries
sq1 = sdf.writeStream.format('parquet') \
    .trigger(processingTime='1 seconds') \
    .option('checkpointLocation', 'loc1') \
    .start(...)

sq2 = sdf.writeStream.format('csv') \
    .trigger(processingTime='5 seconds') \
    .option('checkpointLocation', 'loc2') \
    .start(...)

spark.streams.awaitAnyTermination()
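Each query tracks its own offsets in its own checkpoint folder. As an optional sanity check (assuming the queries above are running), lastProgress shows the offset range processed by the latest micro-batch of each query:

# Print the latest progress (including Kafka start/end offsets) per active query
for q in spark.streams.active:
    print(q.name, q.lastProgress)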
Reading stream for last 15 mins

This is controlled via DataStreamWriter.trigger(). It can be achieved in 2 ways:

- .trigger(availableNow=True). Your compute can start every 15 mins, run the job and then shut down. A Databricks cluster, e.g., takes 4-6 minutes to start.
- .trigger(processingTime='15 minutes'). If you want lower latencies and/or have a shared cluster continuously running, then you can do this; otherwise it makes no sense for a 15 min frequency.

Read from last committed offset for each partition

This is controlled via DataStreamWriter.option('checkpointLocation', '/path/to/checkpoint/folder/'), e.g. on S3, local disk, ADLS, ...
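Putting the two together for the pipeline in the question, a hedged sketch (the output and checkpoint paths are illustrative, and value_stream_df is the streaming DataFrame from the sketch above): trigger the job every 15 minutes with availableNow=True, and point checkpointLocation at a durable folder so each run resumes from the offsets recorded there by the previous run instead of starting from 0.

# Run as a periodic batch: process everything available, then stop;
# offsets resume from the checkpoint on the next run
query = value_stream_df.writeStream \
    .format("parquet") \
    .option("path", "/illustrative/output/path") \
    .option("checkpointLocation", "/illustrative/checkpoint/path") \
    .trigger(availableNow=True) \
    .start()

query.awaitTermination()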