
Based on this answer, I need to do a row-wise calculation:

result= (reduce(add, (<some row wise calculation on col(x)> for x in df.columns[1:])) / n).alias("result")

but before that I need to sort the row values in descending order (i.e. change the column order within each row). Suppose I have the following rows:

 3,7,21,9
 5,15,10,2

I need to know the rank (order) of each value within each row, and then calculate sum(value/rank). For the first row:

21->4, 9->3, 7->2, 3->1, sum(21/4, 9/3, 7/2, 3/1)

For the second row:

15->4, 10->3, 5->2, 2->1, sum(15/4, 10/3, 5/2, 2/1)
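In plain Python, the rank-then-divide step described above can be sketched as follows (`rank_and_sum` is a hypothetical helper name; it assumes the row values are distinct, since equal values would collapse to one rank):

```python
def rank_and_sum(row):
    # Rank each value by its 1-based position in the ascending sort
    # (smallest value gets rank 1), then sum value/rank over the row.
    ranks = {v: i + 1 for i, v in enumerate(sorted(row))}
    return sum(v / ranks[v] for v in row)

rank_and_sum([3, 7, 21, 9])   # 3/1 + 7/2 + 9/3 + 21/4 = 14.75
```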

This is not a duplicate, as I need the sorting done row-wise, not column-wise.

YAKOVM
1 Answer


Assuming your input dataframe is as below

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3   |7   |21  |9   |
|5   |15  |10  |2   |
+----+----+----+----+

Then you can write a udf function to get your desired output column as

from pyspark.sql import functions as f
from pyspark.sql import types as t

def sortAndIndex(values):
    # Pair each value with its 1-based rank in the ascending sort,
    # then order the pairs by value, largest first
    return sorted([(value, index + 1) for index, value in enumerate(sorted(values))], reverse=True)

sortAndIndexUdf = f.udf(sortAndIndex, t.ArrayType(t.StructType([
    t.StructField('key', t.IntegerType(), True),
    t.StructField('value', t.IntegerType(), True)
])))

df.withColumn('sortedAndIndexed', sortAndIndexUdf(f.array(*df.columns))).show(truncate=False)

which should give you

+----+----+----+----+----------------------------------+
|col1|col2|col3|col4|sortedAndIndexed                  |
+----+----+----+----+----------------------------------+
|3   |7   |21  |9   |[[21, 4], [9, 3], [7, 2], [3, 1]] |
|5   |15  |10  |2   |[[15, 4], [10, 3], [5, 2], [2, 1]]|
+----+----+----+----+----------------------------------+
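The udf's sorting logic can be checked in plain Python without Spark; the pairs come out largest value first, each carrying its rank from the ascending sort:

```python
def sort_and_index(values):
    # Pair each value with its 1-based rank in the ascending sort,
    # then order the pairs by value, descending
    return sorted(((v, i + 1) for i, v in enumerate(sorted(values))), reverse=True)

sort_and_index([3, 7, 21, 9])   # [(21, 4), (9, 3), (7, 2), (3, 1)]
```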

Update

You commented as

my calculation should be sum(value/index) so probably using your udf function I should return some kind of reduce(add,)?

for that you can do

from pyspark.sql import functions as f
from pyspark.sql import types as t

def divideAndSum(values):
    # Divide each value by its 1-based rank in the ascending sort, then sum
    return sum([float(value) / (index + 1) for index, value in enumerate(sorted(values))])

divideAndSumUdf = f.udf(divideAndSum, t.DoubleType())

df.withColumn('divideAndSum', divideAndSumUdf(f.array(*df.columns))).show(truncate=False)

which should give you

+----+----+----+----+------------------+
|col1|col2|col3|col4|divideAndSum      |
+----+----+----+----+------------------+
|3   |7   |21  |9   |14.75             |
|5   |15  |10  |2   |11.583333333333334|
+----+----+----+----+------------------+
Ramesh Maharjan