docker, pyspark, apache-kafka, jupyter-notebook, spark-streaming

PySpark streaming write to Kafka doesn't work


I want to send this simple PySpark DataFrame to Kafka, but I always get an error. I tried it with a simple Python producer script and that works, and the PySpark read stream works as well; my only problem is writing to Kafka with PySpark. Can anyone help me, please?

from pyspark.sql import SparkSession
import os 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaSinkExample") \
    .getOrCreate()

# Sample data
data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
df = spark.createDataFrame(data, ["key", "value"])

# Write data to Kafka
(df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:29092") \
    .option("topic", "test") \
    .save())
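
For reference, the standalone Python producer that does work against the same broker looks roughly like this (using kafka-python; the exact script I used may differ slightly):

from kafka import KafkaProducer

# Plain kafka-python producer, run from inside the Docker network,
# so it uses the internal listener broker:29092.
producer = KafkaProducer(bootstrap_servers="broker:29092")
producer.send("test", key=b"key1", value=b"value1")
producer.flush()  # make sure the record is actually delivered before exiting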

This is my docker-compose file:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_PYTHON=python3
      - PYSPARK_DRIVER_PYTHON=python3
      - SPARK_HOME=/usr/local/spark
    volumes:
      - ./notebooks:/home/jovyan/work # Mount your notebooks folder or adjust as needed
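
With this listener setup, the broker is advertised as broker:29092 inside the Docker network and as localhost:9092 on the host, so the notebook container has to use broker:29092. A quick connectivity check from the pyspark container (assuming kafka-python is installed there) can be done with something like:

from kafka import KafkaConsumer

# List topics via the internal listener; if this fails, the problem is
# networking rather than the Spark Kafka connector.
consumer = KafkaConsumer(bootstrap_servers="broker:29092")
print(consumer.topics())
consumer.close()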


The error that I get:

23/12/25 16:50:56 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/jovyan/preprocessing/bing.py", line 23, in <module>
    .save())
     ^^^^^^
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (0f08baeba383 executor driver): java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$4()'

Solution

  • I had the same problem. If you use the latest PySpark version, 3.5.0, you should change your packages to:

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
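
To avoid this kind of mismatch in the future, you can derive the connector version from the running PySpark version instead of hard-coding it (a rough sketch; it assumes pyspark.__version__ matches the Spark jars shipped in the notebook image, and it must run before the SparkSession is created):

import os
import pyspark

# Pick the Kafka connector artifacts that match the running Spark version,
# instead of pairing a 3.2.0 connector with a 3.5.0 runtime.
spark_version = pyspark.__version__  # e.g. "3.5.0" on the latest jupyter/pyspark-notebook image
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:{spark_version},'
    f'org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version} pyspark-shell'
)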