
I have a DataFrame with a structure similar to:

root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |-- npaNumber: string (nullable = true)
 |    |-- date: string (nullable = true)

What I am trying to do is:

  • Group the records that have the same npaNumber into a list
  • Inside each list, order the elements by their date
  • Once the elements are grouped and ordered, merge them by applying some logic. To perform this last step I decided to use a map.
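The three steps can be pinned down first with plain Scala collections, independent of Spark. This is a minimal sketch: the case classes only mirror the schema above and are hypothetical, pickOne is a placeholder for the real merge logic, and dates are assumed to be ISO-8601 strings (e.g. "2017-10-01") so that string order equals chronological order.

```scala
// Hypothetical case classes mirroring the DataFrame schema
case class NpaDetails(location: String, manager: String)
case class Service(serviceName: String, serviceCode: String)
case class NpaData(NPADetails: NpaDetails, service: Seq[Service])
case class NpaHeader(npaNumber: String, date: String)
case class Npa(NPAData: NpaData, NPAHeader: NpaHeader)

object GroupSortMerge {
  // Placeholder for the real merge logic: the group arrives sorted newest first
  def pickOne(group: Seq[Npa]): Npa = group.head

  def mergeAll(records: Seq[Npa]): Map[String, Npa] =
    records
      .groupBy(_.NPAHeader.npaNumber) // 1. group by npaNumber
      .map { case (npaNumber, group) =>
        // 2. order by date, newest first (assumes ISO-8601 date strings)
        val newestFirst = group.sortBy(_.NPAHeader.date)(Ordering[String].reverse)
        npaNumber -> pickOne(newestFirst) // 3. merge the group into one record
      }
}
```

Keeping the merge logic as a plain function over a Seq makes it testable without a SparkSession, whichever Spark API ends up calling it.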

Here is what I tried so far:

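import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Total number of records per npaNumber. No orderBy here: on an ordered window, count() is a running count
val toUpdate = sourceDF
  .withColumn("count", count($"NPAHeader").over(Window.partitionBy("NPAHeader.npaNumber")))
  .filter($"count" > 1)

// Note: collect_list does not guarantee the order of the collected elements
val groupedNpa = toUpdate
  .groupBy($"NPAHeader.npaNumber")
  .agg(collect_list(struct($"NPAData", $"NPAHeader")).as("npa"))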

// This is a simplified version of my logic (Seq has no get method, so use head or apply(0))
def pickOne(rows: Seq[Row]): Row = {
  println("First element: " + rows.head)
  rows.head
}

val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa")))) 

An example of a Row after the groupBy would be:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

But I am getting an exception when I try to invoke the function from the map.

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row - field (class: "org.apache.spark.sql.Row", name: "_2") - root class: "scala.Tuple2"

What I understand is that I cannot invoke the function pickOne() from the map (or at least not in the way I am trying). But I don't know what I am doing wrong.

Why am I getting this exception?

Thanks for your time!

Note: I know there are easier ways to pick one element from the list without invoking a custom function. But I need to invoke it no matter what, because in the next step I need to put far more complex logic there to merge rows.

After applying Mahesh Chand Kandpal's suggestion:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

grouped.map(row => "emdNumber: "+row.getAs[String]("emdNumber"))
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema)))) 

I get the following error:

type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: Int

How should I apply the Encoder instead?
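The "required: Int" can be reproduced without Spark. Because pickOne has a single parameter list, Scala reads pickOne(x)(enc) as pickOne(x).apply(enc), so the encoder in the edited line is handed to the apply(i: Int) method of the Row that pickOne returns, not to map. The sketch below uses a hypothetical MiniRow class that has the same apply(i: Int) shape as Spark's Row.

```scala
// Hypothetical stand-in for org.apache.spark.sql.Row, which also defines apply(i: Int): Any
final case class MiniRow(values: Seq[Any]) {
  def apply(i: Int): Any = values(i)
}

object ParseDemo {
  // Same shape as pickOne in the question: one parameter list, returns a row
  def pickOne(rows: Seq[MiniRow]): MiniRow = rows.head

  val rows = Seq(MiniRow(Seq("npaNew", "2017-10-01")), MiniRow(Seq("npaOld", "2017-09-01")))

  // Parsed as pickOne(rows).apply(1): the second argument list goes to the returned row
  val secondField: Any = pickOne(rows)(1)

  // Passing a non-Int there, e.g. pickOne(rows)("enc"), does not compile:
  // type mismatch, required: Int
}
```

So the encoder has to be attached to the map call itself (its implicit parameter list), not to the expression inside the lambda.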

Ignacio Alorre

1 Answer


When you use map on a DataFrame, you need to provide an Encoder for whatever the function returns.

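In Spark 2.x, Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]: besides the function, it takes an implicit Encoder[T] for the result type T. For T = (String, Row) no encoder can be derived, because a Row carries no static type information.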

import org.apache.spark.sql.catalyst.encoders.RowEncoder
// schema: a StructType describing the Rows that the map function returns
implicit val encoder = RowEncoder(schema)
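For the question's case, the encoder must describe the output of the map and must be passed to map itself. A minimal sketch, assuming Spark 2.x (where RowEncoder(schema) is available from the internal catalyst package; later versions may differ) and the groupedNpa layout from the question: the function returns a Row instead of a tuple, and outSchema and the column name "merged" are hypothetical. The sort inside the map is there because collect_list gives no ordering guarantee; dates are assumed to be ISO-8601 strings.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{collect_list, struct}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplicitRowEncoder {
  // Placeholder for the real merge logic: rows arrive sorted newest first
  def pickOne(rows: Seq[Row]): Row = rows.head

  def merge(groupedNpa: DataFrame): DataFrame = {
    // The merged element has the same struct type as the elements of "npa"
    val elementType = groupedNpa.schema("npa").dataType.asInstanceOf[ArrayType].elementType
    // Hypothetical output schema: the key column plus the merged struct
    val outSchema = StructType(Seq(
      StructField("npaNumber", StringType),
      StructField("merged", elementType)))

    groupedNpa.map { row =>
      // Order each group by NPAHeader.date, newest first
      val newestFirst = row.getAs[Seq[Row]]("npa")
        .sortBy(_.getAs[Row]("NPAHeader").getAs[String]("date"))(Ordering[String].reverse)
      Row(row.getAs[String]("npaNumber"), pickOne(newestFirst))
    }(RowEncoder(outSchema)) // explicit argument for map's implicit Encoder parameter
  }
}
```

Declaring implicit val encoder = RowEncoder(outSchema) before the map call works the same way; the point is that the schema describes the returned Row, not the input.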
Mahesh Chand
  • Thanks for the answer, but a new error appears; check the edit. – Ignacio Alorre Oct 01 '17 at 15:55
  • I think you should define the encoder outside the map over groupedNpa: implicit val encoder = RowEncoder(schema), passing the schema of your DataFrame. – Mahesh Chand Oct 01 '17 at 16:06
  • Same result. How should I pass it to the map? – Ignacio Alorre Oct 01 '17 at 18:12
  • If you could show groupedNpa, I can give it a try in my shell. – Mahesh Chand Oct 02 '17 at 04:09
  • It's a toy example. The real one I have is far more complex. But could you add a line to your answer showing how you would invoke the custom function using the encoder? – Ignacio Alorre Oct 02 '17 at 10:03
  • Show groupedNpa; that will give me an idea. Also show exactly what output you want. – Mahesh Chand Oct 02 '17 at 10:14
  • I have asked something similar to this question with a different approach: https://stackoverflow.com/q/46525530/1773841, using a simpler example which may be easier to understand. I would appreciate it if you moved your answer to that other question so I can close this one, since it is almost the same but worse formulated and harder to understand. I will upvote your answers there as well. Thanks – Ignacio Alorre Oct 03 '17 at 10:05
  • Thanks for letting me know. I will look at it when I get time. – Mahesh Chand Oct 03 '17 at 11:34
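An alternative that avoids Row encoders entirely is a typed Dataset: with case classes for the records, spark.implicits._ derives the encoders, and groupByKey plus mapGroups covers the group, sort and merge steps in one pass. A minimal sketch, assuming hypothetical case classes that model only NPAHeader (real ones would mirror the full schema, with field names matching the column names) and ISO-8601 date strings; pickOne is again a placeholder.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes; field names must match the column names
case class NpaHeader(npaNumber: String, date: String)
case class Npa(NPAHeader: NpaHeader)

object TypedMerge {
  // Placeholder merge logic; receives the group sorted newest first
  def pickOne(npas: Seq[Npa]): Npa = npas.head

  def merge(spark: SparkSession, ds: Dataset[Npa]): Dataset[(String, Npa)] = {
    import spark.implicits._ // derives Encoder[String] and Encoder[(String, Npa)]
    ds.groupByKey(_.NPAHeader.npaNumber)
      .mapGroups { (npaNumber, records) =>
        // Materialize the group's iterator, then order it newest first
        val newestFirst = records.toVector.sortBy(_.NPAHeader.date)(Ordering[String].reverse)
        (npaNumber, pickOne(newestFirst))
      }
  }
}
```

An existing DataFrame could be turned into such a Dataset with sourceDF.as[Npa] once the case classes match its columns; the merge logic then works on plain Scala objects instead of Rows.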