
I get some data from my MongoDB that looks like this:

     +------+-------+
     | view | data  |
     +------+-------+
     |  xx  | ***   |
     |  yy  | ***   |
     |  xx  | ***   |
     +------+-------+

It's not really necessary to know what is inside.

I wrote a UserDefinedAggregateFunction like this, because I want to group by view:

class Extractor() extends UserDefinedAggregateFunction{
  override def inputSchema: StructType = // some stuff

  override def bufferSchema: StructType = 
      StructType(
        List(
          StructField("0",IntegerType,false),
          StructField("1",IntegerType,false),
          StructField("2",IntegerType,false),
          StructField("3",IntegerType,false),
          StructField("4",IntegerType,false),
          StructField("5",IntegerType,false),
          StructField("6",IntegerType,false),
          StructField("7",IntegerType,false)
        )
      )

  override def dataType: DataType = bufferSchema        

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    for (x <- 0 to 7){
      buffer(x) = 0
    }
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = // some stuff

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = // some stuff

  override def evaluate(buffer: Row): Any = {
    var l = List.empty[Integer]
    for (x <- 7 to 0 by -1){
      l = buffer.getInt(x) :: l
    }
    l
  }
}

My output should be something like this:

     +------+---+---+---+---+---+---+---+---+
     | view | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
     +------+---+---+---+---+---+---+---+---+
     |  xx  | 0 | 0 | 4 | 1 | 0 | 0 | 3 | 0 |
     |  yy  | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 0 |
     +------+---+---+---+---+---+---+---+---+

The values are calculated in the update/merge functions above. That part works, so I have left it out.

Then I use it like this:

val ex = new Extractor()
val df = dataset.groupBy("view").agg(
      ex(dataset.col("data"))
)
df.show()

When I execute df.show() it always gives me an IndexOutOfBoundsException. I know that evaluation is lazy, which is why the error only appears at df.show().

As far as I can see, it processes the first group and finishes the evaluate function. But after that I get the IndexOutOfBoundsException...

Also, when I change dataType and the evaluate function to:

override def dataType: DataType =
    ArrayType(IntegerType,false)

override def evaluate(buffer: Row): Any = {
    var l = ofDim[Integer](8)
    for (x <- 0 to 7){
      l(x) = buffer.getInt(x)
    }
    l
}

The output would look like this:

     +------+------------------------------+
     | view | Extractor                    |
     +------+------------------------------+
     |  xx  | [0, 0, 4, 1, 0, 0, 3, 0]     |
     |  yy  | [0, 0, 0, 3, 0, 1, 0, 0]     |
     +------+------------------------------+

And the schema looks like this:

root
 |-- view: string (nullable = true)
 |-- Extractor: array (nullable = true)
 |    |-- element: integer (containsNull = false)

And I wasn't able to convert this into the form I want.

Because the second approach works, I think I am getting something wrong with the DataType in the first approach, but I don't see how to fix it...

That was a long introduction, so now to my question:

How can I get the output I want? I don't really care which of the two approaches I use (the first with multiple output columns, or an array that can be converted into the form I want), as long as it's efficient.

Thank you for your help

Boendal

1 Answer


You are defining the aggregated output to have the same type as the buffer:

 override def dataType: DataType = bufferSchema

Because bufferSchema is a StructType built from a List of fields, what you get in the end is a single struct-typed column. You can alter your schema afterwards and transform each field into a new column.
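
One thing worth checking here: since dataType is a StructType, my understanding is that Spark expects evaluate to return a Row with one value per field, not a Scala List. The sketch below is a hypothetical stand-in for Extractor. The name BucketCounter, the integer input column and the counting logic in update/merge are assumptions, since the real ones are not shown in the question. It returns a Row from evaluate and then expands the struct into separate columns with "counts.*":

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical stand-in for Extractor: counts how often each value 0..7 occurs.
class BucketCounter extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType(StructField("data", IntegerType) :: Nil)

  override def bufferSchema: StructType =
    StructType((0 to 7).map(i => StructField(i.toString, IntegerType, nullable = false)))

  override def dataType: DataType = bufferSchema

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    for (i <- 0 to 7) buffer(i) = 0

  // Assumed logic: increment the counter that matches the input value.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val i = input.getInt(0)
    if (i >= 0 && i <= 7) buffer(i) = buffer.getInt(i) + 1
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    for (i <- 0 to 7) buffer1(i) = buffer1.getInt(i) + buffer2.getInt(i)

  // dataType is a StructType, so return a Row with one value per field.
  override def evaluate(buffer: Row): Any =
    Row.fromSeq((0 to 7).map(buffer.getInt))
}

val spark = SparkSession.builder().master("local[1]").appName("udaf-sketch").getOrCreate()
import spark.implicits._

// Made-up input, only to exercise the sketch.
val dataset = Seq(("xx", 2), ("xx", 3), ("yy", 3)).toDF("view", "data")

val wide = dataset
  .groupBy("view")
  .agg(new BucketCounter()(col("data")).as("counts"))
  .select(col("view"), col("counts.*")) // one output column per struct field
```

With this shape the result has the columns view, 0, 1, ..., 7 directly, so no conversion from an array is needed afterwards.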

As for your error, the difference between:

override def evaluate(buffer: Row): Any = {
  var l = List.empty[Integer]
  for (x <- 7 to 0 by -1){
      l = buffer.getInt(x) :: l
  }
  l
}

and

override def evaluate(buffer: Row): Any = {
  var l = ofDim[Integer](8)
  for (x <- 0 to 7){
    l(x) = buffer.getInt(x)
  }
  l
}

is that in the second one you allocate an array with a fixed number of elements (8). Therefore, you are certain you can iterate from 0 to 7 without a problem.

That is not the case in your first example, so I suspect that you may have malformed data that causes your buffer to be wrongly initialized in initialize or merge. I suggest you add a try/catch to validate the size after each step that could change the length of your buffer (at least initialize, but possibly update or merge too).
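
A minimal sketch of such a size check, using require rather than a try/catch. The helper name checkBuffer and the expected size of 8 are assumptions for illustration:

```scala
import org.apache.spark.sql.Row

// Hypothetical helper: fail fast with a readable message when the buffer
// does not have the expected number of fields.
def checkBuffer(stage: String, buffer: Row, expected: Int = 8): Unit =
  require(
    buffer.length == expected,
    s"$stage: buffer has ${buffer.length} fields, expected $expected"
  )
```

Since MutableAggregationBuffer extends Row, it could be called as checkBuffer("merge", buffer2) at the top of merge, and likewise in update and evaluate.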

To add a column for each element of your list, you can either use withColumn or do it through a map.
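
Both variants can be sketched as follows, assuming the aggregated result has an array column named Extractor with exactly 8 elements, as in the second approach. The DataFrame aggregated is a stand-in built from the values in the question's example output:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("array-to-columns").getOrCreate()
import spark.implicits._

// Stand-in for the result of groupBy("view").agg(ex(...)) in the second approach.
val aggregated = Seq(
  ("xx", Seq(0, 0, 4, 1, 0, 0, 3, 0)),
  ("yy", Seq(0, 0, 0, 3, 0, 1, 0, 0))
).toDF("view", "Extractor")

// Variant 1: map over the indices and build all columns in a single select.
val bucketCols = (0 to 7).map(i => col("Extractor").getItem(i).as(i.toString))
val wide = aggregated.select(col("view") +: bucketCols: _*)

// Variant 2: add one column at a time with withColumn, then drop the array.
val wide2 = (0 to 7)
  .foldLeft(aggregated)((df, i) => df.withColumn(i.toString, col("Extractor").getItem(i)))
  .drop("Extractor")
```

The single select is probably the simpler of the two, since it builds one projection instead of chaining eight withColumn calls.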

glefait
  • I don't think that's the case. It's always initialized to 0 (I added the initialize method so you can see why I'm sure the size is always 8). As for "You can alter later your schema and transform each column from your list into a new column": if I knew how, I would do it. – Boendal Mar 12 '17 at 16:24
  • What about the merge function? Are you checking that the row is not null? – glefait Mar 12 '17 at 16:28
  • It's impossible for them to be null, and if they were null then the second approach wouldn't work either. – Boendal Mar 12 '17 at 16:29