docker apache-spark pyspark apache-kafka spark-streaming-kafka

Docker pyspark cluster container not receiving kafka streaming from the host?


I have created and deployed a Spark cluster which consists of 4 running containers:

  1. spark master

  2. spark-worker

  3. spark-submit

  4. data-mount-container : to access the script from the local directory

I added the required dependency jars to all of these containers.

I also deployed Kafka on the host machine, where it produces a stream via a producer.

I launched Kafka following the exact steps in the document below:

https://kafka.apache.org/quickstart

I verified that the Kafka producer and consumer can exchange messages on port 9092, and that works fine.
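
As an extra check (beyond the console producer/consumer from the quickstart), the same round trip can be scripted with the kafka-python package; treat this as an optional sketch, with the topic name and port taken from my setup:

# optional sanity check with kafka-python (pip install kafka-python);
# it mirrors what the quickstart's console producer/consumer do
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("session-event", b"test message from host")
producer.flush()

consumer = KafkaConsumer(
    "session-event",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5s without messages
)
for message in consumer:
    print(message.value)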

Below is the simple PySpark script which I want to run as structured streaming:

from pyspark import SparkContext
from pyspark.sql import SparkSession

print("Kafka App launched")
spark = SparkSession.builder.master("spark://master:7077").appName("kafka_Structured").getOrCreate()
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "hostmachine:9092") \
    .option("subscribe", "session-event") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

converted_string = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print("Received Stream in String", converted_string)

And below is the spark-submit command I used to execute the script:

## containers
# pyspark_vol - container for volume mounting
# spark/stru_kafka - container for spark-submit
# the spark master and worker are already linked via the container 'master'

## spark-submit
docker run --add-host="localhost: myhost" --rm -it --link master:master --volumes-from pyspark_vol spark/stru_kafka \
    spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 \
    --jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-sql-kafka-0-10_2.11-2.1.1.jar \
    --master spark://master:7077 \
    /data/spark_session_kafka.py localhost 9092 session-event

After I ran the script, it executed fine, but it does not seem to be listening to the stream from the Kafka producer as micro-batches; it just stops execution.

I didn't observe any specific error, but the script is not producing any output.

I verified connectivity in receiving data from the host inside the Docker container using a socket program, and that works fine.
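
The check was roughly of this shape (a sketch, run from inside the container; 'hostmachine' and 9092 are just the host alias and port used elsewhere in this question):

# rough connectivity check: can the container open a TCP connection
# to the Kafka port on the host machine?
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
    sock.connect(("hostmachine", 9092))
    print("host reachable on port 9092")
finally:
    sock.close()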

I am not sure if I have missed any configuration.

Expected:

The above application, running on the Spark cluster, should print the stream coming from the Kafka producer.

Actual:

  "id" : "f4e8829f-583e-4630-ac22-1d7da2eb80e7",
  "runId" : "4b93d523-7b7c-43ad-9ef6-272dd8a16e0a",
  "name" : null,
  "timestamp" : "2020-09-09T09:21:17.931Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1922,
    "getBatch" : 287,
    "getOffset" : 361,
    "queryPlanning" : 111,
    "triggerExecution" : 2766,
    "walCommit" : 65
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[session-event]]",
    "startOffset" : null,
    "endOffset" : {
      "session-event" : {
        "0" : 24
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a1b0b4b"
  }
}



Solution

  • The issue was with my pyspark_stream script, where I had missed providing the batch processing time (trigger) and a print statement / console sink to view the output.

    Since it's not an aggregated stream, I had to use the 'append' output mode here.

    
    
    result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    print("Kafka Streaming output is", result)
    query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
    query.awaitTermination()  # keep the driver alive so the query keeps processing batches
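
    Putting the readStream from the question and this writeStream together, the full script looks roughly like this (same host, topic and trigger interval as above; awaitTermination() is what keeps the driver alive so the 30-second trigger keeps firing under spark-submit):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master("spark://master:7077") \
        .appName("kafka_Structured") \
        .getOrCreate()

    # subscribe to the Kafka topic as a streaming DataFrame
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "hostmachine:9092") \
        .option("subscribe", "session-event") \
        .option("maxOffsetsPerTrigger", 10) \
        .load()

    result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # append mode: no aggregation, each micro-batch is written to the console as-is
    query = result.writeStream \
        .outputMode("append") \
        .format("console") \
        .trigger(processingTime='30 seconds') \
        .start()

    query.awaitTermination()  # block here so the streaming query keeps running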