1

I am working on a custom reduceByKey over a DataFrame using a UDAF, based on the links below; the objective is to get (accumulator, count) by key.

The data is a key/value DataFrame:

+----+------+
|key |value |
+----+------+
| 1 | 500.0|
| 2 | 250.0|
| 3 | 350.0|
| 1 | 250.0|
| 2 | 150.0|
+----+------+

Taking some code from here: Spark Dataset aggregation similar to RDD aggregate(zero)(accum, combiner) https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

The code below is the implementation, using two maps:

MapType(IntegerType->(DoubleType)) for the accumulator

MapType(IntegerType->(LongType)) for the counter

Now I want to store two values using only one map or any structure that can store two numbers:

1) a MapType(IntegerType->Tuple2(DoubleType,LongType)), but Tuple2 is not a SQL type

2) a map whose values are a case class acuCount(acu:Double,count:Long), but acuCount is not a SQL type

3) an ArrayType(DoubleType)

4) or any structure that can store two numbers

Then I want to return a map or, if possible, another DataFrame:

+----+-------+-------+
|key | accu | count |
+----+-------+-------+
| 1 | 750.0 | 2 |
| 2 | 400.0 | 2 |
| 3 | 350.0 | 1 |
+----+-------+-------+

The following code uses two maps, but it is incomplete because it only returns one of them:

/**
 * Spark UDAF that folds (key, value) input rows into per-key running totals.
 *
 * The aggregation buffer holds two maps:
 *   - slot 0: key -> sum of the values seen for that key
 *   - slot 1: key -> number of rows seen for that key
 *
 * Only the sum map (slot 0) is returned by `evaluate`, matching the declared `dataType`.
 * The count map is maintained through `update`/`merge` but is not part of the result.
 *
 * Note: `UserDefinedAggregateFunction` is deprecated since Spark 3.0 in favour of
 * `org.apache.spark.sql.expressions.Aggregator`.
 */
class GroupByAccCount extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, MapType, StructField, StructType}

  /** Input columns: an integer key ("k") and a double value ("v"). */
  def inputSchema: StructType =
    new StructType()
      .add("k", IntegerType)
      .add("v", DoubleType)

  /**
   * Intermediate buffer: map(key -> sum) in slot 0 and map(key -> count) in slot 1.
   * The two fields carry distinct names so that they can be told apart by name.
   */
  def bufferSchema: StructType = StructType(
    StructField("values", MapType(IntegerType, DoubleType)) ::
      StructField("counts", MapType(IntegerType, LongType)) :: Nil)

  /** The same input always produces the same output. */
  def deterministic: Boolean = true

  /** Starts every aggregation with an empty sum map and an empty count map. */
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map.empty[Int, Double]
    buffer(1) = Map.empty[Int, Long]
  }

  /**
   * Sequence op: folds one input row into the buffer.
   *
   * Rows with a null key or a null value are skipped. Without the guard,
   * `getAs[Int]` / `getAs[Double]` on a null cell yields 0 / 0.0, which would be
   * silently accumulated under key 0.
   */
  def update(buffer: MutableAggregationBuffer, row: Row): Unit = {
    if (!row.isNullAt(0) && !row.isNullAt(1)) {
      val key = row.getAs[Int](0)
      val value = row.getAs[Double](1)

      // Slot 0: key -> accumulated sum
      val sums = buffer.getAs[Map[Int, Double]](0)
      buffer(0) = sums + (key -> (sums.getOrElse(key, 0.0) + value))

      // Slot 1: key -> row counter
      val counts = buffer.getAs[Map[Int, Long]](1)
      buffer(1) = counts + (key -> (counts.getOrElse(key, 0L) + 1L))
    }
  }

  /**
   * Combine op: merges the partial aggregate `buffer2` into `buffer1`,
   * adding sums and counts key by key.
   */
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // Slot 0: key -> accumulated sum
    val sums1 = buffer1.getAs[Map[Int, Double]](0)
    val sums2 = buffer2.getAs[Map[Int, Double]](0)
    buffer1(0) = sums2.foldLeft(sums1) { case (acc, (k, v)) =>
      acc + (k -> (acc.getOrElse(k, 0.0) + v))
    }

    // Slot 1: key -> row counter
    val counts1 = buffer1.getAs[Map[Int, Long]](1)
    val counts2 = buffer2.getAs[Map[Int, Long]](1)
    buffer1(1) = counts2.foldLeft(counts1) { case (acc, (k, v)) =>
      acc + (k -> (acc.getOrElse(k, 0L) + v))
    }
  }

  /** Returned data type: map(key -> sum). */
  def dataType: DataType = MapType(IntegerType, DoubleType)

  /**
   * Returns the key -> sum map from slot 0.
   *
   * The key -> count map in slot 1 is not returned. Exposing it (for example as
   * key -> struct(sum, count)) would require changing `dataType` as well.
   */
  def evaluate(buffer: Row): Any =
    buffer.getAs[Map[Int, Double]](0)
}
user2232395
  • 461
  • 5
  • 15

0 Answers0