apache-spark, spark-streaming, spark-avro

Spark streaming, java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$


I am trying to read data from a Kafka stream that uses Avro serialization for the value. I have no problem reading the data and deserializing the key, which is a string, but when I try to deserialize the value using the from_avro function I get this exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/functions$
    at DataFrameExample$.main(DataFrameExample.scala:41)
    at DataFrameExample.main(DataFrameExample.scala)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.functions$
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 12 more

build.sbt:

name := "test"
organization := "com.databricks"
version := "1"
scalaVersion := "2.12.17"
// Spark Information
val sparkVersion = "3.3.0"
// allows us to include spark packages
resolvers += "bintray-spark-packages" at
  "https://dl.bintray.com/spark-packages/maven/"
resolvers += "Typesafe Simple Repository" at
  "https://repo.typesafe.com/typesafe/simple/maven-releases/"
resolvers += "MavenRepository" at
  "https://mvnrepository.com/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
)

Code:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import java.nio.file.{Files, Paths}

object DataFrameExample extends Serializable {
  def main(args: Array[String]) = {
    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    val currentDirectory = new java.io.File(".").getCanonicalPath
    println(currentDirectory)

    val df = spark.readStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "localhost:9091,localhost:9093,localhost:9094"
      )
      .option("startingOffsets", "latest")
      .option("subscribe", "test-person-activity-partitions-replication-qwe")
      .load()

    val jsonFormatSchema = new String(
      Files.readAllBytes(
        Paths.get("./src/main/resources/avro/person-activity.avsc")
      )
    )

    val df2 = df.select(
      df.col("key").cast("string"),
      from_avro($"value", jsonFormatSchema).as("value")
    )

    df2.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

When running the application with spark-submit I also provide spark-sql-kafka-0-10, which I cannot add through sbt because I get some other errors. That is not directly related to this problem, but if someone knows what could be the reason for it, feel free to answer that as well.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class DataFrameExample --master local target/scala-2.12/test_2.12-1.jar

Sorry if this is a duplicate, but I have looked at every answer on SO and elsewhere while searching for similar errors. Thanks.


Solution

  • As documented, spark-avro is an external module, and since it is marked Provided in build.sbt it never makes it into the application jar; add it on the spark-submit command line next to the Kafka connector, and make sure the Scala and Spark versions match the build:

    spark-submit --packages "org.apache.spark:spark-avro_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0" \
                 --class DataFrameExample \
                 --master local target/scala-2.12/test_2.12-1.jar
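
  • Alternatively, if you do not want to pass --packages on every run, you can drop the Provided scope on the connector dependencies and build a fat jar with sbt-assembly. A minimal sketch follows; the plugin version and merge rules are assumptions, so adjust them for your build. Concatenating META-INF/services entries matters because that is how Spark discovers the kafka and avro data sources, and discarding the rest of META-INF is a common fix for the "deduplicate" assembly errors the Kafka connector tends to cause, which may be the "other errors" mentioned in the question:

    // project/plugins.sbt (assumed plugin version; use whatever is current)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")

    // build.sbt: spark-core/streaming/sql stay Provided (spark-submit supplies them),
    // but the Avro and Kafka connectors must ship inside the application jar
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
      "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
      "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
      "org.apache.spark" %% "spark-avro" % sparkVersion,
      "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
    )

    assembly / assemblyMergeStrategy := {
      // keep service registrations so Spark can find the kafka/avro sources
      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
      case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
      case _                                         => MergeStrategy.first
    }

    Then build with sbt assembly and spark-submit the resulting jar (named test-assembly-1.jar by default for this build) without any --packages flag.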