I get some data from my MongoDB that looks like this:
    +------+-------+
    | view | data  |
    +------+-------+
    | xx   | ***   |
    | yy   | ***   |
    | xx   | ***   |
    +------+-------+
It's not really necessary to know what is inside.
I wrote a UserDefinedAggregateFunction like this because I want to group on view:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    class Extractor() extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = // some stuff
      override def bufferSchema: StructType =
        StructType(
          List(
            StructField("0", IntegerType, false),
            StructField("1", IntegerType, false),
            StructField("2", IntegerType, false),
            StructField("3", IntegerType, false),
            StructField("4", IntegerType, false),
            StructField("5", IntegerType, false),
            StructField("6", IntegerType, false),
            StructField("7", IntegerType, false)
          )
        )
      override def dataType: DataType = bufferSchema
      override def deterministic: Boolean = true
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        for (x <- 0 to 7) {
          buffer(x) = 0
        }
      }
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = // some stuff
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = // some stuff
      override def evaluate(buffer: Row): Any = {
        var l = List.empty[Int]
        for (x <- 7 to 0 by -1) {
          l = buffer.getInt(x) :: l
        }
        l
      }
    }
My output should be something like this:
    +------+---+---+---+---+---+---+---+---+
    | view | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
    +------+---+---+---+---+---+---+---+---+
    | xx   | 0 | 0 | 4 | 1 | 0 | 0 | 3 | 0 |
    | yy   | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 0 |
    +------+---+---+---+---+---+---+---+---+
The values are calculated in the update/merge functions above; that part works, so I've left it out.
Then I use it like this:
    val ex = new Extractor()
    val df = dataset.groupBy("view").agg(
      ex(dataset.col("data"))
    )
    df.show()
When I execute df.show() it always gives me an IndexOutOfBoundsException. I know the error only shows up at df.show() because of lazy evaluation.
As far as I can tell, Spark processes the first group and gets through the evaluate function, but after that I get the IndexOutOfBoundsException.
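The exception is consistent with a type mismatch in the first approach: dataType declares a StructType (the buffer schema), but evaluate returns a Scala List, while Spark expects a Row for a StructType result. Below is a minimal sketch of the same 8-slot layout in which evaluate returns Row.fromSeq(...) and the struct is then flattened with the "ex.*" selector. It assumes spark-sql is on the classpath (UserDefinedAggregateFunction still works on Spark 3.x but is deprecated there). Because the real update/merge logic isn't shown, the class name BucketExtractor, its counting logic (treating data as an Int in 0..7) and the sample rows are all hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical 8-slot UDAF whose evaluate() returns a Row, matching its StructType dataType.
// HYPOTHETICAL update logic: "data" is assumed to be an Int in 0..7 and each value is counted.
class BucketExtractor extends UserDefinedAggregateFunction {
  private val slots = 8

  override def inputSchema: StructType = StructType(StructField("data", IntegerType) :: Nil)

  override def bufferSchema: StructType =
    StructType((0 until slots).map(i => StructField(i.toString, IntegerType, nullable = false)))

  // The result is a struct with the same 8 fields as the buffer.
  override def dataType: DataType = bufferSchema

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    for (i <- 0 until slots) buffer(i) = 0

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val i = input.getInt(0)
      if (i >= 0 && i < slots) buffer(i) = buffer.getInt(i) + 1
    }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    for (i <- 0 until slots) buffer1(i) = buffer1.getInt(i) + buffer2.getInt(i)

  // A StructType result has to be returned as a Row, not as a List or an Array.
  override def evaluate(buffer: Row): Any =
    Row.fromSeq((0 until slots).map(buffer.getInt))
}

object BucketExtractorDemo {
  def run(spark: SparkSession): Array[Row] = {
    import spark.implicits._
    // Hypothetical sample data.
    val dataset = Seq(("xx", 2), ("yy", 3), ("xx", 2), ("xx", 6)).toDF("view", "data")
    val ex = new BucketExtractor
    dataset.groupBy("view")
      .agg(ex(dataset.col("data")).as("ex"))
      .select("view", "ex.*") // expand the struct into columns "0".."7"
      .orderBy("view")
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udaf-struct").getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

With a struct result, the "ex.*" selector gives exactly the view, 0, ..., 7 layout, one row per view value.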
Also, when I change the dataType and evaluate functions to:
    override def dataType: DataType =
      ArrayType(IntegerType, false)
    override def evaluate(buffer: Row): Any = {
      val l = Array.ofDim[Int](8)
      for (x <- 0 to 7) {
        l(x) = buffer.getInt(x)
      }
      l
    }
the output looks like this:
    +------+------------------------------+
    | view | Extractor                    |
    +------+------------------------------+
    | xx   | [0, 0, 4, 1, 0, 0, 3, 0]     |
    | yy   | [0, 0, 0, 3, 0, 1, 0, 0]     |
    +------+------------------------------+
And the schema looks like this:
    root
     |-- view: string (nullable = true)
     |-- Extractor: array (nullable = true)
     |    |-- element: integer (containsNull = false)
But I wasn't able to convert this into the form I want.
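For the second approach, one way to split the array column into one column per element is Column.getItem inside a select. A minimal sketch, assuming every array has exactly 8 elements; the helper name arrayToColumns is hypothetical, and the input DataFrame is built by hand from the values in the output above rather than produced by the real UDAF.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ArrayToColumns {
  // Hypothetical helper: splits `arrayCol` into `n` columns named "0", "1", ..., keeping `keyCol` first.
  def arrayToColumns(df: DataFrame, keyCol: String, arrayCol: String, n: Int): DataFrame = {
    val elementCols: Seq[Column] =
      (0 until n).map(i => col(arrayCol).getItem(i).as(i.toString))
    df.select(col(keyCol) +: elementCols: _*)
  }

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Hand-built stand-in for the aggregated result of the second approach.
    val aggregated = Seq(
      ("xx", Seq(0, 0, 4, 1, 0, 0, 3, 0)),
      ("yy", Seq(0, 0, 0, 3, 0, 1, 0, 0))
    ).toDF("view", "Extractor")
    arrayToColumns(aggregated, "view", "Extractor", 8)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("array-to-columns").getOrCreate()
    try run(spark).orderBy("view").show()
    finally spark.stop()
  }
}
```

This is a single projection over the already aggregated rows, so it adds no extra shuffle on top of the groupBy.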
Because the second approach works, I suspect I'm doing something wrong with the dataType in the first approach, but I don't see how to fix it.
So much for the introduction; now to my question:
How can I get the output I want? I don't mind which of the two approaches is used (the first, with multiple output columns, or the second, with an array that can be converted into the form I want), as long as it's efficient.
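On Spark 3.0 or later, UserDefinedAggregateFunction is deprecated in favor of a typed Aggregator registered with functions.udaf, which is intended to avoid converting the aggregation buffer on every row. Returning a case class from finish gives a struct column that can be flattened with "ex.*". A sketch under stated assumptions: Spark 3.0+ with spark-sql on the classpath; CountBuffer, Buckets, BucketAggregator, the counting logic (treating data as an Int in 0..7) and the sample rows are all hypothetical, since the real update/merge logic isn't shown.

```scala
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Hypothetical mutable buffer holding 8 counters.
case class CountBuffer(counts: Array[Int])

// Hypothetical result type; a case class output becomes a struct column with fields c0..c7.
case class Buckets(c0: Int, c1: Int, c2: Int, c3: Int, c4: Int, c5: Int, c6: Int, c7: Int)

// HYPOTHETICAL logic: each input is an Int in 0..7 and occurrences per value are counted.
object BucketAggregator extends Aggregator[Int, CountBuffer, Buckets] {
  def zero: CountBuffer = CountBuffer(Array.fill(8)(0))

  // Aggregator allows modifying the buffer in place and returning it.
  def reduce(b: CountBuffer, x: Int): CountBuffer = {
    if (x >= 0 && x < 8) b.counts(x) += 1
    b
  }

  def merge(b1: CountBuffer, b2: CountBuffer): CountBuffer = {
    for (i <- 0 until 8) b1.counts(i) += b2.counts(i)
    b1
  }

  def finish(b: CountBuffer): Buckets = {
    val c = b.counts
    Buckets(c(0), c(1), c(2), c(3), c(4), c(5), c(6), c(7))
  }

  def bufferEncoder: Encoder[CountBuffer] = Encoders.product[CountBuffer]
  def outputEncoder: Encoder[Buckets] = Encoders.product[Buckets]
}

object AggregatorDemo {
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Hypothetical sample data.
    val dataset = Seq(("xx", 2), ("yy", 3), ("xx", 2), ("xx", 6)).toDF("view", "data")
    val extractor = udaf(BucketAggregator) // functions.udaf exists from Spark 3.0
    dataset.groupBy("view")
      .agg(extractor(col("data")).as("ex"))
      .select("view", "ex.*")
      // Rename c0..c7 to "0".."7" to match the desired output.
      .toDF(("view" +: (0 until 8).map(_.toString)): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aggregator-struct").getOrCreate()
    try run(spark).orderBy("view").show()
    finally spark.stop()
  }
}
```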
Thank you for your help!