I have a list of origins and a list of destinations, along with their geographic coordinates. I need to calculate the minimum distance from each origin to any of the destinations.
Below is my code:
import pyspark.sql.functions as F
from haversine import haversine_vector, Unit
data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)
data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)
df = df1.crossJoin(df2)
df.withColumn(
    "Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
I got an error like the one below:
IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed
My questions are:

1. It seems that something is wrong with withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo'))), but I do not know why. (I'm new to PySpark.)

2. My real lists of origins and destinations are both over 30K entries, so the cross join generates an enormous number of combinations. I wonder if there is a more efficient way to get the minimum distance?
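In case it helps, I suspect haversine_vector comes from the plain-Python haversine package and expects NumPy arrays or lists of (lat, lon) tuples, not Spark Column objects, so maybe the distance has to be expressed with built-in Spark functions instead. Below is an untested sketch of what I mean, based on the standard haversine formula; it assumes the geo tuples above become structs with fields _1 (latitude) and _2 (longitude), and reuses df1 and df2 from my code:

import pyspark.sql.functions as F

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance in km between two points given in degrees.
    lat1, lon1, lat2, lon2 = [F.radians(c) for c in (lat1, lon1, lat2, lon2)]
    # Haversine formula: a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2)
    a = (
        F.sin((lat2 - lat1) / 2) ** 2
        + F.cos(lat1) * F.cos(lat2) * F.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * 6371.0 * F.asin(F.sqrt(a))  # mean Earth radius ~6371 km

df = df1.crossJoin(df2)
df.withColumn(
    "Distance",
    haversine_km(
        F.col("Origin_Geo._1"), F.col("Origin_Geo._2"),
        F.col("Destination_Geo._1"), F.col("Destination_Geo._2"),
    ),
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()

Is something like this the right direction? And since it stays in Spark's built-in functions rather than calling into Python per row, would it also help with the 30K x 30K cross join, or is there a better way to avoid the cross join entirely?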
Thanks a lot in advance.