I have the following schema and I want to add a new column called distance. This column should compute the distance between the two time series in each row, time_series1 and time_series2:
|-- websites: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: integer (nullable = false)
|-- countryId1: integer (nullable = false)
|-- countryId2: integer (nullable = false)
|-- time_series1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: float (nullable = false)
| | |-- _2: date (nullable = true)
|-- time_series2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: float (nullable = false)
| | |-- _2: date (nullable = true)
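For context, a minimal way to build a DataFrame with this schema (the sample values below are just placeholders, not my real data):

import java.sql.Date
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("ts-distance").getOrCreate()
import spark.implicits._

// Placeholder rows: ((Int, Int), Int, Int, Seq[(Float, Date)], Seq[(Float, Date)])
val step1 = Seq(
  ((1, 2), 10, 20,
    Seq((1.0f, Date.valueOf("2020-01-01")), (2.0f, Date.valueOf("2020-01-02"))),
    Seq((1.5f, Date.valueOf("2020-01-01")), (2.5f, Date.valueOf("2020-01-02"))))
).toDF("websites", "countryId1", "countryId2", "time_series1", "time_series2")

step1.printSchema() // prints the schema shown above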
So I use a UDF to define this new column:
val step2 = step1
  .withColumn("distance", distanceUDF(col("time_series1"), col("time_series2")))
  .select("websites", "countryId1", "countryId2", "time_series1", "time_series2", "distance")
and the UDF:
val distanceUDF = udf((ts1: Seq[(Float, _)], ts2: Seq[(Float, _)]) =>
  compute_distance(ts1.map(_._1), ts2.map(_._1)))
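compute_distance itself only needs the float values of each series; a simplified stand-in with the same signature (the real implementation is more involved) would be:

// Simplified stand-in for compute_distance: same signature, placeholder logic (Euclidean distance).
def compute_distance(xs: Seq[Float], ys: Seq[Float]): Double =
  math.sqrt(xs.zip(ys).map { case (a, b) => math.pow(a - b, 2) }.sum)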
but I have a problem with the mapping: I don't know how to map array(struct(float, date)) to a Scala type.
Is Seq[(Float, Date)] equivalent to array(struct(float, date))?
I get the following exception:
java.lang.ClassCastException: .GenericRowWithSchema cannot be cast to scala.Tuple2
My problem is different from the one described in Spark Sql UDF with complex input parameter: I have an ordered time series with dates (an array of structs, not just a single struct).
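Based on the answers there, I suspect I need to receive the array elements as Row and pull out the float field by position, roughly like the sketch below, but I am not sure this is the right way to handle array(struct(float, date)):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Guess: Spark seems to pass each struct element to the UDF as a Row,
// so extract field _1 (the float) by its position.
val distanceUDF = udf { (ts1: Seq[Row], ts2: Seq[Row]) =>
  compute_distance(ts1.map(_.getFloat(0)), ts2.map(_.getFloat(0)))
}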