apache-spark-sql, pivot, missing-data, dynamic-columns

spark-dataframe pivot missing columns/values


Right now I'm facing a problem that I can't solve; let me explain.

I need to pivot a Spark DataFrame, but in some cases there are no records of a given type, so the pivot does not produce the column that I need. Here is an example:

Let's say that there are 4 types of events: A, B, C, D

I receive event log files like this:

|Id|year|type|
|--|----|----|
|a |2015|   A|
|a |2016|   A|
|a |2015|   A|
|a |2017|   A|
|b |2015|   A|
|b |2015|   B|
|b |2016|   D|
|b |2015|   B|
|b |2017|   A|  

When I do the pivot I get:

|id|year|   A|   B|   D|
|--|----|----|----|----|
|a |2015|   2|null|null|
|a |2016|   1|null|null|
|a |2017|   1|null|null|
|b |2015|   1|   2|null|
|b |2016|   1|null|   1|
|b |2017|   1|null|null|

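For context, a plain pivot along these lines reproduces that output (a minimal sketch, assuming df holds the event log above; only the types actually present in the data become columns):

    import org.apache.spark.sql.functions.count

    // pivot on the event type: the output columns come from the distinct values in the data
    df.groupBy("Id", "year")
      .pivot("type")
      .agg(count("type"))
      .show
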
What I really need is:

|id|year|   A|   B|   C|   D|
|--|----|----|----|----|----|
|a |2015|   2|   0|   0|   0|
|a |2016|   1|   0|   0|   0|
|a |2017|   1|   0|   0|   0|
|b |2015|   1|   2|   0|   0|
|b |2016|   1|   0|   0|   1|
|b |2017|   1|   0|   0|   0|

And in each event log file that I receive, different event types may be missing, but I always need to have the columns for all of the event types (A, B, C and D).

I hope that I explained myself well.


Solution

Join the log data to a small DataFrame that lists every possible event type, so the pivot always sees all four values, and then fill the remaining nulls with 0:

    import org.apache.spark.sql.functions.{col, count}
    import spark.implicits._

    // one row per possible event type
    val lscol = Seq("A", "B", "C", "D").toDF("Type")

    // df is the initial dataframe prior to the pivot
    df.join(lscol, df.col("type") === lscol.col("Type"), "right")
      .drop(df("type"))                                   // keep only lscol's Type column
      .groupBy("Id", "year")
      .pivot("Type")
      .agg(count("Type"))
      .filter(!col("Id").isNull && !col("year").isNull)   // drop the unmatched rows added by the right join
      .na.fill(0)
      .show