Right now I'm facing a problem that I can't solve. Let me explain.
I need to pivot a Spark DataFrame, but in some cases there are no records for a given value, so the pivot doesn't produce the column that I need. Here is an example:
Let's say there are 4 types of events: A, B, C, D.
I receive event log files like this:
|Id|year|type|
|--|----|----|
|a |2015| A|
|a |2016| A|
|a |2015| A|
|a |2017| A|
|b |2015| A|
|b |2015| B|
|b |2016| D|
|b |2015| B|
|b |2017| A|
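For reference, the sample above can be recreated as a small DataFrame like this (just a sketch for reproducing the example; it assumes a SparkSession available as spark):

import spark.implicits._

// Recreate the event log from the table above
val df = Seq(
  ("a", 2015, "A"), ("a", 2016, "A"), ("a", 2015, "A"), ("a", 2017, "A"),
  ("b", 2015, "A"), ("b", 2015, "B"), ("b", 2016, "D"), ("b", 2015, "B"),
  ("b", 2017, "A")
).toDF("Id", "year", "type")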
When I do the pivot I get:
|id|year| A| B| D|
|--|----|----|----|----|
|a |2015| 2|null|null|
|a |2016| 1|null|null|
|a |2017| 1|null|null|
|b |2015| 1| 2|null|
|b |2016| 1|null| 1|
|b |2017| 1|null|null|
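For reference, the table above comes from a plain groupBy/pivot/count, roughly like this (a sketch, assuming the log is loaded into a DataFrame named df with the columns shown above):

import org.apache.spark.sql.functions.count

// Only the types that actually occur in df (A, B, D) become columns
df.groupBy("Id", "year")
  .pivot("type")
  .agg(count("type"))
  .show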
What I really need is:
|id|year| A| B| C| D|
|--|----|----|----|----|----|
|a |2015| 2| 0| 0| 0|
|a |2016| 1| 0| 0| 0|
|a |2017| 1| 0| 0| 0|
|b |2015| 1| 2| 0| 0|
|b |2016| 1| 0| 0| 1|
|b |2017| 1| 0| 0| 0|
Bear in mind that in each event log file I receive, different event types may be missing, but I always need the columns for all of the event types (A, B, C and D).
I hope that I explained myself well.
// df is the initial DataFrame prior to the pivot
// (column names differ in case from the sample, e.g. ID vs Id; Spark resolves them case-insensitively by default)
import org.apache.spark.sql.functions.{col, count}
import spark.implicits._

// One row per expected event type; right-joining the log against it guarantees
// every type appears at least once before the pivot
val lscol = Seq("A", "B", "C", "D").toDF("Type")

df.join(lscol, df.col("Type") === lscol.col("Type"), "right")
  .drop(df("Type"))                                  // keep only lscol's Type column
  .groupBy("ID", "year")
  .pivot("Type")
  .agg(count("Type"))
  .filter(!col("ID").isNull && !col("year").isNull)  // drop the filler rows added by the right join
  .na.fill(0)
  .show
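As an aside, when the full set of event types is fixed and known up front, the values can also be passed directly to pivot (available since Spark 1.6), which avoids the join entirely; a minimal sketch:

import org.apache.spark.sql.functions.count

// Listing the expected values forces a column for every type, even if it never occurs in df
df.groupBy("Id", "year")
  .pivot("type", Seq("A", "B", "C", "D"))
  .agg(count("type"))
  .na.fill(0)   // groups with no events of a given type come back as null counts
  .show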