I have two separate Python scripts (job1.py and job2.py) that use Spark Structured Streaming to consume data from the Kafka topic test1. Both scripts are configured with the same Kafka consumer group (consumer-group-1), expecting that only one of them should consume the data at any time. However, when I run both scripts simultaneously and send data to test1 using bin/kafka-console-producer.sh, both jobs seem to process the same data.
job1.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
    .appName("KafkaConsumer1") \
    .getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

shutdown_requested = False
query = None  # assigned later; guard against a signal arriving before the query starts

def shutdown_handler(signum, frame):
    global shutdown_requested
    print("Graceful shutdown initiated...")
    shutdown_requested = True
    if query is not None:
        query.stop()

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", "test1") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/wordcount_dta") \
    .start()

try:
    query.awaitTermination()
except Exception as e:
    print(f"Exception encountered: {e}")
job2.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
import signal
import sys

checkpoint_dir = "/tmp/checkpoints"
kafka_bootstrap_servers = "localhost:9092"

spark = SparkSession.builder \
    .appName("KafkaConsumer2") \
    .getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.stateSchemaCheck", "true")
spark.sparkContext.setLogLevel("WARN")

shutdown_requested = False
query = None  # assigned later; guard against a signal arriving before the query starts

def shutdown_handler(signum, frame):
    global shutdown_requested
    print("Graceful shutdown initiated...")
    shutdown_requested = True
    if query is not None:
        query.stop()

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", "test1") \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()

df = df.selectExpr("CAST(value AS STRING) as message")
df = df.withColumn("char_count", length(col("message")))

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", f"{checkpoint_dir}/wordcount") \
    .start()

try:
    query.awaitTermination()
except Exception as e:
    print(f"Exception encountered: {e}")
Steps to Reproduce:
Start both job1.py and job2.py simultaneously. Send messages to Kafka topic test1 using bin/kafka-console-producer.sh.
Expected Behavior: Only one Spark job (either job1.py or job2.py) should consume the messages from the Kafka topic due to using the same consumer group (consumer-group-1).
Actual Behavior: Both Spark jobs are processing the same messages concurrently, which is not the expected behavior with a shared consumer group.
Why are both Spark Structured Streaming jobs processing the same data from Kafka, even though they use the same consumer group? How can I ensure that only one job processes messages from the Kafka topic at any time, allowing seamless updates where I can start new jobs with updated queries and stop old ones without interruption or duplication?
A Kafka consumer group distributes a topic's partitions among the consumers in that group, so two plain Kafka consumers sharing a group would split the messages between them. Spark Structured Streaming, however, does not participate in Kafka's group-based assignment: each streaming query tracks its offsets in its own checkpoint and assigns all of the topic's partitions to itself, and the kafka.group.id option is only passed through to the brokers (e.g. for ACLs and quotas). That is why both of your jobs read every message. To split the load, you can instead assign different partitions of the topic to each job. For example, if your Kafka topic has 4 partitions, assign 2 partitions to each job:
Job1 (assigning partitions 0 and 1):
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("assign", '{"test1":[0,1]}') \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()
Job2 (assigning partitions 2 and 3):
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("assign", '{"test1":[2,3]}') \
    .option("startingOffsets", "latest") \
    .option("kafka.group.id", "consumer-group-1") \
    .load()
Try this and check whether the events are now distributed across the two jobs; one way to verify the split is shown below.
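The Kafka source exposes a partition metadata column alongside key and value, so you can keep it in the console output to see which partitions each job is actually reading. A minimal sketch, meant to replace the two select/withColumn lines in either script (the message and char_count names are taken from the question's code; the rest of the writeStream code stays the same):

# Keep Kafka's partition metadata column next to the message so the
# console output shows which partitions this job actually reads.
df = df.selectExpr("CAST(value AS STRING) as message", "partition")
df = df.withColumn("char_count", length(col("message")))

Note that each job must keep its own checkpointLocation (as your two scripts already do), because each streaming query tracks its progress in its own checkpoint.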
Note: if the topic has only one partition, you first need to increase the partition count in Kafka so there is something to split. Also, produce messages with a key, so that Kafka's partitioner distributes them across the partitions.
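For example, using the stock Kafka CLI tools the question already uses (the count of 4 partitions and the colon key separator are just illustrative choices):

# Grow test1 to 4 partitions (the partition count can only be increased, not decreased)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test1 --partitions 4

# Produce keyed messages as key:value pairs, e.g. "user1:hello"
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1 \
    --property parse.key=true --property key.separator=: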