I have a dataframe as shown below.
+-----+------------+------------+
| ID  | date       | sig01_diff |
+-----+------------+------------+
| 123 | 2019-11-04 | 93668      |
| 123 | 2019-11-05 | 49350      |
| 123 | 2019-11-07 | null       |
| 123 | 2019-11-08 | 11069      |
| 123 | 2019-11-09 | 33203      |
| 123 | 2019-11-11 | 47927      |
| 123 | 2020-01-21 | null       |
| 123 | 2020-01-22 | null       |
| 123 | 2020-01-23 | 33908      |
| 123 | 2020-01-24 | 61603      |
| 123 | 2020-01-27 | 33613      |
| 123 | 2020-01-28 | 27514      |
| 123 | 2020-01-29 | null       |
| 123 | 2020-01-30 | null       |
| 123 | 2020-02-11 | null       |
| 123 | 2020-02-12 | null       |
| 123 | 2020-02-13 | null       |
| 123 | 2020-02-14 | null       |
| 123 | 2020-02-15 | 65625      |
| 123 | 2020-02-17 | 13354      |
| 123 | 2020-02-18 | null       |
| 123 | 2020-02-19 | 69069      |
+-----+------------+------------+
I need to get the number of null records immediately preceding each record, as shown below.
+-----+------------+------------+------------+
| ID  | date       | sig01_diff | null_count |
+-----+------------+------------+------------+
| 123 | 2019-11-04 | 93668      | 00         |
| 123 | 2019-11-05 | 49350      | 00         |
| 123 | 2019-11-07 | null       | 00         |
| 123 | 2019-11-08 | 11069      | 01         |
| 123 | 2019-11-09 | 33203      | 00         |
| 123 | 2019-11-11 | 47927      | 00         |
| 123 | 2020-01-21 | null       | 00         |
| 123 | 2020-01-22 | null       | 00         |
| 123 | 2020-01-23 | 33908      | 02         |
| 123 | 2020-01-24 | 61603      | 00         |
| 123 | 2020-01-27 | 33613      | 00         |
| 123 | 2020-01-28 | 27514      | 00         |
| 123 | 2020-01-29 | null       | 00         |
| 123 | 2020-01-30 | null       | 00         |
| 123 | 2020-02-11 | null       | 00         |
| 123 | 2020-02-12 | null       | 00         |
| 123 | 2020-02-13 | null       | 00         |
| 123 | 2020-02-14 | null       | 00         |
| 123 | 2020-02-15 | 65625      | 06         |
| 123 | 2020-02-17 | 13354      | 00         |
| 123 | 2020-02-18 | null       | 00         |
| 123 | 2020-02-19 | 69069      | 01         |
+-----+------------+------------+------------+
As shown above, the new column holds the number of consecutive null records immediately preceding each record, for example for these dates:
2019-11-08
2020-02-15
Using a window function with unboundedPreceding, I can get a running count of null records within a window. What I need instead is, within each window, the number of null records between two consecutive non-null records.
How can I achieve this? Any leads appreciated!
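To pin down the requirement before reaching for Spark, here is a minimal plain-Scala sketch (no Spark involved; `NullGaps` and `nullCounts` are hypothetical names, and `None` stands in for null). It walks the values once in date order, tracking the length of the current run of nulls, and reports that length on the next non-null value.

```scala
object NullGaps {
  // For each element, the number of consecutive None values immediately before it.
  // Only defined (non-null) elements carry a count; None elements get 0,
  // matching the expected null_count column in the question.
  def nullCounts[A](xs: Seq[Option[A]]): Seq[Int] =
    xs.foldLeft((0, Vector.empty[Int])) { case ((run, acc), x) =>
      x match {
        case None    => (run + 1, acc :+ 0) // extend the current run of nulls
        case Some(_) => (0, acc :+ run)     // close the run and report its length
      }
    }._2
}
```

Applied to the sig01_diff column in date order, this gives 1, 2, 6 and 1 on 2019-11-08, 2020-01-23, 2020-02-15 and 2020-02-19, and 0 everywhere else, which is the expected output above.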
There seems to be no built-in function that directly counts the null records between two non-null records.
Below is a working solution.
%scala
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
val dfTest = Seq(
(1,1,10,1),
(1,2,10,2),
(1,3,0,1),
(1,4,20,1),
(1,5,0,1),
(1,6,0,1),
(1,7,60,0),
(1,8,0,2),
(1,9,0,1),
(1,10,0,1),
(1,11,80,1),
(1,7,60,1),
(2,1,10,1),
(2,2,10,2),
(2,3,0,1),
(2,4,20,0),
(2,5,0,0),
(2,6,0,1),
(2,7,60,1)
).toDF("ID","date","A","B")
// Treat 0 as a missing value: replace it with null in both value columns
val test = dfTest.withColumn("A", when(col("A") === 0, null).otherwise(col("A")))
  .withColumn("B", when(col("B") === 0, null).otherwise(col("B")))
The resulting input DataFrame is shown below.
+---+----+----+----+
| ID|date| A| B|
+---+----+----+----+
| 1| 1| 10| 1|
| 1| 2| 10| 2|
| 1| 3|null| 1|
| 1| 4| 20| 1|
| 1| 5|null| 1|
| 1| 6|null| 1|
| 1| 7| 60|null|
| 1| 8|null| 2|
| 1| 9|null| 1|
| 1| 10|null| 1|
| 1| 11| 80| 1|
| 1| 7| 60| 1|
| 2| 1| 10| 1|
| 2| 2| 10| 2|
| 2| 3|null| 1|
| 2| 4| 20|null|
| 2| 5|null|null|
| 2| 6|null| 1|
| 2| 7| 60| 1|
+---+----+----+----+
The solution is as follows:
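// w2: rows of each ID in date order, used by lag()
val w2 = Window.partitionBy("ID").orderBy("date")
// w3: every earlier row of the same ID (Window.currentRow - 1 == -1, so the current row is excluded)
val w3 = Window.partitionBy("ID").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow-1)
val newDf = test
  // running number of nulls strictly before the current row
  .withColumn("A_cnt", count(when(col("A").isNull, 1)).over(w3))
  .withColumn("B_cnt", count(when(col("B").isNull, 1)).over(w3))
  // keep that running count only on a non-null row whose previous row is null
  // (lag() also returns null on the first row of each ID, so that row is kept too)
  .withColumn("A_null", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_cnt")).otherwise(null))
  .withColumn("B_null", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_cnt")).otherwise(null))
  // subtract the previous kept count to get the length of the null run that just ended;
  // a run of nulls at the very start of an ID has nothing to subtract from and yields null
  .withColumn("A_null_cnt", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_null") - last("A_null", true).over(w3)).otherwise(null))
  .withColumn("B_null_cnt", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_null") - last("B_null", true).over(w3)).otherwise(null))
  // remove the helper columns
  .drop("A_cnt", "B_cnt", "A_null", "B_null")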
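An alternative that avoids the subtraction step is the usual "gaps and islands" trick, sketched below under the assumption of Spark 2.x/3.x with the same DataFrame API used in the answer; `NullGapsSpark.nullsBefore` is a hypothetical helper, not a Spark function. The idea: count the non-null values strictly before each row. A run of nulls and the non-null row that closes it share that count, so partitioning by it isolates each run, and counting the nulls in the partition gives the run length. As far as I can tell, this also counts a run of nulls at the very start of an ID, which the subtraction version reports as null.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object NullGapsSpark {
  // Hypothetical helper: for each row, the number of nulls in `valueCol` between this
  // row and the previous non-null row of the same partition. Null rows get 0, like
  // the null_count column in the question.
  def nullsBefore(df: DataFrame, valueCol: String, partCol: String, orderCol: String): DataFrame = {
    // All rows strictly before the current one (same frame as w3 in the answer).
    val before = Window.partitionBy(partCol).orderBy(orderCol)
      .rowsBetween(Window.unboundedPreceding, -1)
    // grp = number of non-null values before this row; count() ignores nulls and
    // returns 0 on the empty frame of the first row.
    val withGrp = df.withColumn("grp", count(col(valueCol)).over(before))
    // Each (partition, grp) group is one run of nulls plus at most one closing non-null row.
    val gap = Window.partitionBy(col(partCol), col("grp"))
    withGrp
      .withColumn("null_count",
        when(col(valueCol).isNotNull, count(when(col(valueCol).isNull, 1)).over(gap))
          .otherwise(0))
      .drop("grp")
  }
}
```

For the question's data this would be called as `NullGapsSpark.nullsBefore(df, "sig01_diff", "ID", "date")`, once per value column. Both approaches depend on a strict ordering: ID 1 above has two rows with date 7, so their relative order (and therefore the counts around them) is not deterministic unless a tie-breaking column is added to `orderBy`.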
The output is shown below.
+---+----+----+----+------------+------------+
| ID|date| A| B| A_null_cnt| B_null_cnt|
+---+----+----+----+------------+------------+
| 1| 1| 10| 1| null| null|
| 1| 2| 10| 2| null| null|
| 1| 3|null| 1| null| null|
| 1| 4| 20| 1| 1| null|
| 1| 5|null| 1| null| null|
| 1| 6|null| 1| null| null|
| 1| 7| 60|null| 2| null|
| 1| 7| 60| 1| null| 1|
| 1| 8|null| 2| null| null|
| 1| 9|null| 1| null| null|
| 1| 10|null| 1| null| null|
| 1| 11| 80| 1| 3| null|
| 2| 1| 10| 1| null| null|
| 2| 2| 10| 2| null| null|
| 2| 3|null| 1| null| null|
| 2| 4| 20|null| 1| null|
| 2| 5|null|null| null| null|
| 2| 6|null| 1| null| 2|
| 2| 7| 60| 1| 2| null|
+---+----+----+----+------------+------------+
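To see why the subtraction in A_null_cnt works, here is a plain-Scala sketch that mirrors the answer's helper columns on a local sequence (`CumulativeDiff` and `nullGapCounts` are hypothetical names; `None` stands in for null). The running null count is only kept on rows that close a run of nulls, and the length of each run is the difference between two consecutive kept counts. Because `lag` returns null on the first row, that row is also kept with a count of 0, which provides the baseline for the first subtraction.

```scala
object CumulativeDiff {
  // Mirrors A_cnt / A_null / A_null_cnt from the answer for a single ID.
  def nullGapCounts[A](xs: Seq[Option[A]]): Seq[Option[Int]] = {
    // A_cnt: number of None values strictly before each position
    val before = xs.scanLeft(0)((n, x) => if (x.isEmpty) n + 1 else n).init
    // A_null: keep A_cnt on a defined value whose predecessor is None or missing
    // (missing = first row, where lag() returns null)
    val marks: Seq[Option[Int]] = xs.indices.map { i =>
      if (xs(i).isDefined && (i == 0 || xs(i - 1).isEmpty)) Some(before(i)) else None
    }
    // A_null_cnt: this mark minus the most recent earlier mark
    // (like last(..., ignoreNulls = true) over w3); None if there is no earlier mark
    marks.indices.map { i =>
      for {
        cur  <- marks(i)
        prev <- marks.take(i).flatten.lastOption
      } yield cur - prev
    }
  }
}
```

For column A of ID 1 in the order shown above, the kept counts are 0, 1, 3 and 6, so the differences are 1, 2 and 3, matching A_null_cnt. A sequence that starts with nulls, such as `Seq(None, None, Some(5))`, yields no count at all, because its first kept value has nothing earlier to subtract.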