
I'm using Scala with Spark 2.0 and trying to write a UDAF that has an array of tuples as its internal buffer type as well as its return type:

def bufferSchema = new StructType().add(
  "midResults",
  ArrayType(StructType(Array(
    StructField("a", DoubleType),
    StructField("b", DoubleType)))))

def dataType: DataType = ArrayType(StructType(Array(
  StructField("a", DoubleType),
  StructField("b", DoubleType))))

And this is how I update the buffer:

def update(buffer: MutableAggregationBuffer, input: Row) = {
  buffer(0) = buffer.getAs[mutable.WrappedArray[(Double, Double)]](0) ++ Array((3.0, 4.0))
}

But I get the following exception:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

This pattern works fine when the buffer holds a simple Array of Double.


1 Answer


java.lang.ArrayStoreException is "thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects", and it is expected here, because the local Scala type corresponding to a StructType is o.a.s.sql.Row, not a tuple. In other words, the buffer field should hold a Seq[Row], and the values you append to it should be Rows.
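
A minimal sketch of the corrected update, assuming (as in the question's bufferSchema) that the array is the buffer's only field:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import scala.collection.mutable

def update(buffer: MutableAggregationBuffer, input: Row) = {
  // An array-of-structs buffer field comes back as WrappedArray[Row],
  // so append a Row rather than a Scala tuple.
  buffer(0) = buffer.getAs[mutable.WrappedArray[Row]](0) ++ Array(Row(3.0, 4.0))
}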

Notes:

  • Calling ++ in a loop is probably not the best idea ever; each call copies the whole buffer, so the aggregation becomes quadratic.
  • Creating a UDAF is slightly obsolete here if you consider that since Spark 2.0 collect_list supports complex types (see the first sketch after this list).
  • Arguably Aggregators are way more user friendly than UserDefinedAggregateFunctions (see the second sketch after this list).
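
For example, a sketch of the collect_list route, assuming a hypothetical DataFrame df with a grouping column key next to the a and b columns:

import org.apache.spark.sql.functions.{col, collect_list, struct}

// collect_list over a struct column builds the same array-of-structs
// result as the UDAF, with no custom aggregation code.
val result = df
  .groupBy(col("key"))
  .agg(collect_list(struct(col("a"), col("b"))).as("midResults"))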
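
And a minimal sketch of an equivalent Aggregator; the CollectPairs name is mine, and the encoders are built with ExpressionEncoder rather than the implicits the comments below discuss:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Collects (a, b) pairs into a Seq; apply it to a Dataset[(Double, Double)]
// with ds.select(CollectPairs.toColumn).
object CollectPairs extends Aggregator[(Double, Double), Seq[(Double, Double)], Seq[(Double, Double)]] {
  def zero: Seq[(Double, Double)] = Seq.empty
  def reduce(buffer: Seq[(Double, Double)], input: (Double, Double)) = buffer :+ input
  def merge(b1: Seq[(Double, Double)], b2: Seq[(Double, Double)]) = b1 ++ b2
  def finish(reduction: Seq[(Double, Double)]) = reduction
  def bufferEncoder: Encoder[Seq[(Double, Double)]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[(Double, Double)]] = ExpressionEncoder()
}
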
    Not sure I agree that it is trying to store a Row in this case, but I will definitely switch to collect_list and Aggregators (UDAF is a bit clunky, I agree). Thanks for the pointer. – Jeremy Sep 18 '16 at 00:31
  • A bit is an understatement :) `Aggregators` are still quite verbose but much more pleasant to write. Regarding rows: you cannot have a schema of type `TupleN`; reflection here works only one way. So the buffer should contain `Seq[Row]` and the function should expect `Row`s as input. – zero323 Sep 18 '16 at 00:38
  • After trying out the example I get an issue with finding the automagic Encoders. I added "import sparkSession.implicits._". Any other pointers? – Jeremy Sep 18 '16 at 00:56
  • `Aggregators`? Keep in mind that the API changed between 1.6 and 2.0. I have a full 2.0 example here: https://stackoverflow.com/a/32101530/1560062. Of course you should convert the `DataFrame` to some statically typed `Dataset` first. Otherwise you'll have to deal with `RowEncoder`s: https://stackoverflow.com/q/39433419/1560062. – zero323 Sep 18 '16 at 01:17
  • I do not understand how this solves trying to use a tuple in a buffer... – Dan Ciborowski - MSFT Jan 25 '18 at 03:31