apache-spark, pyspark, group-by, concatenation, aggregate-functions

Concatenate strings in one column while grouping by another column


I have a dataframe as follows:

+----------+--------------------+
|CustomerNo| desc               |
+----------+--------------------+
|  351856.0| FORM & BEAUTY 075 P|
|  351856.0| FORM & BEAUTY 075 P|
|  326022.0|            D 151 HP|
|   69430.0|Shape Sensation 0...|
|   38018.0|   Maximizer 846 WHU|
|   69712.0|Shape Sensation 0...|
|   71228.0|   Aqua Festive WHUD|
|   71228.0|Maximizer 736 WHU...|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   74540.0|Form & Beauty 052 HP|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
+----------+--------------------+

I need to group the data by CustomerNo and concatenate the desc strings within each group.

I am using the following code, but it raises an AnalysisException:

from pyspark.sql import functions as F

df = retail_df.groupBy('CustomerNo').agg(F.concat('desc').alias('concat_Desc'))

AnalysisException: [MISSING_AGGREGATION] The non-aggregating expression "desc" is based on columns which are not participating in the GROUP BY clause.

How can I do this?


Solution

  • You can group the dataframe by CustomerNo and collect the desc values into a list with collect_list. You can then join the items of that list into a single string with concat_ws. (concat itself is not an aggregate function, which is why it cannot be applied to a non-grouped column inside agg.)

    See the code below:

    from pyspark.sql import functions as F

    # collect_list gathers all desc values per CustomerNo into an array;
    # concat_ws then joins that array into one comma-separated string
    retail_df \
        .groupBy('CustomerNo') \
        .agg(F.collect_list('desc').alias('items')) \
        .withColumn('concat_Desc', F.concat_ws(',', 'items'))
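
    The two steps can also be collapsed into a single agg expression, since concat_ws accepts an array column directly. A minimal sketch, assuming the same retail_df:

    from pyspark.sql import functions as F

    # collect the descriptions and join them in one aggregation
    retail_df \
        .groupBy('CustomerNo') \
        .agg(F.concat_ws(',', F.collect_list('desc')).alias('concat_Desc')) \
        .show(truncate=False)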
    

    This solution does not use a UDF, so it will perform better: the built-in functions run inside the JVM and avoid the serialization overhead of a Python UDF.
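
    Note that the sample data contains exact duplicate rows (CustomerNo 74578 repeats "G 56 WP 01" eight times, for example), so concat_Desc will repeat those values. If each distinct description should appear only once, a sketch swapping in collect_set (which drops duplicates but does not guarantee order):

    from pyspark.sql import functions as F

    # collect_set keeps only the distinct desc values per group
    retail_df \
        .groupBy('CustomerNo') \
        .agg(F.concat_ws(',', F.collect_set('desc')).alias('concat_Desc'))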