
Find the number of null records between two non-null records in a Scala DataFrame


I have a dataframe as shown below.

+-----+------------+------------+
|  ID | date       | sig01_diff |
+-----+------------+------------+
| 123 | 2019-11-04 | 93668      |
| 123 | 2019-11-05 | 49350      |
| 123 | 2019-11-07 | null       |
| 123 | 2019-11-08 | 11069      |
| 123 | 2019-11-09 | 33203      |
| 123 | 2019-11-11 | 47927      |
| 123 | 2020-01-21 | null       |
| 123 | 2020-01-22 | null       |
| 123 | 2020-01-23 | 33908      |
| 123 | 2020-01-24 | 61603      |
| 123 | 2020-01-27 | 33613      |
| 123 | 2020-01-28 | 27514      |
| 123 | 2020-01-29 | null       |
| 123 | 2020-01-30 | null       |
| 123 | 2020-02-11 | null       |
| 123 | 2020-02-12 | null       |
| 123 | 2020-02-13 | null       |
| 123 | 2020-02-14 | null       |
| 123 | 2020-02-15 | 65625      |
| 123 | 2020-02-17 | 13354      |
| 123 | 2020-02-18 | null       |
| 123 | 2020-02-19 | 69069      |
+-----+------------+------------+

I need to get the number of null records immediately preceding each record, as shown below.

+-----+------------+------------+-----------+
|  ID | date       | sig01_diff | null_count|
+-----+------------+------------+-----------+
| 123 | 2019-11-04 | 93668      | 00        |
| 123 | 2019-11-05 | 49350      | 00        |
| 123 | 2019-11-07 | null       | 00        |
| 123 | 2019-11-08 | 11069      | 01        |
| 123 | 2019-11-09 | 33203      | 00        |
| 123 | 2019-11-11 | 47927      | 00        |
| 123 | 2020-01-21 | null       | 00        |
| 123 | 2020-01-22 | null       | 00        |
| 123 | 2020-01-23 | 33908      | 02        |
| 123 | 2020-01-24 | 61603      | 00        |
| 123 | 2020-01-27 | 33613      | 00        |
| 123 | 2020-01-28 | 27514      | 00        |
| 123 | 2020-01-29 | null       | 00        |
| 123 | 2020-01-30 | null       | 00        |
| 123 | 2020-02-11 | null       | 00        |
| 123 | 2020-02-12 | null       | 00        |
| 123 | 2020-02-13 | null       | 00        |
| 123 | 2020-02-14 | null       | 00        |
| 123 | 2020-02-15 | 65625      | 06        |
| 123 | 2020-02-17 | 13354      | 00        |
| 123 | 2020-02-18 | null       | 00        |
| 123 | 2020-02-19 | 69069      | 01        |
+-----+------------+------------+-----------+

As shown above, the new column holds the number of consecutive null records immediately preceding a non-null record, and 0 otherwise. For example, for the following dates:

2019-11-08
2020-02-15
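To pin down the requirement independently of Spark, here is a minimal plain-Scala sketch of the semantics the expected table implies: a non-null value gets the length of the run of nulls directly before it, and every other row gets 0. The names NullGapReference and nullCounts are hypothetical; the input is the sig01_diff column from the question, in date order, with None standing for null.

```scala
object NullGapReference {
  // For each value: if it is non-null, emit the number of consecutive nulls
  // directly before it; if it is null, emit 0.
  def nullCounts(values: Seq[Option[Int]]): Seq[Int] = {
    // Fold left to right, carrying the length of the current run of nulls.
    val (_, out) = values.foldLeft((0, Vector.empty[Int])) {
      case ((run, acc), None)    => (run + 1, acc :+ 0) // null row: extend the run, emit 0
      case ((run, acc), Some(_)) => (0, acc :+ run)     // non-null row: emit the run, then reset it
    }
    out
  }

  def main(args: Array[String]): Unit = {
    // sig01_diff values for ID 123 from the question, in date order.
    val sig = Seq[Option[Int]](
      Some(93668), Some(49350), None, Some(11069), Some(33203), Some(47927),
      None, None, Some(33908), Some(61603), Some(33613), Some(27514),
      None, None, None, None, None, None,
      Some(65625), Some(13354), None, Some(69069))
    println(nullCounts(sig).mkString(","))
  }
}
```

The printed sequence matches the null_count column of the expected output (without the zero padding), which is a handy oracle for checking any Spark implementation.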

Using a window function with Window.unboundedPreceding, I am able to compute a running (cumulative) count of null records within a window. But what I need is, within a window, the count of null records between two consecutive non-null records.
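For reference, here is a minimal sketch of the running count described above (a frame from Window.unboundedPreceding to Window.currentRow), run on the first nine rows of the question's data. It assumes a local SparkSession; RunningNullCount and run are hypothetical names. It shows why this frame alone is not enough: the count never restarts after a non-null row.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object RunningNullCount {
  // Returns (date, running_nulls) for each row, sorted by date.
  def run(spark: SparkSession): Seq[(String, Long)] = {
    import spark.implicits._
    // First nine rows of the question's data; None becomes a SQL null.
    val df = Seq[(Int, String, Option[Int])](
      (123, "2019-11-04", Some(93668)),
      (123, "2019-11-05", Some(49350)),
      (123, "2019-11-07", None),
      (123, "2019-11-08", Some(11069)),
      (123, "2019-11-09", Some(33203)),
      (123, "2019-11-11", Some(47927)),
      (123, "2020-01-21", None),
      (123, "2020-01-22", None),
      (123, "2020-01-23", Some(33908))
    ).toDF("ID", "date", "sig01_diff")

    // Cumulative frame: from the first row of the ID up to and including the current row.
    val w = Window.partitionBy("ID").orderBy("date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    // count() skips nulls, so count(when(isNull, 1)) counts only the null rows.
    df.withColumn("running_nulls", count(when(col("sig01_diff").isNull, 1)).over(w))
      .orderBy("date")
      .select("date", "running_nulls")
      .as[(String, Long)]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("running-null-count").getOrCreate()
    run(spark).foreach(println)
    spark.stop()
  }
}
```

On these rows running_nulls comes out as 0, 0, 1, 1, 1, 1, 2, 3, 3, so 2020-01-23 shows 3 where the expected table wants 2: the earlier null on 2019-11-07 is still included.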


How can I achieve this? Any leads are appreciated!


Solution

  • There does not seem to be a built-in function that counts the null records between two non-null records.

    Below is a working solution for this question.

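    %scala
    // Notebook cell: `spark` (the SparkSession) is predefined, so its implicits can be imported directly.
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.expressions.Window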
    
    // Columns: ID, date, A, B. A 0 in A or B stands for a missing value (converted to null below).
    val dfTest = Seq(
        (1,1,10,1),
        (1,2,10,2),
        (1,3,0,1),
        (1,4,20,1),
        (1,5,0,1),
        (1,6,0,1),
        (1,7,60,0),
        (1,8,0,2),
        (1,9,0,1),
        (1,10,0,1),
        (1,11,80,1),
        (1,7,60,1),  // second row with date 7 for ID 1: rows tied on date have no guaranteed order
        (2,1,10,1),
        (2,2,10,2),
        (2,3,0,1),
        (2,4,20,0),
        (2,5,0,0),
        (2,6,0,1),
        (2,7,60,1)
      ).toDF("ID","date","A","B")
    
    // Replace the 0 placeholders with real nulls.
    val test = dfTest.withColumn("A", when(col("A") === 0, null).otherwise(col("A")))
      .withColumn("B", when(col("B") === 0, null).otherwise(col("B")))
    
    

    The input DataFrame is shown below.

    +---+----+----+----+
    | ID|date|   A|   B|
    +---+----+----+----+
    |  1|   1|  10|   1|
    |  1|   2|  10|   2|
    |  1|   3|null|   1|
    |  1|   4|  20|   1|
    |  1|   5|null|   1|
    |  1|   6|null|   1|
    |  1|   7|  60|null|
    |  1|   8|null|   2|
    |  1|   9|null|   1|
    |  1|  10|null|   1|
    |  1|  11|  80|   1|
    |  1|   7|  60|   1|
    |  2|   1|  10|   1|
    |  2|   2|  10|   2|
    |  2|   3|null|   1|
    |  2|   4|  20|null|
    |  2|   5|null|null|
    |  2|   6|null|   1|
    |  2|   7|  60|   1|
    +---+----+----+----+
    
    

    The solution is as follows.

    // w2: rows of each ID ordered by date, used by lag()
    val w2 = Window.partitionBy("ID").orderBy("date")
    // w3: all rows of the same ID strictly before the current row (Window.currentRow - 1 == -1)
    val w3 = Window.partitionBy("ID").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow-1)
    
    
    val newDf = test
      // Running count of nulls in the rows before the current one
      .withColumn("A_cnt", count(when(col("A").isNull, 1)).over(w3))
      .withColumn("B_cnt", count(when(col("B").isNull, 1)).over(w3))
      // On a non-null row whose previous row is null (or that has no previous row), keep that running count
      .withColumn("A_null", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_cnt")).otherwise(null))
      .withColumn("B_null", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_cnt")).otherwise(null))
      // Subtract the count kept at the previous such row: the difference is the length of the null run
      .withColumn("A_null_cnt", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_null") - last("A_null", true).over(w3)).otherwise(null))
      .withColumn("B_null_cnt", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_null") - last("B_null", true).over(w3)).otherwise(null))
      // Drop the helper columns
      .drop("A_cnt", "B_cnt", "A_null", "B_null")
    
    

    The output is shown below.

    +---+----+----+----+------------+------------+
    | ID|date|   A|   B|  A_null_cnt|  B_null_cnt|
    +---+----+----+----+------------+------------+
    |  1|   1|  10|   1|        null|        null|
    |  1|   2|  10|   2|        null|        null|
    |  1|   3|null|   1|        null|        null|
    |  1|   4|  20|   1|           1|        null|
    |  1|   5|null|   1|        null|        null|
    |  1|   6|null|   1|        null|        null|
    |  1|   7|  60|null|           2|        null|
    |  1|   7|  60|   1|        null|           1|
    |  1|   8|null|   2|        null|        null|
    |  1|   9|null|   1|        null|        null|
    |  1|  10|null|   1|        null|        null|
    |  1|  11|  80|   1|           3|        null|
    |  2|   1|  10|   1|        null|        null|
    |  2|   2|  10|   2|        null|        null|
    |  2|   3|null|   1|        null|        null|
    |  2|   4|  20|null|           1|        null|
    |  2|   5|null|null|        null|        null|
    |  2|   6|null|   1|        null|           2|
    |  2|   7|  60|   1|           2|        null|
    +---+----+----+----+------------+------------+
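As an alternative to the running count plus last(..., true) subtraction above, here is a minimal sketch of a different technique, usually called gaps-and-islands (it is not the answer's method). A running count of non-null values over the earlier rows gives each run of nulls the same group number as the non-null row that ends it; counting nulls per (ID, group) then yields the gap length. It assumes the question's column names (ID, date, sig01_diff), dates stored as ISO yyyy-MM-dd strings so they sort chronologically, and a local SparkSession. NullGapIslands, withNullCount, run and the extra ID 456 are hypothetical, added only to exercise a partition that starts with nulls.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object NullGapIslands {
  // Adds null_count: for a non-null row, the number of consecutive null rows directly
  // before it within the same ID; 0 for null rows.
  def withNullCount(df: DataFrame, valueCol: String): DataFrame = {
    // Every earlier row of the same ID (current row excluded).
    val before = Window.partitionBy("ID").orderBy("date")
      .rowsBetween(Window.unboundedPreceding, -1)
    // grp = number of non-null values seen so far (count ignores nulls). A run of nulls
    // and the non-null row that closes it share the same grp value.
    val withGrp = df.withColumn("grp", count(col(valueCol)).over(before))
    val island = Window.partitionBy(col("ID"), col("grp"))
    withGrp
      .withColumn("null_count",
        when(col(valueCol).isNotNull, count(when(col(valueCol).isNull, 1)).over(island))
          .otherwise(lit(0L)))
      .drop("grp")
  }

  // Returns (ID, date, null_count) sorted by ID and date.
  def run(spark: SparkSession): Seq[(Int, String, Long)] = {
    import spark.implicits._
    val rows = Seq[(Int, String, Option[Int])](
      (123, "2019-11-04", Some(93668)), (123, "2019-11-05", Some(49350)),
      (123, "2019-11-07", None),        (123, "2019-11-08", Some(11069)),
      (123, "2019-11-09", Some(33203)), (123, "2019-11-11", Some(47927)),
      (123, "2020-01-21", None),        (123, "2020-01-22", None),
      (123, "2020-01-23", Some(33908)), (123, "2020-01-24", Some(61603)),
      (123, "2020-01-27", Some(33613)), (123, "2020-01-28", Some(27514)),
      (123, "2020-01-29", None),        (123, "2020-01-30", None),
      (123, "2020-02-11", None),        (123, "2020-02-12", None),
      (123, "2020-02-13", None),        (123, "2020-02-14", None),
      (123, "2020-02-15", Some(65625)), (123, "2020-02-17", Some(13354)),
      (123, "2020-02-18", None),        (123, "2020-02-19", Some(69069)),
      // Hypothetical ID 456 (not in the question): a partition that starts with nulls.
      (456, "2020-01-01", None), (456, "2020-01-02", None), (456, "2020-01-03", Some(5))
    )
    withNullCount(rows.toDF("ID", "date", "sig01_diff"), "sig01_diff")
      .orderBy("ID", "date")
      .select("ID", "date", "null_count")
      .as[(Int, String, Long)]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-gap-islands").getOrCreate()
    run(spark).foreach(println)
    spark.stop()
  }
}
```

For ID 123 this should reproduce the null_count column from the question, with 0 rather than null on the other rows. Because grp starts at 0, a run of nulls at the very start of a partition is still counted (ID 456 gets 2 on its first non-null row). As far as I can tell, the answer's subtraction returns null in that case, since last(..., true) finds no earlier value to subtract.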