
I am working with DataFrames whose elements have a schema similar to:

root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |-- npaNumber: string (nullable = true)
 |    |-- date: string (nullable = true)

In my DataFrame I want to group all the elements that have the same NPAHeader.code, so to do that I am using the following line:

val groupedNpa = orderedNpa.groupBy($"NPAHeader.code").agg(collect_list(struct($"NPAData", $"NPAHeader")).as("npa"))

After this I have a dataframe with the following schema:

StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))

An example of each Row would be something similar to:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

Now what I want is to generate another DataFrame that picks just one of the elements in the WrappedArray, so I want an output similar to:

[1234,npaNew]

Note: The chosen element from the WrappedArray is the one that matches a complex logic after iterating over the whole WrappedArray. But to simplify the question, I will always pick the last element of the WrappedArray (after iterating over all of it).

To do so, I want to define a recursive udf:

import org.apache.spark.sql.functions.udf

def returnRow(elementList : Row)(index:Int): Row = {
  val dif = elementList.size - index
  val row :Row = dif match{
    case 0 => elementList.getAs[Row](index)
    case _ => returnRow(elementList)(index + 1) 
  }
  row
} 

val returnRow_udf = udf(returnRow _)


groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}

But I am getting the following error in the map:

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Int => Unit is not supported

What am I doing wrong?

As an aside, I am not sure if I am passing the npa column correctly with groupedNpa("npa"). I am accessing the WrappedArray as a Row, because I don't know how to iterate over Array[Row] (the get(index) method is not present in Array[Row]).


1 Answer


TL;DR Just use one of the methods described in How to select the first row of each group?
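
For reference, a minimal sketch of the window-function variant of that approach, assuming you want one row per NPAHeader.npaNumber and that ordering by NPAHeader.date descending is an acceptable stand-in for your real selection logic:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// One row per npaNumber, ordered so that the row to keep comes first.
val w = Window
  .partitionBy(col("NPAHeader.npaNumber"))
  .orderBy(col("NPAHeader.date").desc)

val picked = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")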

If you want to use complex logic, and return Row you can skip SQL API and use groupByKey:

val f: (String, Iterator[org.apache.spark.sql.Row]) => Row
val encoder: Encoder[Row]
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder)

or better:

val g: (Row, Row) => Row

df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g)

where encoder is a valid RowEncoder (Encoder error while trying to map dataframe row to updated row).
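
As a hedged sketch of the reduceGroups variant, assuming Spark 2.x, a SparkSession in scope as spark, field names taken from the schema in the question, and "keep the row with the greatest NPAHeader.date" standing in for the real selection logic:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import spark.implicits._ // provides the Encoder[String] for the grouping key

// Nested fields are read struct by struct; getAs does not resolve dotted paths.
def key(r: Row): String  = r.getAs[Row]("NPAHeader").getAs[String]("npaNumber")
def date(r: Row): String = r.getAs[Row]("NPAHeader").getAs[String]("date")

// Keep the row with the greatest date in each group
// (assumes non-null, lexicographically sortable date strings).
val latest: (Row, Row) => Row = (a, b) => if (date(a) >= date(b)) a else b

val picked = df
  .groupByKey(r => key(r))
  .reduceGroups(latest)
  .map(_._2)(RowEncoder(df.schema)) // drop the key, keep the reduced Row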

Your code is faulty in multiple ways:

  • groupBy doesn't guarantee the order of values. So:

    orderBy(...).groupBy(....).agg(collect_list(...))
    

    can have non-deterministic output. If you really decide to go this route, you should skip orderBy and sort the collected array explicitly.

  • You cannot pass a curried function to udf. You'd have to uncurry it first, but that would require a different order of arguments (see the example below).

  • If you could, this might be the correct way to call it (Note that you omit the second argument):

    returnRow_udf(groupedNpa("npa")(0))
    

    To make it worse, you call it inside map, where udfs are not applicable at all.

  • udf cannot return Row. It has to return an external Scala type.

  • External representation for array<struct> is Seq[Row]. You cannot just substitute it with Row (see the Seq[Row] udf sketch after this list).
  • SQL arrays can be accessed by index with apply:

    df.select($"array"(size($"array") - 1))
    

    but it is not a correct method due to non-determinism. You could apply sort_array, but as pointed out at the beginning, there are more efficient solutions.

  • Surprisingly, recursion is not so relevant. You could design a function like this:

    def size(i: Int=0)(xs: Seq[Any]): Int = xs match {
      case Seq() => i
      case null => i
      case Seq(h, t @ _*) => size(i + 1)(t)
    }
    
    val size_ = udf(size() _)
    

    and it would work just fine:

    Seq((1, Seq("a", "b", "c"))).toDF("id", "array")
      .select(size_($"array"))
    

    although recursion is overkill if you can just iterate over the Seq.
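
Putting the last points together, a hedged sketch of a non-recursive, non-curried udf over the collected column: it receives the array<struct> as Seq[Row] and returns a plain String rather than a Row. Picking the last element and the manager field stand in for the real logic, and the column names follow the grouped schema shown in the question:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// The array<struct> column is passed to the udf as Seq[Row];
// the udf has to return an external Scala type (here String), never a Row.
val lastManager = udf { xs: Seq[Row] =>
  if (xs == null || xs.isEmpty) null
  else xs.last
    .getAs[Row]("NPAData")
    .getAs[Row]("NPADetails")
    .getAs[String]("manager")
}

groupedNpa.select(col("npaNumber"), lastManager(col("npa")).as("manager"))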

  • I tried to change my code as much as possible following your instructions, but I was forced to fall back to some of my original approaches, because the server where I need to run this has Spark 1.6, and as I understand it, groupByKey, mapGroups and reduceGroups, which would have made my life a lot easier, can't be used in that version. – Ignacio Alorre Sep 28 '17 at 06:20
  • Here is the new one in case you want to check it: https://stackoverflow.com/q/46463931/1773841 I made several updates, which is why I prefer to ask a different question rather than updating here again and again. I added the partitionBy and orderBy inside a Window() to avoid the issue you pointed out. Instead of a udf I use a "normal" function that hopefully I will be able to invoke from a map, so I will not have the restrictions on the return type. I know these things can be done with RDDs, but I am not so optimistic about DataFrames. – Ignacio Alorre Sep 28 '17 at 08:05