I have a DataFrame DF with the following structure: DF(tag, value), and a score associated with the whole DataFrame (a Double value).
I have a function that takes a DataFrame and a score as parameters and returns a new value. We can represent this function like this:
def computeNewValue(DF: DataFrame, score: Double): Double = {
  // Computation that depends on 'score' and on the 'value' column of 'DF'
}
Everything works fine when I want to get my 'newValue' for the whole DataFrame.
But now I want to apply this function not to the whole DataFrame but to groups within it: I want to get a 'newValue' for every tag. Let's say, for example, that my 'score' is just the max of the 'value' column. I would like something I can use like this:
DF.groupBy($"tag")
  .agg(max($"value").as("score"))
  .withColumn("newValue", computeNewValue([MyTagGroup], $"score")) // To change
or
DF.groupBy($"tag")
  .agg(
    max($"value").as("score"),
    computeNewValue([MyTagGroup], $"score").as("newValue") // To change
  )
Is there a good way to achieve something like this?
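To make the intent concrete, here is the per-group shape I am after, written with plain Scala collections (the computeNewValue body here is just an illustrative placeholder, not my real function):

```scala
// Placeholder for the real computeNewValue: score divided by the
// value range, times 100 (illustration only)
def computeNewValue(values: Seq[Double], score: Double): Double =
  score / (values.max - values.min) * 100

val rows = Seq(("a", 1.0), ("a", 3.0), ("b", 2.0), ("b", 6.0))

// For every tag: score = max of 'value', then newValue = f(group, score)
val perTag: Map[String, Double] = rows.groupBy(_._1).map { case (tag, group) =>
  val values = group.map(_._2)
  val score  = values.max
  tag -> computeNewValue(values, score)
}
```

This is exactly what I would like Spark to do for each group of 'tag'.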
EDIT:
I tried to solve my problem using a UDAF, but it does not seem to be the right way to do it. Here are more details on what I am trying to do.
Complete code for my computeNewValue function:
def computeNewValue(DF: DataFrame, score: Double): Double = {
  DF.select($"*", (percent_rank() over Window.orderBy("value")).alias("percentile"))
    .filter($"percentile" >= 0.05 && $"percentile" <= 0.95)
    .agg(min("value").as("min"), max("value").as("max"))
    .withColumn("newValue", (lit(score) / ($"max" - $"min")) * 100)
    .select("newValue")
    .collect()(0).getDouble(0)
}
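For what it's worth, the same trim-and-scale logic can be written as a plain Scala function over a single group's values (a sketch that assumes no duplicate values, so the percent_rank of the i-th sorted value is i / (n - 1)). One possible route would then be to wrap it in a udf and feed it collect_list($"value") after the groupBy:

```scala
// Plain-Scala version of computeNewValue for one group's values.
// Assumption: no duplicate values, so percent_rank(i) = i / (n - 1)
// over the sorted sequence (ties are not handled here).
def computeNewValueLocal(values: Seq[Double], score: Double): Double = {
  val sorted = values.sorted
  val n = sorted.size
  // Keep only values whose rank falls in the [0.05, 0.95] percentile band
  val trimmed =
    if (n < 2) sorted
    else sorted.zipWithIndex.collect {
      case (v, i) if i.toDouble / (n - 1) >= 0.05 && i.toDouble / (n - 1) <= 0.95 => v
    }
  score / (trimmed.max - trimmed.min) * 100
}
```

The drawback of the collect_list route is that each group's values must fit in memory on one executor, which may or may not be acceptable for my data.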
And here is my UDAF :
class computeNewValue_udaf extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: StructField("score", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("min", DoubleType) ::
    StructField("max", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Double.MaxValue // running min
    buffer(1) = Double.MinValue // running max
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getDouble(0) > input.getDouble(0)) {
      buffer(0) = input.getDouble(0)
    }
    if (buffer.getDouble(1) < input.getDouble(0)) {
      buffer(1) = input.getDouble(0)
    }
    // Can't specify the 'percentile' part
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // ?
  }

  override def evaluate(buffer: Row): Any = {
    score * 100 / (buffer.getDouble(1) - buffer.getDouble(0))
    // Does not work because I can't access the 'score' value
  }
}