1

I am working with a Dataframe which has a complex schema similar to this:

 root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- usersDetails: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- contacts: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- phone: string (nullable = true)
 |    |    |    |    |    |-- email: string (nullable = true)
 |    |    |    |    |    |-- address: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- date: string (nullable = true)

I want to perform a map applying a custom function each Row of the DataFrame to meet the requirements:

Each row of the dataframe has 2 or more elements which have got the structure I posted in the question. First I want to separate those elements of each row in a list of rows, since I will need to compare them. One I have a DataFrame[List[Row]] I want to apply another map, so I can merge the elements of each list (for that I have a recursive function I wrote that check the order in the list and fill null fields of new elements with values of the older ones). Before I was doing all this using RDD, but I am trying to do the same with the DataFrame API

I think for that I need to pass an encoder.

Since the schema is rather complex (At least I don't know how to generate a StructType when there are Array which elements are also Arrays) what I tried was to generate the encoder by passing the schema, doing something like this:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

val sourceSchema = dfSoruce.schema 

val encoder = RowEncoder(sourceSchema)

dfSoruce.map(x => x.getList[Row](0))(encoder)

But I am getting the following error:

type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: org.apache.spark.sql.Encoder[java.util.List[org.apache.spark.sql.Row]]

How could I convert from ExpressionEncoder to Encoder?

Community
  • 1
  • 1
Ignacio Alorre
  • 7,307
  • 8
  • 57
  • 94

1 Answers1

1

I want to perform a map applying a custom function each Row of the DataFrame, but for that I need to pass an encoder.

Let me disagree.

map Operator (to be avoided)

Quoting the scaladoc of map operator:

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] Returns a new Dataset that contains the result of applying func to each element.

You may have noticed that the encoder (in the 2nd parameter list) is an implicit parameter and as such does not have to be provided explicitly (that's the beauty of implicits in Scala, isn't it?)

My recommendation would be to do your transformation using func to an encodable type U, i.e. any type you can use in Datasets. You can find the available encoders that turn types into their encodable variants in Encoders object.

scala> :type ids
org.apache.spark.sql.Dataset[Long]

scala> ids.map(id => (id, "hello" * id.toInt)).show(truncate = false)
+---+---------------------------------------------+
|_1 |_2                                           |
+---+---------------------------------------------+
|0  |                                             |
|1  |hello                                        |
|2  |hellohello                                   |
|3  |hellohellohello                              |
|4  |hellohellohellohello                         |
|5  |hellohellohellohellohello                    |
|6  |hellohellohellohellohellohello               |
|7  |hellohellohellohellohellohellohello          |
|8  |hellohellohellohellohellohellohellohello     |
|9  |hellohellohellohellohellohellohellohellohello|
+---+---------------------------------------------+

But I'd rather spare map for more advanced transformations only after withColumn and standard functions have fallen short.

(recommended) withColumn Operator and Standard Functions

I'd rather use withColumn operator with the standard functions in functions object that would give you map-like behaviour.

Let's go over your requirements and see how far we go with the approach.

First I want to separate those elements of each row in a list of rows

For me a list of rows sounds like groupBy aggregation followed by collect_list function (possibly with some withColumn operators to extract the required values).

// leave you to fill the gaps
dfSoruce.withColumn(...).groupBy(...).agg(collect_list(...))

You don't have to think about encoders much (given they are pretty low-level and a fairly advanced concept in Spark SQL)

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • Good Morning, I was trying to follow your recommendation the max I could, but I can not apply standard functions to achieve what I need. So I needed to come back to the map, and since what I am passing is a Seq[Row] to the function inside the map, it ask for an explicit encoder during run time. But I am facing the issue of generating correctly an encoder. I posted a new question with the issue here, I leave the link in case you find some free time to check: https://stackoverflow.com/q/46525530/1773841 Thanks – Ignacio Alorre Oct 03 '17 at 07:02