Tags: python, apache-spark, pyspark, apache-spark-sql, coalesce

PySpark aggregation function for "any value"


I have a PySpark DataFrame with an A field, a few B fields that depend on A (A -> B), and C fields that I want to aggregate for each A. For example:

A | B | C
----------
A | 1 | 6
A | 1 | 7
B | 2 | 8
B | 2 | 4

I wish to group by A, present any value of B, and run an aggregation (say, SUM) on C.

The expected result would be:

A | B | C
----------
A | 1 | 13
B | 2 | 12

SQL-wise I would do:

SELECT A, COALESCE(B) as B, SUM(C) as C
FROM T
GROUP BY A

What is the PySpark way to do that?

I can group by A and B together, or select MIN(B) for each A, for example:

df.groupBy('A').agg(F.min('B').alias('B'),F.sum('C').alias('C'))

or

df.groupBy(['A','B']).agg(F.sum('C').alias('C'))

but that seems inefficient. Is there anything similar to SQL's COALESCE in PySpark?

Thanks


Solution

  • You'll just need to use first instead:

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import first, sum, col
    
    spark = SparkSession.builder.getOrCreate()
    
    array = [Row(A="A", B=1, C=6),
             Row(A="A", B=1, C=7),
             Row(A="B", B=2, C=8),
             Row(A="B", B=2, C=4)]
    df = spark.createDataFrame(array)
    
    # first(B) picks an arbitrary B per group; sum(C) aggregates C per group
    results = df.groupBy(col("A")).agg(first(col("B")).alias("B"),
                                       sum(col("C")).alias("C"))
    

    Let's now check the results:

    results.show()
    # +---+---+---+
    # |  A|  B|  C|
    # +---+---+---+
    # |  B|  2| 12|
    # |  A|  1| 13|
    # +---+---+---+
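
    As a side note, newer Spark releases can express the intent directly. Here is a minimal sketch assuming Spark 3.5+, where any_value was added to pyspark.sql.functions:

    from pyspark.sql import functions as F
    
    # any_value picks an arbitrary B per group, mirroring SQL's ANY_VALUE;
    # like first, it is non-deterministic after a shuffle
    results = df.groupBy("A").agg(F.any_value("B").alias("B"),
                                  F.sum("C").alias("C"))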
    

    From the comments:

    Is first here computationally equivalent to any?

    groupBy causes a shuffle, so non-deterministic behaviour is to be expected.

    This is confirmed in the documentation of first:

    Aggregate function: returns the first value in a group. The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. Note: the function is non-deterministic because its results depend on the order of the rows, which may be non-deterministic after a shuffle.

    So yes, computationally they are the same, and that's one of the reasons you need an explicit ordering if you want deterministic behaviour (see the sketch below).
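
    If you do need a deterministic pick, one option is to tie the choice of B to an explicit ordering column rather than sorting the whole DataFrame. A sketch assuming Spark 3.3+, where min_by is available in pyspark.sql.functions:

    from pyspark.sql import functions as F
    
    # min_by returns the B of the row with the smallest C in each group,
    # so the result no longer depends on shuffle order
    deterministic = df.groupBy("A").agg(F.min_by("B", "C").alias("B"),
                                        F.sum("C").alias("C"))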

    I hope this helps!
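
    P.S. Since the question was framed in SQL, here is the same aggregation through the SQL API, assuming df is registered as a temp view named T to match the question's table name:

    # FIRST is the Spark SQL counterpart of the DataFrame first() used above
    df.createOrReplaceTempView("T")
    spark.sql("SELECT A, FIRST(B) AS B, SUM(C) AS C FROM T GROUP BY A").show()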