I am streaming data from a Kafka topic via PySpark in a Jupyter Notebook and writing the stream to an Elasticsearch index.
It fails with the error below, although the same pipeline works fine when loading to HDFS.
Driver stacktrace:
24/09/14 04:17:08 ERROR Executor: Exception in task 39.0 in stage 15.0 (TID 193)
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.<init>(EsStreamQueryWriter.scala:50)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.$anonfun$addBatch$5(EsSparkSqlStreamingSink.scala:72)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Spark version: 3.5.0; Elasticsearch: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
elasticsearch:
  image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
  container_name: elasticsearch
  environment:
    - discovery.type=single-node
    - ELASTIC_PASSWORD=password#123
    - xpack.security.enabled=false
    - xpack.security.transport.ssl.enabled=false
  ports:
    - '9200:9200'
    - '9300:9300'

kibana:
  image: docker.elastic.co/kibana/kibana:8.9.0
  container_name: kibana
  ports:
    - '5601:5601'
  environment:
    - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    - xpack.security.enabled=false
  depends_on:
    - elasticsearch
PySpark code:
# Create Spark session
spark = SparkSession.builder \
    .appName("KafkaToHDFS") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.9.0") \
    .config("es.nodes", "192.XXX.XX.144") \
    .config("es.port", "9200") \
    .config("es.nodes.wan.only", "true") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()
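For context, person_df is not defined in the snippet above; a minimal sketch of how it might be built from the Kafka source (the broker address, topic name, and JSON schema here are hypothetical placeholders, not taken from the post):

```python
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for the person records -- adjust to the real payload
person_schema = StructType([
    StructField("name", StringType()),
    StructField("city", StringType()),
])

# Read from Kafka; broker address and topic name are placeholders
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "uc_person") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka delivers bytes; cast the value column to string and parse the JSON
person_df = raw_df.selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), person_schema).alias("data")) \
    .select("data.*")
```

This requires a live Kafka broker and a running Spark session, so it is a sketch rather than a runnable standalone script.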
es_query = person_df.writeStream \
    .format("es") \
    .queryName("writing_to_es") \
    .option("es.nodes", "192.XXX.XX.144:9200") \
    .option("es.resource", "uc_person_plot/_doc") \
    .option("checkpointLocation", "hdfs://namenode:9000/uc/es/checkpoint_dir") \
    .outputMode("append") \
    .start()
Any help with a solution would be appreciated.
In the Jupyter Notebook, Spark was at version 3.5.0, and that version was causing the issue: the NoSuchMethodError indicates that the RowEncoder.apply(StructType) method the connector calls at EsStreamQueryWriter.scala:50 no longer exists in Spark 3.5, so elasticsearch-spark-30 8.9.0, compiled against an older Spark, is binary-incompatible with it. After switching to Spark 3.2.0, it worked without problems.
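When staying on Spark 3.2.0, it is also worth aligning the Kafka connector with the Spark version (the original session pulled spark-sql-kafka 3.3.0 under Spark 3.5.0). A sketch of the session config under that assumption; the artifact coordinates below are the standard Maven ones for Scala 2.12, and the es.nodes address is the placeholder from the question:

```python
# Sketch: package versions aligned with Spark 3.2.0; adjust if your
# Spark/Scala versions differ (these artifacts target Scala 2.12)
spark = SparkSession.builder \
    .appName("KafkaToES") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,"
            "org.elasticsearch:elasticsearch-spark-30_2.12:8.9.0") \
    .config("es.nodes", "192.XXX.XX.144") \
    .config("es.port", "9200") \
    .config("es.nodes.wan.only", "true") \
    .getOrCreate()
```

Keeping the Kafka connector, the Elasticsearch connector, and the Spark runtime on mutually compatible versions avoids exactly the kind of binary incompatibility seen in the stack trace above.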