docker, apache-spark, pyspark, apache-kafka, spark-kafka-integration

Spark Structured Streaming - Kafka - Missing required configuration "partition.assignment.strategy"


This is my code:

import findspark

findspark.init()
import os

os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 pyspark-shell"
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
time_format = "dd/MM/yyyy HH:mm:ss"

spark = (
    SparkSession.builder.appName("Spark Kafka Streaming").master(master).getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("tarih", StringType(), True),
        StructField("time", TimestampType(), True),
        StructField("temperature", FloatType(), True),
        StructField("pressure", FloatType(), True),
        StructField("vibration_x", FloatType(), True),
        StructField("vibration_y", FloatType(), True),
        StructField("vibration_motor", FloatType(), True),
    ]
)

lines = (
    spark.readStream.option("multiLine", True)
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "raw-data")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select(col("parsed_value.*"))
)
df = lines.select("*")

df2 = (
    df.withColumn("org_unit4", split(col("tarih"), ",").getItem(0))
    .withColumn("asset", split(col("tarih"), ",").getItem(1))
    .drop("tarih")
    .withColumn("time", date_format(col("time"), time_format))
    .select(
        "id",
        "org_unit4",
        "asset",
        "time",
        "temperature",
        "pressure",
        "vibration_x",
        "vibration_y",
        "vibration_motor",
    )
)

query = (
    df2.writeStream.option("truncate", False)
    .outputMode("Append")
    .format("console")
    .start()
)

query.awaitTermination()
This is the error I get when I run it inside the Docker container:

root@115ec4500b0e:/usr/spark-2.3.1/sparks/deneme# python deneme3.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/spark-2.3.1/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in central
        found org.apache.kafka#kafka-clients;0.10.0.1 in central
        found net.jpountz.lz4#lz4;1.3.0 in central
        found org.xerial.snappy#snappy-java;1.1.2.6 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 312ms :: artifacts dl 9ms
        :: modules in use:
        net.jpountz.lz4#lz4;1.3.0 from central in [default]
        org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/11ms)
2022-10-02 12:24:22 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "deneme3.py", line 42, in <module>
    .option("startingOffsets", "earliest")
  File "/usr/spark-2.3.1/python/pyspark/sql/streaming.py", line 403, in load
    return self._df(self._jreader.load())
  File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/spark-2.3.1/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.load.
: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

The code works on my laptop when I add SPARK_HOME, but I need to get it running in the Docker environment.

This is the Docker image I am using --> https://hub.docker.com/layers/gettyimages/spark/2.3.1-hadoop-3.0/images/sha256-0bc08017eb4da02b7d6260ca3e5fdff921944c4a598283f1787521c58cf368c6?context=explore
Note: the Spark version I am using is 2.3.1; even if I set up the environment somewhere else, it still has to be version 2.3.1.


Solution

  • You can pass this option explicitly, but I would also suggest upgrading your Spark version and the Kafka dependency, because this may be a bug: RangeAssignor should already be the default. (A full reader sketch is included at the end of this answer.)

    .option("kafka.partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.RangeAssignor")

    You could also try including a newer version of kafka-clients in PYSPARK_SUBMIT_ARGS; a sketch follows.
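
    For example, you could append a newer kafka-clients artifact to the --packages list. This is only a sketch: the 0.11.0.3 coordinate below is my own pick, not something from the question, so substitute a version that is compatible with your broker:

    import os

    # Add a newer kafka-clients coordinate after the Spark Kafka source package
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,"
        "org.apache.kafka:kafka-clients:0.11.0.3 "
        "pyspark-shell"
    )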
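
    And, to put the first suggestion in context, here is a minimal sketch of the reader from the question with the option added. It reuses the spark session, schema, and imports defined in the question; I have not run this against the 2.3.1 image, so treat it as illustrative rather than verified:

    lines = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "raw-data")
        .option("startingOffsets", "earliest")
        # Explicitly supply the assignment strategy that the old kafka-clients
        # jar on the classpath fails to default
        .option(
            "kafka.partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.RangeAssignor",
        )
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
        .select(col("parsed_value.*"))
    )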