I have run the following code via IntelliJ and it runs successfully. The code is shown below.
import org.apache.spark.sql.SparkSession

object HudiV1 {
  // Scala code
  case class Employee(emp_id: Int, employee_name: String, department: String, state: String, salary: Int, age: Int, bonus: Int, ts: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("className", "org.apache.hudi")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.sql.warehouse.dir", "file:///D:/downloads/hudilake")
      .master("local[*]")
      .getOrCreate()

    import scala.util.Random

    object DataGenerator {
      def getEmployees(n: Int): List[Employee] = {
        val departments = List("IT", "HR", "Sales", "Marketing")
        val states = List("CA", "NY", "TX", "FL", "IL", "RJ")
        (1 to n).map { i =>
          Employee(i, Random.alphanumeric.take(10).mkString, departments(Random.nextInt(departments.size)), states(Random.nextInt(states.size)), Random.nextInt(150000 - 10000) + 10000, Random.nextInt(60 - 18) + 18, Random.nextInt(100000), System.currentTimeMillis())
        }.toList
      }
    }

    val employees = DataGenerator.getEmployees(2)
    val columns = Seq("emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts")
    val df = spark.createDataFrame(employees).toDF(columns: _*)

    val db_name = "hudidb"
    val table_name = "hudi_table"
    val recordkey = "emp_id"
    val precombine = "ts"
    val path = "file:///D:/downloads/hudilake"
    val method = "upsert"
    val table_type = "COPY_ON_WRITE"

    val hudi_options = Map(
      "hoodie.table.name" -> table_name,
      "hoodie.datasource.write.recordkey.field" -> "emp_id",
      "hoodie.datasource.write.table.name" -> table_name,
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.upsert.shuffle.parallelism" -> "2",
      "hoodie.insert.shuffle.parallelism" -> "2"
    )

    println("*" * 55)
    println("overwrite")
    println("*" * 55)
    df.write.format("org.apache.hudi")
      .options(hudi_options)
      .mode("overwrite")
      .save(path)

    println("*" * 55)
    println("READ")
    println("*" * 55)
    val read_df = spark.read.format("org.apache.hudi")
      .load(path)
    read_df.show()
  }
}
The sbt build file is shown below as well.
name := "HudiV1"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.1",
  "org.apache.spark" %% "spark-sql" % "3.3.1",
  "org.apache.spark" %% "spark-streaming" % "3.3.1",
  "org.apache.hudi" % "hudi-spark-bundle_2.12" % "0.10.0",
  "org.apache.hudi" % "hudi-utilities-bundle_2.12" % "0.10.0",
  "org.apache.hudi" % "hudi-common" % "0.10.0",
  "org.apache.hadoop" % "hadoop-common" % "3.3.1",
  "org.apache.hadoop" % "hadoop-client" % "3.3.1",
  "org.apache.avro" % "avro" % "1.10.2",
  "org.apache.avro" % "avro-mapred" % "1.10.2" % "test",
  "org.apache.avro" % "avro-tools" % "1.10.2" % "test"
)
Now, when I try to run the same program after bundling it into a jar with sbt, I get the error Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3Adapter.
The command and full stack trace are shown below.
D:\Spark\spark-3.3.1-bin-hadoop3\bin\spark-submit --class "HudiV1" --jars hudiv1_2.12-0.1.jar hudi-spark-bundle_2.12-0.10.0.jar
Stack trace (some unrelated logs skipped):
23/04/14 23:38:30 INFO SparkContext: Running Spark version 3.3.1
23/04/14 23:38:30 INFO ResourceUtils: ==============================================================
23/04/14 23:38:30 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/14 23:38:30 INFO ResourceUtils: ==============================================================
23/04/14 23:38:30 INFO SparkContext: Submitted application: HudiV1
.
.
.
Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3Adapter
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.apache.hudi.SparkAdapterSupport.sparkAdapter(SparkAdapterSupport.scala:35)
at org.apache.hudi.SparkAdapterSupport.sparkAdapter$(SparkAdapterSupport.scala:29)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.sparkAdapter$lzycompute(HoodieAnalysis.scala:119)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.sparkAdapter(HoodieAnalysis.scala:119)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:283)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:123)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:30)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:123)
at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:119)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.SparkSession.$anonfun$createDataFrame$2(SparkSession.scala:322)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:319)
at HudiV1$.main(HudiV1.scala:34)
at HudiV1.main(HudiV1.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The standalone version of Spark used is 3.3.1. Can someone help me solve this?
sbt package creates a thin JAR without external libraries, while sbt assembly creates an uber/fat JAR with them bundled in, so you don't need the --jars or --packages options on spark-submit.
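As a rough sketch of what that could look like for your project (assuming the sbt-assembly plugin; the plugin version and merge strategy may need adjusting for your setup):

// project/plugins.sbt (sketch)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

// build.sbt additions (sketch): mark the Spark artifacts as "provided",
// since spark-submit already puts Spark on the classpath, and keep the
// Hudi bundle so it ends up inside the fat JAR
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.3.1" % "provided",
  "org.apache.hudi" % "hudi-spark-bundle_2.12" % "0.10.0"
)

// Bundles often ship duplicate META-INF files; discard them and take the
// first copy of anything else that collides
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

Running sbt assembly then produces a single JAR under target/scala-2.12/ that you can pass directly to spark-submit.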
Here's an sbt plugin that makes this easier for Spark - https://github.com/alonsodomin/sbt-spark
When you run the code from IntelliJ, it includes all dependencies on the classpath by default, which is why it works there.
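For reference, a hedged example of how the submit could look afterwards (the assembly JAR name depends on your project name/version and sbt-assembly defaults), or alternatively keeping the thin JAR and pulling the Hudi bundle via --packages with the same coordinates as in the build file:

D:\Spark\spark-3.3.1-bin-hadoop3\bin\spark-submit --class HudiV1 target\scala-2.12\HudiV1-assembly-0.1.jar

D:\Spark\spark-3.3.1-bin-hadoop3\bin\spark-submit --class HudiV1 --packages org.apache.hudi:hudi-spark-bundle_2.12:0.10.0 hudiv1_2.12-0.1.jar

In either case the application JAR (the one containing HudiV1) goes last on the command line as the positional argument; --jars and --packages are only for the extra dependencies.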