I have the following schema, and I want to add a new column called distance. This column computes the distance between the two time series in each row: time_series1 and time_series2.

|-- websites: struct (nullable = true)
|    |-- _1: integer (nullable = false)
|    |-- _2: integer (nullable = false)
|-- countryId1: integer (nullable = false)
|-- countryId2: integer (nullable = false)
|-- time_series1: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: float (nullable = false)
|    |    |-- _2: date (nullable = true)
|-- time_series2: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: float (nullable = false)
|    |    |-- _2: date (nullable = true)
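
To make this concrete, here is a small, hypothetical sample step1 that reproduces the schema (it assumes a SparkSession named spark; Scala tuples are encoded as structs with _1/_2 fields, which is what printSchema shows above):

import java.sql.Date
import spark.implicits._ // assumes a SparkSession named spark

// Hypothetical sample rows standing in for my real data; the tuples become
// the _1/_2 structs in the schema above.
val step1 = Seq(
  ((1, 2), 10, 20,
    Seq((1.5f, Date.valueOf("2020-01-01")), (2.5f, Date.valueOf("2020-01-02"))),
    Seq((3.0f, Date.valueOf("2020-01-01")), (4.0f, Date.valueOf("2020-01-02"))))
).toDF("websites", "countryId1", "countryId2", "time_series1", "time_series2")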

So I use the udf function to define this new column as:

val step2 = step1
  .withColumn("distance", distanceUDF(col("time_series1"), col("time_series2")))
  .select("websites", "countryId1", "countryId2", "time_series1", "time_series2", "distance")

and the UDF:

val distanceUDF = udf( (ts1: Seq[(Float, _)], ts2: Seq[(Float, _)]) =>
  compute_distance(ts1.map(_._1), ts2.map(_._1)))

but I have a problem with the mapping: I don't know how to map the array of struct(float, date) to a Scala type.

Is Seq[(Float, Date)] equivalent to array(struct(float, date))? I get the following exception:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2

My problem is different from the one described in Spark Sql UDF with complex input parameter: I have an ordered time series with dates (an array of structs, not just a single struct type).

1 Answer

The link you added already has the answer to your question:

struct types are converted to org.apache.spark.sql.Row

So your function should take two Seq[Row] arguments. Then you can use the Row API to get the floats.
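
A minimal sketch of that, assuming compute_distance from your question takes two Seq[Float] and returns a type Spark can encode (e.g. Double):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Each array element arrives as a Row; field 0 holds the float, field 1 the date.
val distanceUDF = udf { (ts1: Seq[Row], ts2: Seq[Row]) =>
  compute_distance(ts1.map(_.getFloat(0)), ts2.map(_.getFloat(0)))
}

val step2 = step1.withColumn("distance",
  distanceUDF(col("time_series1"), col("time_series2")))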

In cases like this, you may want to use Datasets instead. For more about nested types, you can watch the talk The Joy of Nested Types.
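
For illustration, a rough sketch of the Dataset route; the case class mirrors the schema above, and compute_distance is again assumed from your question:

import java.sql.Date
import spark.implicits._ // assumes a SparkSession named spark

// Mirrors the row schema; Scala tuples encode to the _1/_2 structs shown above.
case class SeriesPair(
  websites: (Int, Int),
  countryId1: Int,
  countryId2: Int,
  time_series1: Seq[(Float, Date)],
  time_series2: Seq[(Float, Date)])

// Typed access: no Row casting, just plain Scala collections.
val distances = step1.as[SeriesPair].map { p =>
  compute_distance(p.time_series1.map(_._1), p.time_series2.map(_._1))
}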