Tags: scala, apache-spark, apache-spark-sql

Spark: Replace Null value in a Nested column


I would like to replace all the n/a values in the DataFrame below with unknown. The values can appear in either a scalar or a complex nested column. If it's a StructField column, I can loop through the columns and replace n/a using withColumn, but I would like this to be done generically, regardless of the column type, since I don't want to specify the column names explicitly; there are hundreds of them in my case.

case class Bar(x: Int, y: String, z: String)
case class Foo(id: Int, name: String, status: String, bar: Seq[Bar])

import spark.implicits._ // required for .toDF outside the spark-shell

val df = spark.sparkContext.parallelize(
  Seq(
    Foo(123, "Amy", "Active", Seq(Bar(1, "first", "n/a"))),
    Foo(234, "Rick", "n/a", Seq(Bar(2, "second", "fifth"), Bar(22, "second", "n/a"))),
    Foo(567, "Tom", "null", Seq(Bar(3, "second", "sixth")))
  )).toDF

df.printSchema
df.show(20, false)

Result:

+---+----+------+---------------------------------------+
|id |name|status|bar                                    |
+---+----+------+---------------------------------------+
|123|Amy |Active|[[1, first, n/a]]                      |
|234|Rick|n/a   |[[2, second, fifth], [22, second, n/a]]|
|567|Tom |null  |[[3, second, sixth]]                   |
+---+----+------+---------------------------------------+   

Expected Output:

+---+----+----------+---------------------------------------------------+
|id |name|status    |bar                                                |
+---+----+----------+---------------------------------------------------+
|123|Amy |Active    |[[1, first, unknown]]                              |
|234|Rick|unknown   |[[2, second, fifth], [22, second, unknown]]        |
|567|Tom |null      |[[3, second, sixth]]                               |
+---+----+----------+---------------------------------------------------+
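
For the flat string columns I can already do something like the sketch below (the name topLevelOnly is just for illustration), but it never reaches into the nested bar array:

import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.StringType

// Sketch: fixes only top-level string columns; nested structs/arrays
// such as bar are left untouched, which is exactly the limitation.
val topLevelOnly = df.schema.fields
  .filter(_.dataType == StringType)
  .foldLeft(df) { (acc, f) =>
    acc.withColumn(f.name,
      when(col(f.name) === "n/a", "unknown").otherwise(col(f.name)))
  }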

Any suggestions on this?


Solution

  • If you like playing with RDDs, here's a simple, generic, and extensible solution:

      import org.apache.spark.sql.Row

      // Recursively walk a Row, replacing every "n/a" string with "unknown"
      // and descending into nested structs (Row) and arrays (Seq) on the way.
      val naToUnknown = { r: Row =>
        def rec(value: Any): Any = value match {
          case row: Row    => Row.fromSeq(row.toSeq.map(rec)) // nested struct
          case seq: Seq[_] => seq.map(rec)                    // array column
          case "n/a"       => "unknown"
          case other       => other                           // anything else: keep as-is
        }
        Row.fromSeq(r.toSeq.map(rec))
      }
    
      // "unknown" is still a String, so the original schema can be reused as-is.
      val newDF = spark.createDataFrame(df.rdd.map(naToUnknown), df.schema)
      newDF.show(false)
    

    Output:

    +---+----+-------+-------------------------------------------+
    |id |name|status |bar                                        |
    +---+----+-------+-------------------------------------------+
    |123|Amy |Active |[[1, first, unknown]]                      |
    |234|Rick|unknown|[[2, second, fifth], [22, second, unknown]]|
    |567|Tom |null   |[[3, second, sixth]]                       |
    +---+----+-------+-------------------------------------------+
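
    If you would rather stay in the Column API and Spark 3.1+ is available (transform and withField require it), the same replacement can be spelled out per column. This is only a sketch assuming the example schema above, and it trades away the genericity of the RDD version:

      import org.apache.spark.sql.functions.{col, transform, when}

      // Sketch (Spark 3.1+): explicit, per-column version of the same fix.
      val viaColumns = df
        .withColumn("status",
          when(col("status") === "n/a", "unknown").otherwise(col("status")))
        .withColumn("bar", transform(col("bar"), b =>
          b.withField("z", when(b("z") === "n/a", "unknown").otherwise(b("z")))))
      viaColumns.show(false)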