Kafka with spark-streaming throws an error:
from pyspark.streaming.kafka import KafkaUtils
ImportError: No module named kafka
I have already set up a Kafka broker and a working Spark environment with one master and one worker.
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()
I assume from the error that something related to Kafka is missing, most likely a version mismatch. Can anyone help with this?
spark-version: version 3.0.0-preview2
I execute with:
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
According to the Spark Streaming + Kafka Integration Guide:
"Kafka 0.8 support is deprecated as of Spark 2.3.0."
In addition, the compatibility table in that guide shows that Python is not supported by the Kafka 0.10 (and higher) integration for Spark Streaming.
In your case you will have to use Spark 2.4 in order to get your code running.
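If you go the Spark 2.4 route, the Kafka 0.8 package version should match the Spark version, and the master URL should be passed with --master rather than after the script name. Here is a minimal sketch that builds such a spark-submit command. The install path and the 2.4.5 version in the usage comment are assumptions for illustration, not values from your setup:

```python
def submit_command(spark_home, script, master,
                   spark_version="2.4.5", scala_version="2.11"):
    """Build a spark-submit argument list for the Kafka 0.8 DStream API.

    spark_version and scala_version are assumed defaults; adjust them
    to the Spark 2.4.x build that is actually installed.
    """
    # The connector version must match the Spark version.
    package = "org.apache.spark:spark-streaming-kafka-0-8_%s:%s" % (
        scala_version, spark_version)
    return [
        spark_home + "/bin/spark-submit",
        "--master", master,      # options go before the script name
        "--packages", package,
        script,
    ]

# Usage (hypothetical install path):
# import subprocess
# cmd = submit_command("/usr/spark/spark-2.4.5-bin-hadoop2.7",
#                      "spark_streamer.py", "spark://mysparkip:7077")
# subprocess.check_call(cmd)
```

The --jars option from your original command is dropped here, since --packages already resolves the connector and its dependencies.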
If you plan to use the latest version of Spark (e.g. 3.x) and still want to integrate Spark with Kafka in Python you can use Structured Streaming. You will find detailed instructions on how to use the Python API in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):
# Subscribe to 1 topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .start()
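Applied to the job in the question, a Structured Streaming version could look roughly like the sketch below. It is untested against your cluster and makes a few assumptions: the broker listens on the default port 9092, the Spark 3.0 build uses Scala 2.12, and the helper names kafka_package, count_messages and run are made up for this example. Note that it prints a running total of messages rather than a count per one-second batch.

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Maven coordinate of the Structured Streaming Kafka connector.

    The Scala version is an assumed default; it must match the Spark build.
    """
    return "org.apache.spark:spark-sql-kafka-0-10_%s:%s" % (
        scala_version, spark_version)


def count_messages(spark, bootstrap_servers, topic):
    """Start a query that prints a running count of records on a topic."""
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrap_servers)
          .option("subscribe", topic)
          .load())
    # Global aggregation: one row holding the total number of records so far.
    counts = df.selectExpr("CAST(value AS STRING) AS value").groupBy().count()
    return (counts.writeStream
            .outputMode("complete")   # required for aggregations to console
            .format("console")
            .start())


def run(spark_version="3.0.0-preview2"):
    # Imported here so the helpers above can be used without pyspark.
    from pyspark.sql import SparkSession
    spark = (SparkSession.builder
             .appName("SparkStreamAISfromKAFKA")
             .config("spark.jars.packages", kafka_package(spark_version))
             .getOrCreate())
    spark.sparkContext.setLogLevel("WARN")
    # Broker address and port are assumptions; the topic is from the question.
    query = count_messages(spark, "my-kafka-broker:9092",
                           "enriched_ais_messages")
    query.awaitTermination()
```

Calling run() starts the job. When launching through spark-submit instead, the same coordinate returned by kafka_package can be passed with --packages.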