
I have two DataFrames, df1 and df2, with the following structures:

df1.show()
+-------+------------+-------------+---------+
|     id|      vector|   start_time| end_time|
+-------+------------+-------------+---------+
|      1| [0,0,0,0,0]|          000|      200|
|      2| [1,1,1,1,1]|          200|      500|
|      3| [0,1,0,1,0]|          100|      500|
+-------+------------+-------------+---------+

df2.show()
+-------+------------+-------+
|     id|      vector|   time|
+-------+------------+-------+
|      A| [0,1,1,1,0]|    050|
|      B| [1,0,0,1,1]|    150|
|      C| [1,1,1,1,1]|    250|
|      D| [1,0,1,0,1]|    350|
|      E| [1,1,1,1,1]|    450|
|      F| [1,0,5,0,0]|    550|
+-------+------------+-------+

What I want is: for each row of df1, get all rows of df2 whose time falls between start_time and end_time, and for each such pair compute the Euclidean distance between the two vectors.

I started with the following code, but I am stuck on how to compute the distance:

val joined_DF = kafka_DF
  .crossJoin(
    hdfs_DF.withColumnRenamed("id", "id2").withColumnRenamed("vector", "vector2")
  )
  .filter(col("time") >= col("start_time") &&
          col("time") <= col("end_time"))
  .withColumn("distance", ???) // Euclidean distance between the vector and vector2 columns

Here is the expected output on the example data:

+-------+------------+-------------+---------+-------+------------+------+----------+
|     id|      vector|   start_time| end_time|    id2|     vector2|  time|  distance|
+-------+------------+-------------+---------+-------+------------+------+----------+
|      1| [0,0,0,0,0]|          000|      200|      A| [0,1,1,1,0]|   050|   1.73205|
|      1| [0,0,0,0,0]|          000|      200|      B| [1,0,0,1,1]|   150|   1.73205|
|      2| [1,1,1,1,1]|          200|      500|      C| [1,1,1,1,1]|   250|         0|
|      2| [1,1,1,1,1]|          200|      500|      D| [1,0,1,0,1]|   350|   1.41421|
|      2| [1,1,1,1,1]|          200|      500|      E| [1,1,1,1,1]|   450|         0|
|      3| [0,1,0,1,0]|          100|      500|      B| [1,0,0,1,1]|   150|   1.73205|
|      3| [0,1,0,1,0]|          100|      500|      C| [1,1,1,1,1]|   250|   1.73205|
|      3| [0,1,0,1,0]|          100|      500|      D| [1,0,1,0,1]|   350|   2.23606|
|      3| [0,1,0,1,0]|          100|      500|      E| [1,1,1,1,1]|   450|   1.73205|
+-------+------------+-------------+---------+-------+------------+------+----------+

Notes:

  • df1 will always contain a small number of rows, so the crossJoin will not risk blowing up my memory.
  • My DataFrames were created using the Structured Streaming API.
  • I am using Spark 2.3.2.
Nakeuh

1 Answer


A UDF should work in this case.

import math._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Input: two vectors of equal length.
// Output: the Euclidean distance between them.
val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
    sqrt(Vectors.sqdist(v1, v2))
}

Use the new UDF like this:

joined_DF.withColumn("distance", euclideanDistance($"vector", $"vector2"))
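
Note that this UDF expects org.apache.spark.ml.linalg.Vector columns. If, as the sample data suggests, your vector columns are plain arrays instead, here is a minimal sketch of an equivalent UDF over Seq, assuming the arrays hold doubles (the name euclideanDistanceArr is just illustrative):

import math._
import org.apache.spark.sql.functions.udf

// Assumes ArrayType(DoubleType) columns; Spark passes array columns
// to Scala UDFs as Seq[Double].
val euclideanDistanceArr = udf { (v1: Seq[Double], v2: Seq[Double]) =>
  sqrt(v1.zip(v2).map { case (a, b) => (a - b) * (a - b) }.sum)
}

joined_DF.withColumn("distance", euclideanDistanceArr($"vector", $"vector2"))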
Travis Hegner