scala apache-spark udf

Spark UDF Null handling


I'm struggling with handling null values in a UDF which operates on a DataFrame (originating from a Hive table) consisting of a struct of floats:

The DataFrame (points) has the following schema:

root
 |-- point: struct (nullable = true)
 |    |-- x: float (nullable = true)
 |    |-- y: float (nullable = true)

As an example, I want to calculate the sum of x and y. Note that I do not actually "handle" null values in the following examples; I just want to be able to check in my UDF whether point, x, or y is null.
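For reference, a DataFrame with this shape can be built locally for experimenting. The sample values below are made up, and sc / sqlContext are the ones provided by the Spark 1.6 shell:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StructField, StructType}
import sqlContext.implicits._   // for the $"..." column syntax used below

// Same shape as the Hive table: a nullable struct of two nullable floats.
val schema = StructType(Seq(
  StructField("point", StructType(Seq(
    StructField("x", FloatType, nullable = true),
    StructField("y", FloatType, nullable = true)
  )), nullable = true)
))

val points = sqlContext.createDataFrame(
  sc.parallelize(Seq(
    Row(Row(1.0f, 2.0f)),   // both coordinates present
    Row(Row(null, 2.0f)),   // x is null
    Row(null)               // the whole struct is null
  )),
  schema
)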

First approach:

val sum = udf((x: Float, y: Float) => x + y)

points.withColumn("sum", sum($"point.x", $"point.y"))

This does not work if the struct point is null: in that case the UDF is never evaluated (the code in the UDF is never executed!) and the result is null. I also cannot check x or y for being null, because a Float cannot be null in Scala.
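A quick way to observe this (sumDebug is just a throwaway name for this sketch): add a side effect to the UDF body. For rows where point is null, nothing is printed and the resulting sum is simply null.

import org.apache.spark.sql.functions.udf

// Debug variant of the UDF above: the println only fires when Spark actually
// invokes the function, so rows with a null struct never reach it.
val sumDebug = udf((x: Float, y: Float) => { println(s"evaluating $x + $y"); x + y })

points.withColumn("sum", sumDebug($"point.x", $"point.y")).show()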

Second approach:

val sum = udf((pt: Row) => pt.getFloat(0) + pt.getFloat(1))
points.withColumn("sum", sum($"point"))

With this approach I can check pt for null in my UDF, but I'm not able to check x and y, because Floats cannot be null: getFloat unboxes the field to a primitive float, so if the field is null I get a NullPointerException.

How can I write a UDF in which I can check the struct, and also x and y, for being null?

I'm using Spark 1.6.1.

Update: In contrast to this question, I'm dealing with floats and not with strings (strings can be null in Scala, floats cannot).
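To illustrate the distinction in plain Scala (nothing Spark-specific):

val s: String = null   // fine: String is a reference type
val f: Float = null    // does not compile: Float is a value type and cannot hold null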


Solution

  • You can use Row.isNullAt(i) to check whether the i-th field of a Row is null. In your case, you could write your UDF as:

    val sum = udf((point: Row) => point match {
      case p if p.isNullAt(0) && p.isNullAt(1) => 0f   // both x and y are null
      case p if p.isNullAt(0) => p.getFloat(1)         // only x is null
      case p if p.isNullAt(1) => p.getFloat(0)         // only y is null
      case p => p.getFloat(0) + p.getFloat(1)          // both present
    })