Tags: scala, apache-spark, apache-hudi

Apache Spark: Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3Adapter


I ran the following code from IntelliJ and it runs successfully. The code is shown below.

import org.apache.spark.sql.SparkSession

object HudiV1 {

  import scala.util.Random

  /** One generated employee row; the field names become the DataFrame column names. */
  final case class Employee(
      emp_id: Int,
      employee_name: String,
      department: String,
      state: String,
      salary: Int,
      age: Int,
      bonus: Int,
      ts: Long
  )

  private val Departments = Vector("IT", "HR", "Sales", "Marketing")
  private val States = Vector("CA", "NY", "TX", "FL", "IL", "RJ")

  /** Builds `n` employees with random attributes.
    *
    * @param n   number of employees; ids run from 1 to `n` (empty result when `n <= 0`)
    * @param rng random source; pass a seeded `Random` for reproducible data
    * @return the generated employees, in id order
    */
  private def generateEmployees(n: Int, rng: Random = new Random()): List[Employee] =
    (1 to n).map { i =>
      Employee(
        emp_id = i,
        employee_name = rng.alphanumeric.take(10).mkString,
        department = Departments(rng.nextInt(Departments.size)),
        state = States(rng.nextInt(States.size)),
        salary = rng.nextInt(150000 - 10000) + 10000, // [10000, 150000)
        age = rng.nextInt(60 - 18) + 18, // [18, 60)
        bonus = rng.nextInt(100000),
        ts = System.currentTimeMillis()
      )
    }.toList

  /** Prints `title` framed by two rules of 55 asterisks. */
  private def banner(title: String): Unit = {
    val rule = "*" * 55
    println(rule)
    println(title)
    println(rule)
  }

  /** Writes two random employees to a Hudi copy-on-write table, then reads the table back and shows it. */
  def main(args: Array[String]): Unit = {
    val warehouseDir = "file:///D:/downloads/hudilake"
    val tableName = "hudi_table"
    // Give the Hudi table its own sub-directory. With mode("overwrite"), Hudi deletes an
    // existing base path before writing, which would otherwise wipe the whole warehouse dir.
    val basePath = s"$warehouseDir/$tableName"
    val recordKey = "emp_id"
    val precombineField = "ts"
    val operation = "upsert"
    val tableType = "COPY_ON_WRITE"

    // NOTE: the hard-coded master overrides any --master passed to spark-submit.
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.warehouse.dir", warehouseDir)
      .master("local[*]")
      .getOrCreate()

    try {
      // Column names come straight from the Employee case-class fields.
      val df = spark.createDataFrame(generateEmployees(2))

      // Every setting is derived from the named values above, so each is defined only once.
      val hudiOptions = Map(
        "hoodie.table.name" -> tableName,
        "hoodie.datasource.write.table.name" -> tableName,
        "hoodie.datasource.write.table.type" -> tableType,
        "hoodie.datasource.write.recordkey.field" -> recordKey,
        "hoodie.datasource.write.precombine.field" -> precombineField,
        "hoodie.datasource.write.operation" -> operation,
        "hoodie.upsert.shuffle.parallelism" -> "2",
        "hoodie.insert.shuffle.parallelism" -> "2"
      )

      banner("overwrite")
      df.write.format("org.apache.hudi")
        .options(hudiOptions)
        .mode("overwrite")
        .save(basePath)

      banner("READ")
      spark.read.format("org.apache.hudi")
        .load(basePath)
        .show()
    } finally {
      spark.stop()
    }
  }
}

My build.sbt is shown below as well.


name := "HudiV1"

version := "0.1"

scalaVersion := "2.12.12"

// Must match the Spark runtime that spark-submit uses (spark-3.3.1-bin-hadoop3).
val sparkVersion = "3.3.1"

libraryDependencies ++= Seq(
  // "provided": spark-submit supplies Spark and its Hadoop client at runtime, so they are kept
  // out of an `sbt assembly` fat JAR. To keep running from IntelliJ, enable
  // "Add dependencies with 'provided' scope to classpath" in the run configuration.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Hudi bundles are Spark-version specific; Spark 3.3 needs a hudi-spark3.3 bundle (Hudi 0.12.0+).
  // The old hudi-spark-bundle_2.12:0.10.0 lacked the Spark 3 adapter class reported as missing.
  // The bundle already includes hudi-common; hudi-utilities-bundle is not used by this program,
  // and declaring either at a different Hudi version would put mismatched classes on the classpath.
  "org.apache.hudi" % "hudi-spark3.3-bundle_2.12" % "0.13.0",
  "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "3.3.1" % "provided",
  "org.apache.avro" % "avro" % "1.10.2",
  "org.apache.avro" % "avro-mapred" % "1.10.2" % "test",
  "org.apache.avro" % "avro-tools" % "1.10.2" % "test"
)

Now, when I package the same program into a JAR with sbt and run it with spark-submit, I get the error `Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3Adapter`.

The command and the full stack trace are shown below.

D:\Spark\spark-3.3.1-bin-hadoop3\bin\spark-submit --class "HudiV1" --jars hudiv1_2.12-0.1.jar hudi-spark-bundle_2.12-0.10.0.jar

Stack trace (some irrelevant log lines omitted):

23/04/14 23:38:30 INFO SparkContext: Running Spark version 3.3.1
23/04/14 23:38:30 INFO ResourceUtils: ==============================================================
23/04/14 23:38:30 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/14 23:38:30 INFO ResourceUtils: ==============================================================
23/04/14 23:38:30 INFO SparkContext: Submitted application: HudiV1
.
.
.
Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3Adapter
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.apache.hudi.SparkAdapterSupport.sparkAdapter(SparkAdapterSupport.scala:35)
        at org.apache.hudi.SparkAdapterSupport.sparkAdapter$(SparkAdapterSupport.scala:29)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.sparkAdapter$lzycompute(HoodieAnalysis.scala:119)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.sparkAdapter(HoodieAnalysis.scala:119)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:283)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:123)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:30)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:123)
        at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:119)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
        at org.apache.spark.sql.SparkSession.$anonfun$createDataFrame$2(SparkSession.scala:322)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:319)
        at HudiV1$.main(HudiV1.scala:34)
        at HudiV1.main(HudiV1.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The standalone Spark installation I submit to is Spark 3.3.1. Can someone help me solve this?


Solution

  • sbt package creates a thin JAR without external libraries

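    sbt assembly (provided by the sbt-assembly plugin) creates an uber/fat JAR with those libraries bundled in, so you don't need --jars or --packages on spark-submit. Mark the Spark dependencies as % "provided" so that Spark itself is not packed into the fat JAR; spark-submit already supplies it.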

    Here's an sbt plugin that makes this easier for Spark - https://github.com/alonsodomin/sbt-spark

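    This is why the program works in IntelliJ: when you run code from the IDE, all of the project's dependencies are put on the classpath by default.

  • Check that the Hudi bundle matches your Spark version. The missing class, org.apache.spark.sql.adapter.Spark3Adapter, is loaded by Hudi's SparkAdapterSupport (see the top of the stack trace). hudi-spark-bundle_2.12:0.10.0 predates Hudi's Spark 3.3 support, which was added in Hudi 0.12.0 as hudi-spark3.3-bundle_2.12. For Spark 3.3.1, use a Spark-3.3 bundle such as org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0. Do not mix it with other Hudi artifacts at a different version.

  • In spark-submit, the application JAR is the last positional argument, and --jars lists the extra dependencies. The command in the question has them swapped. It should look like: spark-submit --class HudiV1 --jars hudi-spark3.3-bundle_2.12-0.13.0.jar hudiv1_2.12-0.1.jar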