Input
+--------+--------+-----------+------+
|emp_name|emp_area|       dept|   zip|
+--------+--------+-----------+------+
|     ram|     USA|    "Sales"|805912|
|    sham|     USA|    "Sales"|805912|
|     ram|  Canada|"Marketing"|805912|
|     ram|     USA|    "Sales"|805912|
|    sham|     USA|"Marketing"|805912|
+--------+--------+-----------+------+
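Desired output
+--------+---------+----------+---------+----------+
| feature|top1 name|top1 value|top2 name|top2 value|
+--------+---------+----------+---------+----------+
|emp_name|      ram|         3|     sham|         2|
|emp_area|      USA|         4|   Canada|         1|
|    dept|    Sales|         3|Marketing|         2|
|     zip|   805912|         5|       NA|        NA|
+--------+---------+----------+---------+----------+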
I started by dynamically generating the counts for each column, but I am unable to store them in a single dataset:
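import org.apache.spark.sql.functions.desc

val features = ds.columns.toList
for (e <- features) {
  // ds1 goes out of scope at the end of each iteration, so nothing is kept
  val ds1 = ds.groupBy(e).count().sort(desc("count")).limit(5).withColumnRenamed("count", e + "_count")
}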
How can I collect all of these values into one DataFrame and transform it into the desired output?
Here's a slightly verbose approach. Map each column to a one-row DataFrame that corresponds to a row of the desired output, padding with NA columns where the column has fewer than top distinct values. Then rename the columns to the desired names, and finally combine the one-row DataFrames with unionAll (union in Spark 2.0 and later, where unionAll is deprecated).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val top = 2

val result = ds.columns.map(
  // One row per column: rank values by count, keep the top ones, pivot the ranks into columns
  c => ds.groupBy(c).count()
    .withColumn("rn", row_number().over(Window.orderBy(desc("count"))))
    .filter(s"rn <= $top")
    .groupBy().pivot("rn")
    .agg(first(col(c)), first(col("count")))
    .select(lit(c), col("*"))
).map(df =>
  // Pad with NA columns when the column has fewer than `top` distinct values
  if (df.columns.size != 1 + top*2)
    df.select(List(col("*")) ::: (1 to (top*2+1 - df.columns.size)).toList.map(x => lit("NA")): _*)
  else df
).map(df =>
  // Rename to: feature, top1 name, top1 value, top2 name, top2 value, ...
  df.toDF(List("feature") ::: (1 to top).toList.flatMap(x => Seq(s"top$x name", s"top$x value")): _*)
).reduce(_ unionAll _)

result.show
+--------+---------+----------+---------+----------+
| feature|top1 name|top1 value|top2 name|top2 value|
+--------+---------+----------+---------+----------+
|emp_name|      ram|         3|     sham|         2|
|emp_area|      USA|         4|   Canada|         1|
|    dept|    Sales|         3|Marketing|         2|
|     zip|   805912|         5|       NA|        NA|
+--------+---------+----------+---------+----------+
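If a long format (one row per feature and rank) is acceptable, the padding step can be skipped entirely. The following is only a sketch under a few assumptions: Spark 2.0 or later (for SparkSession and union), a local master for the demo, and the names TopValuesLong and topValues, which are made up for illustration. It also breaks ties on the value itself so the ranking is deterministic, which the code above does not do.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object TopValuesLong {
  // Hypothetical helper: top-n most frequent values of every column, in long format.
  // Result columns: feature, rn (rank), name (value as string), value (count).
  def topValues(ds: DataFrame, top: Int): DataFrame =
    ds.columns.map { c =>
      ds.groupBy(c).count()
        // Rank by count, breaking ties on the value so the result is deterministic
        .withColumn("rn", row_number().over(Window.orderBy(desc("count"), col(c))))
        .filter(col("rn") <= top)
        // Cast the value to string so every per-column DataFrame has the same schema
        .select(lit(c).as("feature"), col("rn"),
                col(c).cast("string").as("name"), col("count").as("value"))
    }.reduce(_ union _)

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for this demo
    val spark = SparkSession.builder().master("local[1]").appName("top-values").getOrCreate()
    import spark.implicits._

    // Same rows as the input in the question (without the literal quotes)
    val ds = Seq(
      ("ram", "USA", "Sales", 805912),
      ("sham", "USA", "Sales", 805912),
      ("ram", "Canada", "Marketing", 805912),
      ("ram", "USA", "Sales", 805912),
      ("sham", "USA", "Marketing", 805912)
    ).toDF("emp_name", "emp_area", "dept", "zip")

    topValues(ds, 2).show()
    spark.stop()
  }
}
```

A column with fewer than top distinct values (zip here) simply contributes fewer rows, so no NA columns are needed. If the wide layout is still required, the long result can be pivoted on rn afterwards.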