apache-spark, elasticsearch, pyspark

PySpark streaming from a Kafka topic to an Elasticsearch index, running in a Jupyter notebook, fails


I am streaming data from a Kafka topic via PySpark in a Jupyter notebook and using writeStream to write to an Elasticsearch index.

The job fails with the error below, although the same stream works fine when writing to HDFS.

Driver stacktrace:
24/09/14 04:17:08 ERROR Executor: Exception in task 39.0 in stage 15.0 (TID 193)
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
        at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
        at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Spark version: 3.5.0
Elasticsearch: docker.elastic.co/elasticsearch/elasticsearch:8.9.0

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=password#123
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ports:
      - '9200:9200'
      - '9300:9300'

  kibana:
    image: docker.elastic.co/kibana/kibana:8.9.0
    container_name: kibana
    ports:
      - '5601:5601'
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - xpack.security.enabled=false
    depends_on:
      - elasticsearch

PySpark code:

# Create Spark Session
spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.9.0") \
    .config("es.nodes", "192.XXX.XX.144") \
    .config("es.port", "9200") \
    .config("es.nodes.wan.only", "true") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()


es_query = person_df.writeStream \
    .format("es") \
    .queryName("writing_to_es") \
    .option("es.nodes", "192.XXX.XX.144:9200") \
    .option("es.resource", "uc_person_plot/_doc") \
    .option("checkpointLocation", "hdfs://namenode:9000/uc/es/checkpoint_dir") \
    .outputMode("append") \
    .start()

Any help with this would be appreciated.


Solution

  • The notebook was running Spark 3.5.0, and that version was causing the issue: the stack trace shows elasticsearch-spark-30 8.9.0 calling `RowEncoder$.apply(StructType)`, a method whose signature no longer exists in Spark 3.5, hence the `NoSuchMethodError`. After downgrading to Spark 3.2.0, the stream worked without problems.
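Note that the question's session config also mixes versions (Spark 3.5.0 with `spark-sql-kafka-0-10_2.12:3.3.0`). As a quick sanity check before starting a stream, the Maven coordinate of the Kafka connector can be compared against the running Spark version. The helper below is a minimal, hypothetical sketch (the `compatible` function is not part of Spark or elasticsearch-hadoop); it assumes the convention that `spark-sql-kafka-0-10` releases track Spark's own major.minor version:

```python
def compatible(spark_version: str, kafka_pkg: str) -> bool:
    """Return True when a spark-sql-kafka Maven coordinate targets the
    same major.minor Spark release as the running session.

    spark_version: e.g. the value of spark.version, such as "3.2.0"
    kafka_pkg: e.g. "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0"
    """
    # The artifact version is the part after the last ":" in the coordinate.
    pkg_version = kafka_pkg.rsplit(":", 1)[-1]
    # Compare only major.minor, since patch releases stay binary-compatible.
    spark_mm = tuple(spark_version.split(".")[:2])
    pkg_mm = tuple(pkg_version.split(".")[:2])
    return spark_mm == pkg_mm

# The coordinates from the question: Spark 3.5.0 with a 3.3.0 Kafka jar.
print(compatible("3.5.0", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"))  # False
# The combination that worked after the downgrade.
print(compatible("3.2.0", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0"))  # True
```

A check like this only catches the Kafka-connector mismatch; for the Elasticsearch connector the safest approach remains what the answer describes, i.e. running a Spark version that the installed elasticsearch-spark artifact was compiled against.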