
I have a DataFrame DF with the following structure: DF(tag, value), and a score associated with the whole DataFrame (a Double value).

I have a function that takes a DataFrame and a score as parameters and returns a new value. It can be represented like this:

def computeNewValue(DF:DataFrame, score:Double):Double={
  // Computation that depends on 'value' and all the data in 'DF'
}

Everything works fine when I want to get my 'newValue' for the whole DataFrame.

But now I want to apply this function not to a whole DataFrame but to groups within a DataFrame: I want a 'newValue' for every tag. Say, for example, that my 'score' is just the max of the value column. I would like something I can use like this:

DF.groupBy($"tag")
  .agg(max($"value").as("score"))
  .withColumn("newValue", computeNewValue([MyTagGroup], $"score")) // To change

or

DF.groupBy($"tag")
      .agg(
         max($"value").as("score"), 
         computeNewValue([MyTagGroup],$"score").as("newValue") // To change
       )

Is there a good way to achieve something like this?

EDIT:

I tried to solve my problem with a UDAF, but it does not seem to be the right way to do it. Here are more details on what I am trying to do.

Complete code for my function computeNewValue:

def computeNewValue(DF: DataFrame, score: Double): Double = {
  // Rank every row by 'value', keep the 5th to 95th percentile band,
  // then scale 'score' by the range of the remaining values.
  DF.select($"*", percent_rank().over(Window.orderBy("value")).alias("percentile"))
    .filter($"percentile" >= 0.05 && $"percentile" <= 0.95)
    .agg(min("value").as("min"), max("value").as("max"))
    .withColumn("newValue", (lit(score) / ($"max" - $"min")) * 100)
    .select("newValue")
    .collect()(0).getDouble(0)
}

And here is my UDAF:

class computeNewValue_udaf extends UserDefinedAggregateFunction {

  override def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", DoubleType) :: StructField("score", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("min", DoubleType) ::
    StructField("max", DoubleType) :: Nil
  )

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // Start from the extremes so that the first value always replaces them
    buffer(0) = Double.MaxValue // running min
    buffer(1) = Double.MinValue // running max
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // input(0) is 'value'; both buffer slots are DoubleType
    val value = input.getDouble(0)

    if (value < buffer.getDouble(0)) {
      buffer(0) = value
    }

    if (value > buffer.getDouble(1)) {
      buffer(1) = value
    }
    // Can't specify the 'percentile' part
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // ?
  }

  override def evaluate(buffer: Row): Any = {
    score * 100 / (buffer.getDouble(1) - buffer.getDouble(0))
    // Does not work because the 'score' value is not accessible here
  }
}
Nakeuh
  • You should either implement a UDAF, or use collect_list in your agg statement and then process the result with a UDF – Raphael Roth May 30 '18 at 13:22
  • @RaphaelRoth Should we really recommend `collect_list`? It is like saying "use groupByKey" with RDD API. – Alper t. Turker May 30 '18 at 13:23
  • @user8371915 my own tests have shown that collect_list+UDF is faster than UDAF, at least for my usecases (see e.g. https://stackoverflow.com/questions/49294294/spark-custom-aggregation-collect-listudf-vs-udaf) – Raphael Roth May 30 '18 at 13:31
  • @RaphaelRoth That's because you use `Encoder[Map[_, _]]`. It would be a better fit for `Aggregator`. Or maybe `reduceGroups`. Similar problem to [this one](https://stackoverflow.com/q/47293454/8371915) – Alper t. Turker May 30 '18 at 13:35
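
The `collect_list` + UDF route suggested in the comments could be sketched roughly as below. This is a minimal sketch in plain Scala rather than a tested Spark job: the object name `TrimmedRangeScore` and both function names are hypothetical, and the rank logic assumes Spark's `percent_rank` definition, (rank - 1) / (n - 1), with ties sharing the lowest rank and 0.0 for a single row. It returns an `Option` so that an empty band or a zero range yields no value instead of a division by zero.

```scala
object TrimmedRangeScore {

  /** Pairs each value with its percent rank: (rank - 1) / (n - 1).
    * Ties share the lowest rank; a single value gets 0.0.
    */
  def percentRanks(values: Seq[Double]): Vector[(Double, Double)] = {
    val sorted = values.toVector.sorted
    val n = sorted.size
    var rank = 0
    sorted.zipWithIndex.map { case (v, i) =>
      // A new rank starts whenever the value differs from the previous one
      if (i == 0 || v != sorted(i - 1)) rank = i + 1
      val pr = if (n > 1) (rank - 1).toDouble / (n - 1) else 0.0
      (v, pr)
    }
  }

  /** Hypothetical per-group counterpart of computeNewValue:
    * keeps values whose percent rank lies in [0.05, 0.95], then
    * returns score / (max - min) * 100 over the kept values.
    */
  def newValue(values: Seq[Double], score: Double): Option[Double] = {
    val kept = percentRanks(values).collect {
      case (v, p) if p >= 0.05 && p <= 0.95 => v
    }
    if (kept.isEmpty) None
    else {
      val range = kept.max - kept.min
      // No result when all kept values are equal (range of zero)
      if (range == 0.0) None else Some(score / range * 100)
    }
  }
}
```

On the Spark side, one possible wiring (an assumption, not verified against this dataset) is to add `collect_list($"value").as("values")` next to `max($"value").as("score")` in the `agg`, wrap the function with `udf(TrimmedRangeScore.newValue _)`, and call it on `$"values"` and `$"score"` in `withColumn`. As the comments point out, `collect_list` pulls every value of a tag into a single row, so this only suits groups that fit comfortably in executor memory.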
