I want to implement a function similar to Oracle's LISTAGG function. The equivalent Oracle code is:
select KEY,
listagg(CODE, '-') within group (order by DATE) as CODE
from demo_table
group by KEY
Here is my Spark Scala DataFrame implementation, but I am unable to order the values within each group.
Input:
// assumes spark.implicits._ is in scope (e.g. in spark-shell) for toDF
val values = List(
  List("66", "PL", "2016-11-01"), List("66", "PL", "2016-12-01"),
  List("67", "JL", "2016-12-01"), List("67", "JL", "2016-11-01"),
  List("67", "PL", "2016-10-01"), List("67", "PO", "2016-09-01"),
  List("67", "JL", "2016-08-01"),
  List("68", "PL", "2016-12-01"), List("68", "JO", "2016-11-01"))
  .map(row => (row(0), row(1), row(2)))
val df = values.toDF("KEY", "CODE", "DATE")
df.show()
+---+----+----------+
|KEY|CODE| DATE|
+---+----+----------+
| 66| PL|2016-11-01|
| 66| PL|2016-12-01|----- group 1
| 67| JL|2016-12-01|
| 67| JL|2016-11-01|
| 67| PL|2016-10-01|
| 67| PO|2016-09-01|
| 67| JL|2016-08-01|----- group 2
| 68| PL|2016-12-01|
| 68| JO|2016-11-01|----- group 3
+---+----+----------+
udf implementation:
import org.apache.spark.sql.functions._

val listAgg = udf((xs: Seq[String]) => xs.mkString("-"))
df.groupBy("KEY")
.agg(listAgg(collect_list("CODE")).alias("CODE"))
.show(false)
+---+--------------+
|KEY|CODE |
+---+--------------+
|68 |PL-JO |
|67 |JL-JL-PL-PO-JL|
|66 |PL-PL |
+---+--------------+
Expected output (ordered by DATE within each group):
+---+--------------+
|KEY|CODE |
+---+--------------+
|68 |JO-PL |
|67 |JL-PO-PL-JL-JL|
|66 |PL-PL |
+---+--------------+
Use the struct inbuilt function to combine the CODE and DATE columns, and use that new struct column in the collect_list aggregation function. Then, in the udf function, sort by DATE and collect the CODE values as a "-"-separated string.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

// sort each group's structs by DATE, then join the CODE values with "-"
def sortAndStringUdf = udf((codeDate: Seq[Row]) =>
  codeDate.sortBy(row => row.getAs[Long]("DATE"))
    .map(row => row.getAs[String]("CODE"))
    .mkString("-"))

df.withColumn("codeDate", struct(col("CODE"), col("DATE").cast("timestamp").cast("long").as("DATE")))
  .groupBy("KEY")
  .agg(sortAndStringUdf(collect_list("codeDate")).as("CODE"))
which should give you
+---+--------------+
|KEY| CODE|
+---+--------------+
| 68| JO-PL|
| 67|JL-PO-PL-JL-JL|
| 66| PL-PL|
+---+--------------+
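To see what the udf does for one group, here is a plain Scala sketch (no Spark, with hypothetical epoch-second values) of the same sort-then-join step:

```scala
// Plain Scala sketch of the per-group logic inside the udf (hypothetical data):
// each element is (DATE as epoch seconds, CODE); sort by date, then join codes.
val group = Seq((1480550400L, "PL"), (1477958400L, "JO")) // 2016-12-01, 2016-11-01
val code = group.sortBy(_._1).map(_._2).mkString("-")
// code == "JO-PL"
```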
I hope the answer is helpful
I am sure this will be faster than using a udf function:
df.withColumn("codeDate", struct(col("DATE").cast("timestamp").cast("long").as("DATE"), col("CODE")))
.groupBy("KEY")
.agg(concat_ws("-", expr("sort_array(collect_list(codeDate)).CODE")).alias("CODE"))
.show(false)
which should give you the same result as above
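Note that DATE is placed first in the struct this time, because sort_array orders structs field by field from left to right. A plain Scala analogue with tuples (hypothetical values) shows the idea; ISO-formatted date strings happen to sort lexicographically in chronological order:

```scala
// Hypothetical plain-Scala analogue: structs compare like tuples, field by
// field from left to right, so putting DATE first makes the sort date-ordered.
val codeDates = Seq(("2016-12-01", "PL"), ("2016-11-01", "JO"))
val sorted = codeDates.sorted // sorts by the date string first
val code = sorted.map(_._2).mkString("-")
// code == "JO-PL"
```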