2

I use Spark 2.0.1 and Scala 2.11.

This is a question related to user-defined aggregate function (UDAF) in Spark. I'm using the example answer provided here to ask my question:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

// Minimal UDAF skeleton showing how to return multiple values from one
// aggregation: the result type is a struct holding two arrays.
object DummyUDAF extends UserDefinedAggregateFunction {
  // Single string input column.
  def inputSchema = new StructType().add("x", StringType)
  // Intermediate aggregation state: two array-typed fields.
  def bufferSchema = new StructType()
    .add("buff", ArrayType(LongType))
    .add("buff2", ArrayType(DoubleType))
  // Output: a struct of two arrays (must line up with what evaluate returns).
  def dataType = new StructType()
    .add("xs", ArrayType(LongType))
    .add("ys", ArrayType(DoubleType))
  def deterministic = true 
  // Lifecycle methods are deliberately no-ops in this example; only
  // evaluate produces a (hard-coded) result tuple.
  def initialize(buffer: MutableAggregationBuffer) = {}
  def update(buffer: MutableAggregationBuffer, input: Row) = {}
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {}
  def evaluate(buffer: Row) = (Array(1L, 2L, 3L), Array(1.0, 2.0, 3.0))
}

I'm able to return multiple Maps instead of an Array easily, but not able to mutate the map in the update method.

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

import scala.collection.mutable.Map

// BROKEN version (this is the code the question is about): it tries to keep
// scala.collection.mutable.Map values inside the aggregation buffer and
// mutate them in place. It compiles, but fails at runtime — see below.
object DummyUDAF extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("x", DoubleType).add("y", IntegerType)
  def bufferSchema = new StructType()
    .add("buff", MapType(DoubleType, IntegerType))
    .add("buff2", MapType(DoubleType, IntegerType))

  def dataType = new StructType()
    .add("xs", MapType(DoubleType, IntegerType))
    .add("ys", MapType(DoubleType, IntegerType))

  def deterministic = true 

  def initialize(buffer: MutableAggregationBuffer) = {
    // Mutable maps are written in here, but Spark converts MapType buffer
    // values through its internal row format, so what comes back out of the
    // buffer is NOT the same mutable instance that was stored.
    buffer(0) = scala.collection.mutable.Map[Double,Int]()
    buffer(1) = scala.collection.mutable.Map[Double,Int]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // FAILS here at runtime: buffer(0) is handed back as an immutable Map
    // (scala.collection.immutable.Map$EmptyMap$ per the stack trace), so the
    // cast to mutable.Map throws ClassCastException. In-place mutation of a
    // MapType buffer slot is therefore not possible this way.
    buffer(0).asInstanceOf[Map[Double,Int]](input.getDouble(0)) = input.getInt(1)
    buffer(1).asInstanceOf[Map[Double,Int]](input.getDouble(0)*10) = input.getInt(1)*10
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    // Same problem as update: ++= requires a mutable.Map, but the buffer
    // yields an immutable one, so the cast blows up.
    buffer1(0).asInstanceOf[Map[Double,Int]] ++= buffer2(0).asInstanceOf[Map[Double,Int]]
    buffer1(1).asInstanceOf[Map[Double,Int]] ++= buffer2(1).asInstanceOf[Map[Double,Int]]
  }

  //def evaluate(buffer: Row) = (Map(1.0->10,2.0->20), Map(10.0->100,11.0->110))
  def evaluate(buffer: Row) = (buffer(0).asInstanceOf[Map[Double,Int]], buffer(1).asInstanceOf[Map[Double,Int]])
}

This compiles fine, but gives a runtime error:

val df = Seq((1.0, 1), (2.0, 2)).toDF("k", "v")
df.select(DummyUDAF($"k", $"v")).show(1, false)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 70.0 failed 4 times, most recent failure: Lost task 1.3 in stage 70.0 (TID 204, 10.91.252.25): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map

Another solution discussed here indicates that this could be a problem caused by using MapType inside the buffer's StructType. However, when I try out the solution mentioned I still get the same error.

val distudaf = new DistinctValues
val df = Seq(("a", "a1"), ("a", "a1"), ("a", "a2"), ("b", "b1"), ("b", "b2"), ("b", "b3"), ("b", "b1"), ("b", "b1")).toDF("col1", "col2")

df.groupBy("col1").agg(distudaf($"col2").as("DV")).show

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 4 times, most recent failure: Lost task 1.3 in stage 22.0 (TID 100, 10.91.252.25): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map

