
Given the following graph:

[image: example graph — vertex A with edges to B and C]

Where A has a value of 20, B has a value of 5 and C has a value of 10, I would like to use pyspark/graphframes to compute the power mean. That is,

power_mean = (1/n * (x_1^p + x_2^p + ... + x_n^p))^(1/p)

In this case n is the number of items (3 in our case: the three vertices reachable at A, including A itself), p is taken to be n * 2, and the normalization factor is 1/n, or 1/3. So the resulting value for A should be:

n = 3
norm_factor = 1/n
p = n * 2
result = (norm_factor * (20^p + 5^p + 10^p))^(1/p) = 16.697421658890875
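For reference, this arithmetic can be checked in plain Python (variable names are mine):

```python
# Verify the power-mean arithmetic from the example above.
values = [20, 5, 10]              # values of A, B, C
n = len(values)                   # 3 vertices, including A
p = n * 2                         # p = n * 2 = 6
norm_factor = 1 / n               # 1/3
result = (norm_factor * sum(x ** p for x in values)) ** (1 / p)
print(result)  # ≈ 16.697421658890875, matching the expected result above
```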

So the question is, how do I compute this with pyspark/graphframes? I have the following graph:

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('get-the-power').getOrCreate()
vertices = spark.createDataFrame([('1', 'A', 20), 
                                  ('2', 'B', 5),
                                  ('3', 'C', 10)],
                                  ['id', 'name', 'value'])

edges = spark.createDataFrame([('1', '2'), 
                               ('1', '3')],
                              ['src', 'dst'])

g = GraphFrame(vertices, edges)

I assume I'll need to aggregate the values from the children, and have been playing with message aggregation.

from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=AM.dst['value'])

agg.show()

This results in

+---+----------+
| id|totalValue|
+---+----------+
|  3|        10|
|  1|        15|
|  2|         5|
+---+----------+

How do I replace totalValue (the sqlsum) with the power mean? Surely there is a way to do this using Spark functions from pyspark?

--- UPDATE ---

It seems like I can sort of approximate this with a UDF.

from pyspark.sql import functions as func
from pyspark.sql.functions import array, col, collect_list, concat
from pyspark.sql.types import DoubleType

def power_mean(values):
    n = len(values)
    norm_factor = 1/n
    p = n * 2
    return (norm_factor * sum([(x)**p for x in values]))**(1/p)

udf_power_mean = func.udf(power_mean, returnType=DoubleType())

# Aggregate the values from child vertices, as I was doing before.
agg = g.aggregateMessages(
    collect_list(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)

# `concat` the value for this vertex with its children values. 
# We end up with an `array<int>` that we then pass to `udf_power_mean`.
new_vertices = agg.join(vertices, vertices.id == agg.id, "left")\
                .select(vertices.id, \
                        'name', \
                        'value', \
                        concat(array(col('value')), 'totalValue').alias("allValues"))\
                .withColumn('totalScore', udf_power_mean(col('allValues')))\
                .drop('allValues')

new_vertices.show()

This produces:

+---+----+-----+------------------+
| id|name|value|        totalScore|
+---+----+-----+------------------+
|  1|   A|   20|16.697421658890875|
+---+----+-----+------------------+

Is there any way to do this without the UDF? Just plain Spark functions?

Julio
    if you have spark 2.4+, try [aggregate](https://spark.apache.org/docs/latest/api/sql/index.html#aggregate) function: `power_mean = lambda col: func.expr(f"aggregate({col}, 0D, (acc,x) -> acc+power(x,2*size({col})), acc -> power(acc/size({col}), 0.5/size({col})))")` – jxc Dec 31 '20 at 05:25
  • @jxc awesome, that seems to work and is much quicker than my UDF. Why is this? Want to make this an answer and I'll accept it? One more follow up, with my `concat` if `totalValue` is `null` the entire concat is `null`. Anyway to make it always be an array with `col('value')` and if `totalValue` exists also include it? – Julio Dec 31 '20 at 06:39
  • did you mean if any of the array elements of array `value` is null, then the aggregated totalScore is also null? if, so how do you want to calculate the `n`, include or exclude the nulls? – jxc Dec 31 '20 at 15:29
  • _udf_ vs _builtin function_ is more like a faq, see [here](https://stackoverflow.com/a/38297050/9510729), the rule of thumb is to avoid udf whenever possible. try a solution from `builtin` -> `scala/java-based udf` -> `pandas_udf` -> `python-based udf` and in that order. – jxc Dec 31 '20 at 16:39

1 Answer


For Spark 2.4+, you can use aggregate function:

A simple version:

power_mean = lambda col: func.expr(f"""
    aggregate(`{col}`, 0D, (acc,x) -> acc+power(x,2*size(`{col}`)), acc -> power(acc/size(`{col}`), 0.5/size(`{col}`)))
""")

One issue with the above solution is that if any of the array elements is NULL, the resulting totalScore will be NULL. To avoid this, you can do the following:

power_mean = lambda col: func.expr(f"""
    aggregate(
      /* expr: array column to iterate through */
      `{col}`,
      /* start: set the zero value and the accumulator as a struct<psum:double,n:int> */
      (0D as psum, size(filter(`{col}`, x -> x is not null)) as n),
      /* merge: calculate `sum([(x)**p for x in values])` */
      (acc,x) -> (acc.psum+power(coalesce(x,0),2*acc.n) as psum, acc.n as n),
      /* finish: post processing */
      acc -> power(acc.psum/acc.n, 0.5/acc.n)
    ) 
""")

Depending on how you want to define n: the above excludes null values from n. If you want to count them, just change the 2nd argument from:

(0D as psum, size(filter(`{col}`, x -> x is not null)) as n),

to

(0D as psum, size(`{col}`) as n), 
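The effect of the two choices of n can be seen with a quick plain-Python comparison (my own helper, not part of the answer's code):

```python
def power_mean_n(values, count_nulls):
    # n either counts every element (including nulls) or only the non-null
    # ones, mirroring the two `start` variants above
    n = len(values) if count_nulls else sum(1 for x in values if x is not None)
    p = 2 * n
    psum = sum((0 if x is None else x) ** p for x in values)
    return (psum / n) ** (1 / p)

vals = [20, 5, None, 10]
print(power_mean_n(vals, count_nulls=False))  # n = 3, p = 6
print(power_mean_n(vals, count_nulls=True))   # n = 4, p = 8: larger p and divisor
```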

Example:

df = spark.createDataFrame([([20,5,None,10],)],['value'])
df.select("value", power_mean("value").alias('totalScore')).show(truncate=False)
+------------+------------------+
|value       |totalScore        |
+------------+------------------+
|[20, 5,, 10]|16.697421658984894|
+------------+------------------+

BTW, if you want to concat() totalScore with other columns even when there are NULL values, just use the coalesce() function, or concat_ws() if possible.

jxc
  • This works great! One question: why does the finish lambda `acc -> power(acc/size(col), 0.5/size(col))` use `0.5/size` instead of `1.0/size`? The `0.5` gives me an answer closer to what I expect, but the `1.0` seems to match the power mean. – Julio Dec 31 '20 at 18:19
  • hi, @Julio, it's from your formula: `p = n * 2` --> `1/p = 0.5 /n` – jxc Dec 31 '20 at 18:22