java scala apache-spark apache-spark-sql udf

UDF using Java methods breaks on Spark


I wrote this code in a Databricks environment, where it runs fine, but when I run it in my local environment it fails with a NullPointerException.

  val _event_day_of_week = (event_date_of_event: String) => {
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    val dayOfWeek: String = LocalDate.parse(event_date_of_event.substring(0,10), formatter).getDayOfWeek.toString
    dayOfWeek
  }

  import org.apache.spark.sql.functions.udf

  val event_day_of_weekUDF = udf(_event_day_of_week)

  df.select($"uuid", event_day_of_weekUDF($"event_date_of_event") as "event_day_of_week").first

Error:

Exception in thread "main" java.lang.NullPointerException
    at com.faniak.ml.eventBuzz$.delayedEndpoint$com$faniak$ml$eventBuzz$1(eventBuzz.scala:72)
    at com.faniak.ml.eventBuzz$delayedInit$body.apply(eventBuzz.scala:17)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.faniak.ml.eventBuzz$.main(eventBuzz.scala:17)
    at com.faniak.ml.eventBuzz.main(eventBuzz.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

The Spark version is 2.1.


Solution

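  • The problem had nothing to do with the UDF. When prototyping on Apache Spark, do not extend the Scala App trait, because it does not work reliably with Spark. App defers the body of the object through DelayedInit (note the delayedInit frames in the stack trace above), so fields of the object can still be null when they are read. In other words, avoid this: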

    object EventBuzzDataset extends App {
      // Spark code here
    }

    To make it work, define an explicit main method instead:

    object EventBuzzDataset {

      def main(args: Array[String]): Unit = {
        // Spark code goes here
      }
    }

    The problem is described in detail at https://issues.apache.org/jira/browse/SPARK-4170 and https://github.com/apache/spark/pull/3497

    Thanks to @puhlen for the hint!
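
The initialization issue behind this can be reproduced without Spark. The following is a minimal sketch, assuming Scala 2 (Spark 2.1 is built against Scala 2.11 by default); the object names WithApp, WithMain and InitOrderDemo are hypothetical and exist only for illustration. It reads a field from an object that extends App before that object's main has run, and compares it with an object that defines main explicitly.

```scala
// Hypothetical object: its body is deferred by App's DelayedInit mechanism
// and only runs when WithApp.main is invoked.
object WithApp extends App {
  val greeting: String = "hello"
}

// Hypothetical object: a plain object whose fields are set by the normal
// object initializer, independent of main.
object WithMain {
  val greeting: String = "hello"

  def main(args: Array[String]): Unit = ()
}

object InitOrderDemo {
  def main(args: Array[String]): Unit = {
    // On Scala 2 this is expected to print null, because WithApp.main
    // was never called and the deferred body has not run yet.
    println(s"WithApp.greeting  = ${WithApp.greeting}")

    // The plain object is fully initialized on first access.
    println(s"WithMain.greeting = ${WithMain.greeting}")
  }
}
```

Running InitOrderDemo shows why code that touches fields of an App object from outside its main (for example a closure shipped to an executor) can hit a NullPointerException, while the same code inside an object with an explicit main method does not.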