scala, apache-spark, apache-spark-sql, user-defined-functions, nullable

SparkSQL: How to deal with null values in user defined function?


Given Table 1 with one column "x" of type String, I want to create Table 2 with a column "y" that is an integer representation of the date strings given in "x".

It is essential to keep the null values in column "y".

Table 1 (Dataframe df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)

Table 2 (Dataframe df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)
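
For reference, a minimal sketch of how such a df1 could be built (assuming a Spark 2.x SparkSession named spark; the names here are illustrative, not from the original post):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("nullable-udf-example").getOrCreate()
import spark.implicits._

// null entries in the Seq stay null in the resulting string column
val df1 = Seq("2015-09-12", "2015-09-13", null, null).toDF("x")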

The user-defined function (udf) that converts values from column "x" into those of column "y" is:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )

It works for non-null values, but dealing with null values is not possible.
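
Applied directly to df1, it fails as soon as a null row is processed (a sketch; the exact error wrapping depends on the Spark version):

// d.substring(0, 10) throws a NullPointerException for the null rows;
// Spark reports it when the action runs, wrapped in a task failure
df1.withColumn("y", extractDateAsInt(df1("x"))).show()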

Even though I can do something like

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else 1 )

I have found no way to "produce" null values via udfs (of course, since an Int cannot be null).

My current solution for creation of df2 (Table 2) is as follows:

// holds data of table 1  
val df1 = ... 

// keep only the entries of df1 where x is not null and compute y for them
val dfNotNulls = df1
  .filter(df1("x").isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left outer join of df1 with dfNotNulls on x
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

Questions:

Is there a more elegant way to get from df1 to df2 than this filter-and-join detour? In particular, how can a udf return null for a null input, i.e. something like the following (non-compiling) code excerpt, where NullableInt stands for a hypothetical integer type that allows null?

Code excerpt

val extractDateAsNullableInt = udf[NullableInt, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else null )

Solution

  • This is where Option comes in handy:

    val extractDateAsOptionInt = udf((d: String) => d match {
      case null => None
      case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
    })
    

    or, to make it slightly more robust in the general case:

    import scala.util.Try
    
    val extractDateAsOptionInt = udf((d: String) => Try(
      d.substring(0, 10).filterNot("-".toSet).toInt
    ).toOption)
    

    All credit goes to Dmitriy Selivanov, who pointed out this solution as a (missing?) edit here.
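
    Applied to df1, either variant gives the desired nullable integer column (a minimal sketch; assumes the implicits import for the $"x" syntax, e.g. import spark.implicits._):

    val df2 = df1.withColumn("y", extractDateAsOptionInt($"x"))

    df2.printSchema()
    // root
    //  |-- x: string (nullable = true)
    //  |-- y: integer (nullable = true)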

    An alternative is to handle null outside the UDF:

    import org.apache.spark.sql.functions.{lit, when}
    import org.apache.spark.sql.types.IntegerType
    // the $"x" syntax below needs the implicits in scope,
    // e.g. import spark.implicits._ (sqlContext.implicits._ on Spark 1.x)
    
    val extractDateAsInt = udf(
       (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
    )
    
    df.withColumn("y",
      when($"x".isNull, lit(null))
        .otherwise(extractDateAsInt($"x"))
        .cast(IntegerType)
    )
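
    Applied to the df1 from the question, this reproduces Table 2 without the filter-and-join (a sketch; row order simply follows df1, since withColumn preserves it):

    df1.withColumn("y",
      when($"x".isNull, lit(null))
        .otherwise(extractDateAsInt($"x"))
        .cast(IntegerType)
    ).show()
    // +----------+--------+
    // |         x|       y|
    // +----------+--------+
    // |2015-09-12|20150912|
    // |2015-09-13|20150913|
    // |      null|    null|
    // |      null|    null|
    // +----------+--------+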