
I have the following pyspark df:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+

For each row, I want to get the rows whose Assets are within 20% of that row's Assets amount. For example, for the first row (ID=201542399349300619), I want all the rows where Assets are within +/-20% of 1,633,944 (so between 1,307,155 and 1,960,732):

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|
+------------------+--------+-------+

Using this subset, I want to compute the average assets and add it as a new column. So for the above example, it would be (1633944 + 1401406) / 2 = 1517675:

+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|
+------------------+--------+-------+---------+

himi64
  • @pault My mistake, I have corrected the typos. I am not very familiar with SQL, hence I want to just get some guidance on how I can achieve this with what I have in place. – himi64 Mar 07 '19 at 18:37

1 Answer


Assuming your DataFrame has a schema similar to the following (i.e. Assets and Revenue are numeric):

df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)

You can join the DataFrame to itself on the condition that you've set forth. After the join, you can group and aggregate by taking the average of the Assets column.

For example:

from pyspark.sql.functions import avg, expr

df.alias("l")\
    .join(
        df.alias("r"), 
        on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
    )\
    .groupBy("l.ID", "l.Assets", "l.Revenue")\
    .agg(avg("r.Assets").alias("AvgAssets"))\
    .show()
#+------------------+--------+-------+------------------+
#|                ID|  Assets|Revenue|         AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+------------------+--------+-------+------------------+

Since we are joining the DataFrame to itself, we can use aliases to refer to the left table ("l") and the right table ("r"). The logic above says: join l to r on the condition that the assets in r are within +/-20% of the assets in l. Every row matches at least itself, so no row is dropped by the join.

There are multiple ways to express the +/-20% condition, but I am using the Spark SQL between expression to find rows whose assets lie between l.Assets * 0.8 and l.Assets * 1.2 (both bounds inclusive).

Then we group by all of the columns of the left table (groupBy) and average over the assets of the matching rows in the right table.
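To sanity-check the output above without Spark, the same join-then-average logic can be sketched in plain Python on the sample data from the question. This is only an illustration of the logic, not how Spark executes it; the function name avg_assets_within and its pct parameter are hypothetical.

```python
# Assets keyed by ID, copied from the question's sample DataFrame.
assets_by_id = {
    201542399349300619: 1633944,
    201542399349300629: 3979760,
    201542399349300634: 3402687,
    201542399349300724: 1138291,
    201522369349300122: 1401406,
    201522369349300137: 16948,
    201522369349300142: 13474056,
    201522369349300202: 481045,
    201522369349300207: 700861,
    201522369349300227: 178479,
}


def avg_assets_within(assets, pct=0.2):
    """For each ID, average the assets of all rows within +/-pct of its own."""
    result = {}
    for left_id, left_assets in assets.items():
        low, high = left_assets * (1 - pct), left_assets * (1 + pct)
        # Plays the role of the self-join: keep every "right" row in range.
        matches = [a for a in assets.values() if low <= a <= high]
        # Plays the role of groupBy + avg; matches always contains the row itself.
        result[left_id] = sum(matches) / len(matches)
    return result


avg_by_id = avg_assets_within(assets_by_id)
for row_id, value in avg_by_id.items():
    print(row_id, value)
```

Note that the comparison is not symmetric: 1401406 falls within 20% of 1633944, and 1633944 also falls within 20% of 1401406, but 1138291 only appears in the range of 1401406 and not the other way round, which is why the averages for those rows differ.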

The resulting AvgAssets column is a DoubleType column, but you can easily convert it to IntegerType by adding a .cast("int") before the .alias("AvgAssets") if that's what you prefer. Note that casting to int truncates the fractional part rather than rounding.



pault