apache-spark, pyspark, apache-kafka, apache-spark-sql, offset

Spark Streaming + Kafka integration: read data from Kafka every 15 minutes and store the last read offset using PySpark


Using PySpark, I implemented a Spark Streaming + Kafka integration. On every run it reads offsets from 0.

I need to solve two issues:

  1. Reading stream for last 15 mins
  2. Read from last committed offset for each partition

How can I solve these?

# imports used by the snippets below
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# create spark session
spark = SparkSession.builder \
    .appName(appName) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()

# Define schema for data in value field
schema = StructType([
    StructField("col1", StringType()),
    StructField("col2", StringType()),
    StructField("col3", TimestampType()),
    StructField("col4", DoubleType())
])

# Spark streaming 
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", broker) \
    .option("subscribe", topic) \
    .option("kafka.group.id", appName) \
    .option("enable.auto.commit", True) \
    .load()


value_df = df.select(
    col("topic"), col("partition"), col("offset"),
    from_json(col("value").cast("STRING"), schema).alias("values")
)

Solution

  • See the Structured Streaming Programming Guide and the Structured Streaming + Kafka Integration Guide.

    spark.read is batch processing; spark.readStream is Structured Streaming. Your code calls spark.read, so every run is an independent batch query that starts again from the earliest offsets, which is why you see offset 0 each time.
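
    A quick way to see the difference (a sketch using the broker and topic variables from your code):

    # Batch: a one-off read, by default from the earliest offsets
    batch_df = spark.read.format("kafka") \
      .option("kafka.bootstrap.servers", broker).option("subscribe", topic).load()
    print(batch_df.isStreaming)   # False

    # Streaming: returns a streaming DataFrame; it only starts running once you writeStream
    stream_df = spark.readStream.format("kafka") \
      .option("kafka.bootstrap.servers", broker).option("subscribe", topic).load()
    print(stream_df.isStreaming)  # True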


    1. Reading stream for last 15 mins
    2. Read from last committed offset for each partition

    One basic thing to understand is that (just like in Kafka) both of these things (checkpoint and trigger frequency) are specified per consumer, so you specify them when you writeStream, not when you readStream.

    Think of the streaming DataFrame returned by spark.readStream as a Kafka topic: it can have multiple consumers, and each consumer defines its own checkpoint, frequency, etc.

    So you can attach multiple streaming queries to a single streaming DataFrame and specify a different checkpoint location and trigger frequency for each. E.g.

    # Create the streaming dataframe
    sdf = spark.readStream.format('kafka').option(...).load()

    # start() each DataStreamWriter to create a streaming query
    sq1 = sdf.writeStream.format('parquet').option(...) \
      .trigger(processingTime='1 seconds').option('checkpointLocation', 'loc1').start()
    sq2 = sdf.writeStream.format('csv').option(...) \
      .trigger(processingTime='5 seconds').option('checkpointLocation', 'loc2').start()

    spark.streams.awaitAnyTermination()
    

    Reading stream for last 15 mins

    This is controlled via DataStreamWriter.trigger()

    This can be achieved in 2 ways:

    1. Run a job every X minutes; it will read all new messages since the last checkpoint. In your case X=15, but the code stays generic, i.e. you can run the same code at any frequency without changing it. The key is .trigger(availableNow=True): your compute can start every 15 minutes, run the job and then shut down (a Databricks cluster, for example, takes 4-6 minutes to start).
    2. Keep the job running continuously and have it look for new messages every 15 minutes, so your compute is running all the time. For this you use .trigger(processingTime='15 minutes'). This makes sense if you want lower latencies and/or already have a shared cluster running continuously; otherwise it is not worth it for a 15-minute frequency. Both variants are sketched below.
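
    Two alternative sketches of the trigger styles, assuming sdf is the streaming DataFrame from above (the sink format, output path and checkpoint locations are placeholders):

    # Option 1: start this on a schedule (e.g. every 15 min); it processes everything
    # that arrived since the last checkpoint, then the query finishes and returns.
    sq = sdf.writeStream.format('parquet') \
      .option('path', '/tmp/out') \
      .option('checkpointLocation', '/tmp/checkpoints/job1') \
      .trigger(availableNow=True) \
      .start()
    sq.awaitTermination()

    # Option 2: keep the job running and poll Kafka for new messages every 15 minutes
    sq = sdf.writeStream.format('parquet') \
      .option('path', '/tmp/out') \
      .option('checkpointLocation', '/tmp/checkpoints/job1') \
      .trigger(processingTime='15 minutes') \
      .start()
    sq.awaitTermination()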

    Read from last committed offset for each partition

    This is controlled via DataStreamWriter.option('checkpointLocation', '/path/to/checkpoint/folder/'). The checkpoint folder can live on S3, local disk, ADLS, etc. Spark writes the last processed offsets for each partition to that folder and resumes from them on the next run, which is exactly the "read from last committed offset" behaviour you are after.
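
    Putting it together for your case, a sketch of the full pipeline (schema, broker and topic are the variables from your question; the output and checkpoint paths are placeholders):

    # Streaming read from Kafka. Offsets are tracked in the checkpoint,
    # so enable.auto.commit / a consumer group id are not needed.
    df = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", broker) \
      .option("subscribe", topic) \
      .option("startingOffsets", "earliest") \
      .load()

    value_df = df.select(
      col("topic"), col("partition"), col("offset"),
      from_json(col("value").cast("STRING"), schema).alias("values")
    )

    # Run this every 15 minutes (e.g. from a scheduler); each run processes only
    # the messages that arrived since the last checkpointed offsets.
    query = value_df.writeStream \
      .format("parquet") \
      .option("path", "/path/to/output/") \
      .option("checkpointLocation", "/path/to/checkpoint/folder/") \
      .trigger(availableNow=True) \
      .start()

    query.awaitTermination()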