
PySpark - Structured Streaming into Elasticsearch


I'm working on code that streams data into Elasticsearch using Structured Streaming in PySpark.

Spark version: 3.0.0. Install mode: pip.

query = inpJoinDF.writeStream \
.outputMode("append") \
.queryName("writing_to_es") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "es_checkpoint/") \
.option("es.resource", "spark_test/doc") \
.option("es.nodes", "localhost") \
.start()

I have also tried adding the package at launch and using the short format name:

pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.7.1
format("es")

Below is the error:

line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.start.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:674)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:342)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:648)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:648)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:648)
    ... 12 more

Thanks 🙏 for helping out!!


Solution

  • Thank you so much. I was using Spark 3, which is built on Scala 2.12; unfortunately, the elasticsearch-hadoop jar only supports Scala up to 2.11. I downgraded Spark to 2.4.6, which is built on Scala 2.11, and that resolved the error.
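
For anyone landing here, below is a minimal sketch of how the downgraded setup might look, assuming pyspark 2.4.6 is installed and Elasticsearch is reachable on localhost. The package coordinate, format name and options are taken from the question; the helper names (`build_session`, `scala_version`, `es_options`, `start_es_stream`) are hypothetical and only there for illustration. Setting `spark.jars.packages` on the session builder is one way to pull the connector in when Spark is installed through pip, as an alternative to `pyspark --packages`.

```python
# Sketch only: assumes pyspark 2.4.6 (Scala 2.11) and a local Elasticsearch.
# Helper names are hypothetical; options and coordinate come from the question.

ES_PACKAGE = "org.elasticsearch:elasticsearch-hadoop:7.7.1"
ES_FORMAT = "org.elasticsearch.spark.sql"


def build_session(app_name="es_stream"):
    """Create a SparkSession that downloads the connector at startup."""
    from pyspark.sql import SparkSession  # imported lazily so the helpers load without Spark

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", ES_PACKAGE)
        .getOrCreate()
    )


def scala_version(spark):
    """Ask the JVM which Scala version this Spark build runs on."""
    return spark.sparkContext._jvm.scala.util.Properties.versionString()


def es_options(resource, nodes="localhost", checkpoint="es_checkpoint/"):
    """Collect the writer options used in the question into one dict."""
    return {
        "checkpointLocation": checkpoint,
        "es.resource": resource,
        "es.nodes": nodes,
    }


def start_es_stream(df, options):
    """Start an append-mode streaming write of `df` into Elasticsearch."""
    writer = (
        df.writeStream
        .outputMode("append")
        .queryName("writing_to_es")
        .format(ES_FORMAT)
    )
    for key, value in options.items():
        writer = writer.option(key, value)
    return writer.start()
```

A possible usage, with `inpJoinDF` being the streaming DataFrame from the question: call `spark = build_session()`, print `scala_version(spark)` to confirm it reports 2.11 before starting the query, then run `query = start_es_stream(inpJoinDF, es_options("spark_test/doc"))`.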