Tags: java, apache-spark, apache-spark-sql, aggregate-functions, dynamic-columns

Spark java: agg on multiple columns and rename them


I want to do a group-by on my dataset over multiple columns that I don't know beforehand. The .agg() method allows passing a Map where the key is the column name and the value is the aggregation name, so for example I can do this:

    Map<String, String> map = new HashMap<>();
    for (String column : columns)
        map.put(column, "sum");
    ds.groupBy("someColumn").agg(map);

This works so far, but I want to keep the original column names instead of getting something like this:

'|sum(column1)|sum(column12)|...'

I tried this, but it didn't work:

map.put(column, "sum alias " + column);

Is it possible to do this with the Java API?


Solution

  • Try this:

    I've provided each column's own name as the alias for sum(column):

        Dataset<Row> df = spark.range(2).withColumn("value", lit(2));
        df.show(false);
        df.printSchema();

        /**
         * +---+-----+
         * |id |value|
         * +---+-----+
         * |0  |2    |
         * |1  |2    |
         * +---+-----+
         *
         * root
         *  |-- id: long (nullable = false)
         *  |-- value: integer (nullable = false)
         */
        Map<String, String> map = new HashMap<>();
        for (String column : df.columns())
            map.put(column, "sum");

        List<Column> cols = map.entrySet().stream()
                .map(c -> expr(String.format("%s(%s) as %s", c.getValue(), c.getKey(), c.getKey())))
                .collect(Collectors.toList());

        df.agg(cols.get(0), toScalaSeq(cols.subList(1, cols.size()))).show(false);
        /**
         * +---+-----+
         * |id |value|
         * +---+-----+
         * |1  |4    |
         * +---+-----+
         */

    Utility:

        <T> Buffer<T> toScalaSeq(List<T> list) {
            return JavaConversions.asScalaBuffer(list);
        }
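
The renaming trick above is really just string formatting: for each column you build a `agg(col) as col` SQL expression, which Spark then parses via `expr(...)`. That step can be sketched on its own, with no Spark session needed (the column names and `sum` aggregation below are illustrative, matching the answer's example):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AggExprBuilder {
    // Build "agg(col) as col" expression strings so each aggregated
    // column keeps its original name after the aggregation.
    static List<String> buildAggExprs(Map<String, String> aggMap) {
        return aggMap.entrySet().stream()
                .map(e -> String.format("%s(%s) as %s", e.getValue(), e.getKey(), e.getKey()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is deterministic
        Map<String, String> map = new LinkedHashMap<>();
        map.put("id", "sum");
        map.put("value", "sum");
        System.out.println(buildAggExprs(map));
        // prints [sum(id) as id, sum(value) as value]
    }
}
```

Each resulting string would then be wrapped with `expr(...)` and passed to `agg`, exactly as in the answer above.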