1

I have data as below,

n1  d1  un1 mt1 1
n1  d1  un1 mt2 2
n1  d1  un1 mt3 3
n1  d1  un1 mt4 4
n1  d2  un1 mt1 3
n1  d2  un1 mt3 3
n1  d2  un1 mt4 4
n1  d2  un1 mt5 6
n1  d2  un1 mt2 3

I want to get the output as below:

n1 d1 un1 0.75
n1 d2 un1 1.5

That is, group by the 1st, 2nd and 3rd columns, and compute the 4th output column within each group as (mt1 + mt2) / mt4.

I am trying to do this with a Spark DataFrame. Assuming the data is in a DataFrame a with columns n, d, un, mt, r, this is what I have so far:

sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))
zero323
Akash
    Possible duplicate of [How can I define and use a User-Defined Aggregate Function in Spark SQL?](http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user-defined-aggregate-function-in-spark-sql) – zero323 Dec 01 '15 at 18:09

2 Answers

4

If I understand correctly, you want to sum the r values of the mt1 and mt2 rows and divide by the sum of the r values of the mt4 rows, for each distinct combination of n, d and un.

While it is possible to work with custom aggregation functions, as the other answer suggests, you can also use a little brute force (I will show it in PySpark, but you should be able to convert it to Scala easily).

Assume your original dataframe is called df and the columns are in order: n,d,un,mt,r

First create a new column for each of mt1, mt2 and mt4 as follows:

from pyspark.sql import functions as F
# Keep r only on rows of the given metric; use 0 elsewhere so the later sums ignore other rows.
newdf = df.withColumn("mt1", F.when(df.mt == "mt1", df.r).otherwise(0))
newdf = newdf.withColumn("mt2", F.when(newdf.mt == "mt2", newdf.r).otherwise(0))
newdf = newdf.withColumn("mt4", F.when(newdf.mt == "mt4", newdf.r).otherwise(0))

Now group by the first three columns and, as the aggregation, sum the three new columns.

aggregated = newdf.groupBy(["n", "d", "un"]).agg(F.sum(newdf.mt1).alias("sum_mt1"), F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))

Now just do the calculation:

final = aggregated.withColumn("res", (aggregated.sum_mt1 +  aggregated.sum_mt2) / aggregated.sum_mt4)    

Not the most elegant solution but it might work for you...
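As a sanity check of the logic (not of Spark itself), here is a minimal plain-Python sketch of the same three steps on the sample data from the question: pick out the mt1, mt2 and mt4 values, sum them per (n, d, un) group, then take the ratio. The function name ratio_by_group and the choice to return None when a group has no mt4 value are my own assumptions for illustration, not part of any Spark API.

```python
from collections import defaultdict

# Sample rows from the question, as (n, d, un, mt, r) tuples.
rows = [
    ("n1", "d1", "un1", "mt1", 1),
    ("n1", "d1", "un1", "mt2", 2),
    ("n1", "d1", "un1", "mt3", 3),
    ("n1", "d1", "un1", "mt4", 4),
    ("n1", "d2", "un1", "mt1", 3),
    ("n1", "d2", "un1", "mt3", 3),
    ("n1", "d2", "un1", "mt4", 4),
    ("n1", "d2", "un1", "mt5", 6),
    ("n1", "d2", "un1", "mt2", 3),
]


def ratio_by_group(rows):
    """Hypothetical helper: map (n, d, un) -> (sum_mt1 + sum_mt2) / sum_mt4.

    Returns None for a group whose mt4 sum is 0 (an assumption of this sketch).
    """
    # Per-group running sums for the three metrics used by the formula.
    sums = defaultdict(lambda: {"mt1": 0, "mt2": 0, "mt4": 0})
    for n, d, un, mt, r in rows:
        group = sums[(n, d, un)]  # every group gets an entry, even without mt1/mt2/mt4 rows
        if mt in group:           # other metrics (mt3, mt5, ...) contribute nothing
            group[mt] += r
    return {
        key: (s["mt1"] + s["mt2"]) / s["mt4"] if s["mt4"] else None
        for key, s in sums.items()
    }


res = ratio_by_group(rows)
for key, value in sorted(res.items()):
    print(*key, value)
# n1 d1 un1 0.75
# n1 d2 un1 1.5
```

The two printed values match the expected output in the question: (1 + 2) / 4 = 0.75 for d1 and (3 + 3) / 4 = 1.5 for d2.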

Assaf Mendelson
0

At this time (Spark 1.4) there is no support for custom aggregation functions. You can however use Hive UDAFs. You can see an example of a Hive user-defined aggregation function (UDAF) in Spark here.

karel