My preference would be to mutate the Map, given that I expect the Map to be huge; making a copy and re-assigning may lead to performance/memory bottlenecks.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
mrbrahman
  • 455
  • 5
  • 18

2 Answers

5

My limited understanding of UDAF is that you should only set what you want to be (semantically) updated, i.e. take what is already set in the MutableAggregationBuffer, combine it with what you want to add, and assign the result back with `=` (which will call `update(i: Int, value: Any): Unit` under the covers).

Your code could look as follows:

// Working pattern: read the current (immutable) map out of the buffer,
// build a new map with the extra entry, and assign it back to the slot.
// The assignment goes through MutableAggregationBuffer.update under the covers.
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val newBuffer0 = buffer(0).asInstanceOf[Map[Double, Int]]
  buffer(0) = newBuffer0 + (input.getDouble(0) -> input.getInt(1))

  val newBuffer1 = buffer(1).asInstanceOf[Map[Double, Int]]
  buffer(1) = newBuffer1 + (input.getDouble(0) * 10 -> input.getInt(1) * 10)
}

The complete DummyUDAF could be as follows:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

// Working version: treats the buffer slots as immutable Maps throughout.
// Instead of mutating a map in place, each update/merge builds a new map and
// assigns it back to the buffer slot (buffer(i) = ...), which is the
// supported way to change MapType state in a UDAF buffer.
// NOTE(review): as the comment thread points out, this copies the map on
// every update/merge call, so it does not address the asker's performance
// concern for very large maps.
object DummyUDAF extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("x", DoubleType).add("y", IntegerType)
  def bufferSchema = new StructType()
    .add("buff", MapType(DoubleType, IntegerType))
    .add("buff2", MapType(DoubleType, IntegerType))

  def dataType = new StructType()
    .add("xs", MapType(DoubleType, IntegerType))
    .add("ys", MapType(DoubleType, IntegerType))

  def deterministic = true 

  def initialize(buffer: MutableAggregationBuffer) = {
    // Plain (immutable) empty Maps — no mutable collections involved.
    buffer(0) = Map[Double,Int]()
    buffer(1) = Map[Double,Int]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Read current map, add the new entry with `+`, write the result back.
    val newBuffer0 = buffer(0).asInstanceOf[Map[Double, Int]]
    buffer(0) = newBuffer0 + (input.getDouble(0) -> input.getInt(1))

    val newBuffer1 = buffer(1).asInstanceOf[Map[Double, Int]]
    buffer(1) = newBuffer1 + (input.getDouble(0) * 10 -> input.getInt(1) * 10)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    // Combine the two partial maps with `++` and reassign — no ++= mutation.
    buffer1(0) = buffer1(0).asInstanceOf[Map[Double,Int]] ++ buffer2(0).asInstanceOf[Map[Double,Int]]
    buffer1(1) = buffer1(1).asInstanceOf[Map[Double,Int]] ++ buffer2(1).asInstanceOf[Map[Double,Int]]
  }

  //def evaluate(buffer: Row) = (Map(1.0->10,2.0->20), Map(10.0->100,11.0->110))
  def evaluate(buffer: Row) = (buffer(0).asInstanceOf[Map[Double,Int]], buffer(1).asInstanceOf[Map[Double,Int]])
}
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • 1
    It is worth pointing out that this copies data twice on each call of `update` so doesn't really address the problem. – zero323 Jan 28 '18 at 02:08
1

Late for the party. I just discovered that one can use

// Declares the buffer field as an ObjectType wrapping a JVM mutable.Map,
// bypassing Catalyst's MapType conversion so the mutable instance survives.
// NOTE(review): ObjectType is a Spark-internal type; a follow-up comment on
// this answer reports "Unsupported data type ObjectType(...)" at runtime on
// at least some Spark versions — verify on the version in use.
override def bufferSchema: StructType = StructType(List(
    StructField("map", ObjectType(classOf[mutable.Map[String, Long]]))
))

to use mutable.Map in a buffer.

colinfang
  • 20,909
  • 19
  • 90
  • 173
  • 1
    Can you please post a full example? I did `import scala.collection.mutable` and tried your solution. However, got an error at *runtime*: `org.apache.spark.SparkException: Unsupported data type ObjectType(interface scala.collection.mutable.Map)` – mrbrahman Dec 23 '19 at 22:01