Tags: scala, apache-spark, apache-spark-sql, concatenation, aggregate

Spark DataFrame implementation similar to Oracle's LISTAGG function - Unable to Order Within the Group


I want to implement a function similar to Oracle's LISTAGG function.

The equivalent Oracle code is:

select KEY,
listagg(CODE, '-') within group (order by DATE) as CODE
from demo_table
group by KEY

Here is my Spark Scala DataFrame implementation, but I am unable to order the values within each group.

Input:

import spark.implicits._  // needed for toDF (available automatically in spark-shell)

val values = List(List("66", "PL", "2016-11-01"), List("66", "PL", "2016-12-01"),
  List("67", "JL", "2016-12-01"), List("67", "JL", "2016-11-01"), List("67", "PL", "2016-10-01"), List("67", "PO", "2016-09-01"), List("67", "JL", "2016-08-01"),
  List("68", "PL", "2016-12-01"), List("68", "JO", "2016-11-01"))
  .map(row => (row(0), row(1), row(2)))

val df = values.toDF("KEY", "CODE", "DATE")

df.show()

+---+----+----------+
|KEY|CODE|      DATE|
+---+----+----------+
| 66|  PL|2016-11-01|
| 66|  PL|2016-12-01|----- group 1
| 67|  JL|2016-12-01|
| 67|  JL|2016-11-01|
| 67|  PL|2016-10-01|
| 67|  PO|2016-09-01|
| 67|  JL|2016-08-01|----- group 2
| 68|  PL|2016-12-01|
| 68|  JO|2016-11-01|----- group 3
+---+----+----------+

UDF implementation:

import org.apache.spark.sql.functions._

val listAgg = udf((xs: Seq[String]) => xs.mkString("-"))

df.groupBy("KEY")
  .agg(listAgg(collect_list("CODE")).alias("CODE"))
  .show(false)

+---+--------------+
|KEY|CODE          |
+---+--------------+
|68 |PL-JO         |
|67 |JL-JL-PL-PO-JL|
|66 |PL-PL         |
+---+--------------+

Expected output (ordered by DATE within each group):

+---+--------------+
|KEY|CODE          |
+---+--------------+
|68 |JO-PL         |
|67 |JL-PO-PL-JL-JL|
|66 |PL-PL         |
+---+--------------+

Solution

  • Use the struct inbuilt function to combine the CODE and DATE columns, and use that new struct column in the collect_list aggregation function. Then, in the udf function, sort by DATE and join the CODE values with - as the separator.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    
    // sort the collected structs by DATE, then join the CODE values with "-"
    def sortAndStringUdf = udf((codeDate: Seq[Row]) =>
      codeDate.sortBy(_.getAs[Long]("DATE")).map(_.getAs[String]("CODE")).mkString("-"))
    
    df.withColumn("codeDate", struct(col("CODE"), col("DATE").cast("timestamp").cast("long").as("DATE")))
      .groupBy("KEY")
      .agg(sortAndStringUdf(collect_list("codeDate")).as("CODE")).show()
    

    which should give you

    +---+--------------+
    |KEY|          CODE|
    +---+--------------+
    | 68|         JO-PL|
    | 67|JL-PO-PL-JL-JL|
    | 66|         PL-PL|
    +---+--------------+
    

    I hope the answer is helpful

    Update

    I am sure this will be faster than using a udf function:

    // DATE is the first field of the struct, so sort_array orders the collected list by DATE
    df.withColumn("codeDate", struct(col("DATE").cast("timestamp").cast("long").as("DATE"), col("CODE")))
      .groupBy("KEY")
      .agg(concat_ws("-", expr("sort_array(collect_list(codeDate)).CODE")).alias("CODE"))
      .show(false)
    

    which should give you the same result as above
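
    As a further alternative (not part of the original answer), the ordering can also be handled with a window function: collecting CODE over a window partitioned by KEY and ordered by DATE gathers the values already in date order, so no struct sorting is needed. This is a minimal sketch under the assumption that the string DATE column sorts correctly (it does here, since the dates are in ISO yyyy-MM-dd format); the window value w and the intermediate column name CODES are illustrative.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // the frame covers the whole KEY group with rows ordered by DATE,
    // so collect_list gathers CODE in date order
    val w = Window.partitionBy("KEY").orderBy("DATE")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    df.withColumn("CODES", collect_list("CODE").over(w))
      .groupBy("KEY")
      // every row of a group carries the same full list, so first() is safe
      .agg(concat_ws("-", first("CODES")).alias("CODE"))
      .show(false)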