I am working on a custom reduceByKey over a DataFrame using a UDAF, based on the links below. The objective is to get (accumulator, count) per key.
The data is a key/value DataFrame:
+----+------+
|key |value |
+----+------+
| 1 | 500.0|
| 2 | 250.0|
| 3 | 350.0|
| 1 | 250.0|
| 2 | 150.0|
+----+------+
I took some code from these sources: Spark Dataset aggregation similar to RDD aggregate(zero)(accum, combiner) https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
The code below is the implementation. It uses two maps:
MapType(IntegerType->(DoubleType)) for the accumulator
MapType(IntegerType->(LongType)) for the counter
Now I want to store both values using only one map, or any structure that can hold two numbers:
1) a MapType(IntegerType->Tuple2(DoubleType,LongType)), but Tuple2 is not a SQL type
2) a map with a case class acuCount(acu:Double,count:Long), but acuCount is not a SQL type
3) an ArrayType(DoubleType)
4) or any other structure that can store two numbers
Then I want to return a map or, if possible, another DataFrame:
+----+-------+-------+
|key | accu | count |
+----+-------+-------+
| 1 | 750.0 | 2 |
| 2 | 400.0 | 2 |
| 3 | 350.0 | 1 |
+----+-------+-------+
The following code uses two maps, but it is incomplete because it only returns one of them:
/**
 * Aggregates (key, value) input rows into a per-key running sum and row count,
 * similar to an RDD `aggregateByKey` that produces (sum, count) pairs.
 *
 * Input columns: `k` (int key) and `v` (double value). Rows where either
 * column is null are skipped rather than being folded in as key/value 0.
 *
 * Output depends on `includeCount`:
 *  - `false` (the no-arg constructor, i.e. the original behaviour):
 *    `map<int, double>` of key -> sum.
 *  - `true`: `map<int, struct<accu: double, count: bigint>>` of
 *    key -> (sum, number of aggregated rows).
 *
 * NOTE(review): if a DataFrame of (key, accu, count) is all that is needed,
 * `df.groupBy("key").agg(sum("value"), count("value"))` yields the same numbers
 * without a UDAF. Also, `UserDefinedAggregateFunction` is deprecated since
 * Spark 3.0 in favour of `Aggregator` + `functions.udaf`.
 *
 * @param includeCount whether to return (sum, count) per key instead of only the sum
 */
class GroupByAccCount(includeCount: Boolean)
    extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, MapType, StructField, StructType}

  /** Keeps `new GroupByAccCount` (and reflective no-arg instantiation) returning only the sums. */
  def this() = this(false)

  // Shape of each per-key value in the result when includeCount is true.
  private val accuCountType: StructType =
    StructType(StructField("accu", DoubleType) :: StructField("count", LongType) :: Nil)

  /** Input: `k` (int key) and `v` (double value). */
  def inputSchema: StructType =
    new StructType().add("k", IntegerType).add("v", DoubleType)

  /**
   * Two parallel maps: slot 0 holds key -> running sum, slot 1 holds key -> row count.
   * The field names must be distinct; previously both were declared as "values".
   */
  def bufferSchema: StructType = StructType(
    StructField("accumulators", MapType(IntegerType, DoubleType)) ::
      StructField("counters", MapType(IntegerType, LongType)) :: Nil)

  def deterministic: Boolean = true

  /** Starts both maps empty. */
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map.empty[Int, Double]
    buffer(1) = Map.empty[Int, Long]
  }

  /** Seq-op: adds one input row to the sum and count of its key. */
  def update(buffer: MutableAggregationBuffer, row: Row): Unit =
    // getAs[Int]/getAs[Double] would unbox a null to 0, so null rows are skipped.
    if (!row.isNullAt(0) && !row.isNullAt(1)) {
      val key = row.getInt(0)
      val value = row.getDouble(1)
      val sums = buffer.getAs[Map[Int, Double]](0)
      val counts = buffer.getAs[Map[Int, Long]](1)
      buffer(0) = sums.updated(key, sums.getOrElse(key, 0.0) + value)
      buffer(1) = counts.updated(key, counts.getOrElse(key, 0L) + 1L)
    }

  /** Comb-op: folds the partial maps of `buffer2` into `buffer1`, key by key. */
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = mergeMaps(buffer1.getAs[Map[Int, Double]](0), buffer2.getAs[Map[Int, Double]](0))(_ + _)
    buffer1(1) = mergeMaps(buffer1.getAs[Map[Int, Long]](1), buffer2.getAs[Map[Int, Long]](1))(_ + _)
  }

  /** Result type; see the class doc for the two possible shapes. */
  def dataType: DataType =
    if (includeCount) MapType(IntegerType, accuCountType)
    else MapType(IntegerType, DoubleType)

  /**
   * Produces the final map. With includeCount, each value is a Row(accu, count),
   * the external representation Spark uses for a StructType value.
   */
  def evaluate(buffer: Row): Any = {
    val sums = buffer.getAs[Map[Int, Double]](0)
    if (includeCount) {
      val counts = buffer.getAs[Map[Int, Long]](1)
      // update/merge always write both maps for the same key, so the default is only defensive.
      sums.map { case (key, accu) => key -> Row(accu, counts.getOrElse(key, 0L)) }
    } else {
      sums
    }
  }

  /** Adds every entry of `right` into `left`, combining values for keys present in both. */
  private def mergeMaps[V](left: Map[Int, V], right: Map[Int, V])(combine: (V, V) => V): Map[Int, V] =
    right.foldLeft(left) { case (acc, (key, value)) =>
      acc.updated(key, acc.get(key).fold(value)(combine(_, value)))
    }
}