I am working with a DataFrame that has a complex schema similar to this:
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- usersDetails: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- name: string (nullable = true)
| | | |-- contacts: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- phone: string (nullable = true)
| | | | | |-- email: string (nullable = true)
| | | | | |-- address: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| |-- code: string (nullable = true)
| |-- date: string (nullable = true)
I want to map a custom function over each Row of the DataFrame to meet the following requirements:
Each row of the DataFrame has 2 or more elements with the structure shown above. First, I want to separate the elements of each row into a list of rows, since I will need to compare them. Once I have a Dataset[List[Row]], I want to apply another map to merge the elements of each list (for that I wrote a recursive function that checks the order in the list and fills the null fields of newer elements with the values of the older ones). I was previously doing all this with RDDs, but I am trying to do the same with the DataFrame API.
I think for that I need to pass an encoder.
Since the schema is rather complex (at least, I don't know how to build a StructType when there are arrays whose elements also contain arrays), I tried to generate the encoder from the DataFrame's own schema, like this:
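import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
val sourceSchema = dfSoruce.schema
// Encoder for whole rows of the source schema
val encoder = RowEncoder(sourceSchema)
// This is the line that fails to compile
dfSoruce.map(x => x.getList[Row](0))(encoder)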
But I am getting the following error:
type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: org.apache.spark.sql.Encoder[java.util.List[org.apache.spark.sql.Row]]
How could I convert from ExpressionEncoder to Encoder?
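As a side note on the StructType remark above: a schema of this shape can apparently be written out by hand, because an array whose elements contain further arrays is just an ArrayType wrapping a StructType that itself has an ArrayType field. Below is a minimal sketch that mirrors the printed schema. The value names (contactType, userType, handWrittenSchema, and so on) are made up for illustration, and every field is assumed to be a nullable string as in the tree shown.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Innermost struct: one element of the `contacts` array.
val contactType = StructType(Seq(
  StructField("phone", StringType, nullable = true),
  StructField("email", StringType, nullable = true),
  StructField("address", StringType, nullable = true)
))

// One element of `usersDetails`; it carries its own nested array of contacts.
val userType = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("contacts", ArrayType(contactType, containsNull = true), nullable = true)
))

// One element of the `service` array.
val serviceType = StructType(Seq(
  StructField("serviceName", StringType, nullable = true),
  StructField("serviceCode", StringType, nullable = true)
))

val npaDetailsType = StructType(Seq(
  StructField("location", StringType, nullable = true),
  StructField("manager", StringType, nullable = true)
))

val npaDataType = StructType(Seq(
  StructField("NPADetails", npaDetailsType, nullable = true),
  StructField("usersDetails", ArrayType(userType, containsNull = true), nullable = true),
  StructField("service", ArrayType(serviceType, containsNull = true), nullable = true)
))

val npaHeaderType = StructType(Seq(
  StructField("code", StringType, nullable = true),
  StructField("date", StringType, nullable = true)
))

// Top-level schema (hypothetical name), matching the `root` of the tree.
val handWrittenSchema = StructType(Seq(
  StructField("NPAData", npaDataType, nullable = true),
  StructField("NPAHeader", npaHeaderType, nullable = true)
))
```

Calling handWrittenSchema.printTreeString() should give a tree that can be compared by eye with the output of dfSoruce.printSchema(). This only covers building the schema; it does not by itself resolve the encoder type mismatch.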