python, apache-spark, pyspark, graphframes

PySpark and GraphFrames: Aggregate messages power mean


Given the following graph:

[example graph: vertex A with edges to vertices B and C]

Where A has a value of 20, B has a value of 5 and C has a value of 10, I would like to use pyspark/graphframes to compute the power mean. That is,

power_mean(x_1, ..., x_n) = ((1/n) * (x_1^p + x_2^p + ... + x_n^p))^(1/p)

In this case n is the number of items (3 in our case: the three vertices reachable from A, including A itself), our p is taken to be n * 2, and the normalization factor is 1/n, or 1/3. So the resulting value for A should be:

n = 3
norm_factor = 1/n
p = n * 2
result = (norm_factor * (20^p + 5^p + 10^p))^(1/p) = 16.697421658890875
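
A quick plain-Python check of the same arithmetic (no Spark involved) reproduces that value:

# Sanity check of the hand calculation above.
values = [20, 5, 10]
n = len(values)
p = n * 2
print((1/n * sum(x**p for x in values)) ** (1/p))  # 16.697421658890875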

So the question is, how do I compute this with pyspark/graphframes? I have the following graph:

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('get-the-power').getOrCreate()
vertices = spark.createDataFrame([('1', 'A', 20), 
                                  ('2', 'B', 5),
                                  ('3', 'C', 10)],
                                  ['id', 'name', 'value'])

edges = spark.createDataFrame([('1', '2'), 
                               ('1', '3')],
                              ['src', 'dst'])

g = GraphFrame(vertices, edges)

I assume I'll need to aggregate the values from the children, and have been playing with message aggregation.

from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=AM.dst['value'])

agg.show()

This results in

+---+----------+
| id|totalValue|
+---+----------+
|  3|        10|
|  1|        15|
|  2|         5|
+---+----------+

How do I replace totalValue (the sqlsum) with the power mean? Surely there is a way to do this using Spark functions from pyspark?

--- UPDATE ---

It seems like I can sort of approximate this with a UDF.

from pyspark.sql import functions as func
from pyspark.sql.functions import collect_list, concat, array, col
from pyspark.sql.types import DoubleType

def power_mean(values):
    n = len(values)
    norm_factor = 1/n
    p = n * 2
    return (norm_factor * sum([(x)**p for x in values]))**(1/p)

udf_power_mean = func.udf(power_mean, returnType=DoubleType())

# Aggregate the values from child vertices, as I was doing before.
agg = g.aggregateMessages(
    collect_list(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)

# `concat` the value for this vertex with its children's values.
# We end up with an `array<int>` that we then pass to `udf_power_mean`.
new_vertices = agg.join(vertices, vertices.id == agg.id, "left")\
                .select(vertices.id, \
                        'name', \
                        'value', \
                        concat(array(col('value')), 'totalValue').alias("allValues"))\
                .withColumn('totalScore', udf_power_mean(col('allValues')))\
                .drop('allValues')

new_vertices.show()

This produces:

+---+----+-----+------------------+
| id|name|value|        totalScore|
+---+----+-----+------------------+
|  1|   A|   20|16.697421658890875|
+---+----+-----+------------------+

Is there any way to do this without the UDF? Just plain Spark functions?


Solution

  • For Spark 2.4+, you can use the aggregate function:

    A simple version:

    power_mean = lambda col: func.expr(f"""
        aggregate(`{col}`, 0D, (acc,x) -> acc+power(x,2*size(`{col}`)), acc -> power(acc/size(`{col}`), 0.5/size(`{col}`)))
    """)
    

    One issue with the above solution is that if any of the array elements is NULL, the resulting totalScore will be NULL. To avoid this, you can do the following:

    power_mean = lambda col: func.expr(f"""
        aggregate(
          /* expr: array column to iterate through */
          `{col}`,
          /* start: set zero value and the accumulator as an struct<psum:double,n:int> */
          (0D as psum, size(filter(`{col}`, x -> x is not null)) as n),
          /* merge: calculate `sum([(x)**p for x in values])` */
          (acc,x) -> (acc.psum+power(coalesce(x,0),2*acc.n) as psum, acc.n as n),
          /* finish: post processing */
          acc -> power(acc.psum/acc.n, 0.5/acc.n)
        ) 
    """)
    

    Depending on how you want to define n, the above skips the NULL values when counting n; if you want to count them as well, just change the 2nd argument from:

    (0D as psum, size(filter(`{col}`, x -> x is not null)) as n),
    

    to

    (0D as psum, size(`{col}`) as n), 
    

    Example:

    df = spark.createDataFrame([([20,5,None,10],)],['value'])
    df.select("value", power_mean("value").alias('totalScore')).show(truncate=False)
    +------------+------------------+
    |value       |totalScore        |
    +------------+------------------+
    |[20, 5,, 10]|16.697421658984894|
    +------------+------------------+
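
    For comparison, a sketch of the NULL-counting variant; it assumes power_mean has been redefined with the size(`{col}`) start value shown above:

    # Assumes the start value (0D as psum, size(`value`) as n), so the NULL element
    # is counted: n = 4 and p = 8 for this row, and totalScore differs from the result above.
    df = spark.createDataFrame([([20, 5, None, 10],)], ['value'])
    df.select("value", power_mean("value").alias('totalScore')).show(truncate=False)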
    

    BTW, if you want to concat() totalScore with other columns even when it is NULL, just use the coalesce() function, or concat_ws() if possible.
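
    To tie this back to the GraphFrame pipeline from the question, the same expression can replace the UDF directly. A sketch, assuming the NULL-safe power_mean above plus the agg and vertices DataFrames from the question's update:

    # Same join/concat as in the question, but totalScore is computed with the
    # SQL aggregate() expression instead of udf_power_mean.
    new_vertices = agg.join(vertices, vertices.id == agg.id, "left")\
                    .select(vertices.id,
                            'name',
                            'value',
                            concat(array(col('value')), 'totalValue').alias("allValues"))\
                    .withColumn('totalScore', power_mean('allValues'))\
                    .drop('allValues')
    # totalScore for vertex A (id 1) should again come out around 16.6974.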