This is my code:
import findspark
findspark.init()

import os
os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 pyspark-shell"

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
time_format = "dd/MM/yyyy HH:mm:ss"

spark = (
    SparkSession.builder.appName("Spark Kafka Streaming").master(master).getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("tarih", StringType(), True),
        StructField("time", TimestampType(), True),
        StructField("temperature", FloatType(), True),
        StructField("pressure", FloatType(), True),
        StructField("vibration_x", FloatType(), True),
        StructField("vibration_y", FloatType(), True),
        StructField("vibration_motor", FloatType(), True),
    ]
)

lines = (
    spark.readStream.option("multiLine", True)
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "raw-data")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select(col("parsed_value.*"))
)

df = lines.select("*")
df2 = (
    df.withColumn("org_unit4", split(col("tarih"), ",").getItem(0))
    .withColumn("asset", split(col("tarih"), ",").getItem(1))
    .drop("tarih")
    .withColumn("time", date_format(col("time"), time_format))
    .select(
        "id",
        "org_unit4",
        "asset",
        "time",
        "temperature",
        "pressure",
        "vibration_x",
        "vibration_y",
        "vibration_motor",
    )
)

query = (
    df2.writeStream.option("truncate", False)
    .outputMode("Append")
    .format("console")
    .start()
)
query.awaitTermination()
root@115ec4500b0e:/usr/spark-2.3.1/sparks/deneme# python deneme3.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/spark-2.3.1/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in central
found org.apache.kafka#kafka-clients;0.10.0.1 in central
found net.jpountz.lz4#lz4;1.3.0 in central
found org.xerial.snappy#snappy-java;1.1.2.6 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 312ms :: artifacts dl 9ms
:: modules in use:
net.jpountz.lz4#lz4;1.3.0 from central in [default]
org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 6 | 0 | 0 | 0 || 6 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b06fc574-6246-4c40-8953-72f8b41bcbaf
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/11ms)
2022-10-02 12:24:22 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "deneme3.py", line 42, in <module>
.option("startingOffsets", "earliest")
File "/usr/spark-2.3.1/python/pyspark/sql/streaming.py", line 403, in load
return self._df(self._jreader.load())
File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/spark-2.3.1/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.load.
: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
The code works on my laptop when I set SPARK_HOME, but I need to run it in a Docker environment.
This is the Docker image I am using: https://hub.docker.com/layers/gettyimages/spark/2.3.1-hadoop-3.0/images/sha256-0bc08017eb4da02b7d6260ca3e5fdff921944c4a598283f1787521c58cf368c6?context=explore
Note: the Spark version I am using is 2.3.1. Even if I set up the environment somewhere else, it has to stay at version 2.3.1.
You can work around this by passing the option below, but I would also suggest upgrading the Spark version and the Kafka dependency, because this may be a bug: RangeAssignor should already be the default.
.option("kafka.partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor")
You could also try including a newer version of kafka-clients in PYSPARK_SUBMIT_ARGS.
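For example, a sketch of what that could look like, pinning an explicit client next to the connector (the kafka-clients 2.0.0 coordinate is an assumption; choose a version compatible with your broker):

import os

# assumed example: pin a newer kafka-clients so Ivy resolves it instead of the
# transitive 0.10.0.1 pulled in by spark-sql-kafka-0-10_2.11:2.3.1
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,"
    "org.apache.kafka:kafka-clients:2.0.0 pyspark-shell"
)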