
I have two large DataFrames. Each row contains lat/lon data. My goal is to join the two DataFrames and find all pairs of points that are within a given distance of each other, e.g. 100 m.

df1: (id, lat, lon, geohash7)
df2: (id, lat, lon, geohash7)

I want to partition df1 and df2 on geohash7, and then join only within matching partitions. I want to avoid joining across partitions to reduce computation.

import org.apache.spark.sql.functions.col

val df1p = df1.repartition(200, col("geohash7"))
val df2p = df2.repartition(200, col("geohash7"))

val df_merged = df1p.join(df2p,
  (df1p("geohash7") === df2p("geohash7")) &&
  (dist(df1p("lat"), df1p("lon"), df2p("lat"), df2p("lon")) < 100)
)
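
(For reference, `dist` here is a haversine UDF, as confirmed in the comments below; the following is a minimal sketch, assuming coordinates in degrees and a result in meters.)

import org.apache.spark.sql.functions.udf

// Haversine great-circle distance in meters between two (lat, lon)
// points given in degrees, using a mean Earth radius of 6,371,000 m.
val dist = udf { (lat1: Double, lon1: Double, lat2: Double, lon2: Double) =>
  val r = 6371000.0
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
      math.pow(math.sin(dLon / 2), 2)
  2 * r * math.asin(math.sqrt(a))
}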

So basically: join on geohash7, then make sure the distance between the points is less than 100 m. The problem is that Spark appears to cross join all the data anyway. How can I make it do only the intra-partition join, not an inter-partition join?

  • Have you checked the plan with `.explain()`? I don't understand why the code would do the `cross join`. – Lamanus Oct 03 '20 at 02:39
  • What is `dist` doing? Am I correct that it is a UDF calculating the Euclidean distance between the two points? That would [explain](https://stackoverflow.com/a/32952938/2129801) the cross join. – werner Oct 03 '20 at 13:33
  • `dist` calculates the haversine distance between two points. Yeah, it seems that what I want is not directly supported by Spark. After some playing with it, I think Spark is not doing a cross join, because the geohash7 condition is in the join statement. If I remove the geohash7 matching condition, the query runs much slower. – solora Oct 04 '20 at 03:19

1 Answer


After much playing with the data, it seems that Spark is smart enough to first evaluate the equality condition ("geohash7") and only compute the "dist" function for rows that match. It also appears that with the equality condition present, it no longer does a cross join. So I didn't have to do anything else; the join above works fine.
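
One way to check this (not part of the original answer, just a suggested verification) is to inspect the physical plan:

// With the equality predicate on geohash7 present, the plan should show an
// equi-join (e.g. SortMergeJoin or BroadcastHashJoin) keyed on geohash7,
// with the dist(...) < 100 predicate applied as a post-match join filter,
// rather than a CartesianProduct or BroadcastNestedLoopJoin.
df_merged.explain()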
