I use PySpark and have created two DataFrames from text files:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
sc = spark.sparkContext
+---+--------------------+------------------+-------------------+
| id|                name|               lat|                lon|
+---+--------------------+------------------+-------------------+
|  1|.
.
.
+---+-------------------+------------------+-------------------+
| id|               name|               lat|                lon|
+---+-------------------+------------------+-------------------+
|  1||
.
.

What I want is to use Spark to find every pair of records, one from each DataFrame, whose Euclidean distance is below a given threshold (say, 0.5). For example:

record1, record2

or any similar form; the exact output format does not matter.

Any help will be appreciated, thank you.

Ian_Lane

1 Answer

Since Spark does not include any provisions for geospatial computations, you need a user-defined function that computes the geospatial distance between two points, for example by using the haversine formula (from here):

from math import radians, cos, sin, asin, sqrt
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

@udf(returnType=FloatType())
def haversine(lat1, lon1, lat2, lon2):
    R = 6372.8  # Earth radius in kilometres, so the result is in km

    dLat = radians(lat2 - lat1)
    dLon = radians(lon2 - lon1)
    lat1 = radians(lat1)
    lat2 = radians(lat2)

    a = sin(dLat/2)**2 + cos(lat1)*cos(lat2)*sin(dLon/2)**2
    c = 2*asin(sqrt(a))

    return R * c

Then you simply perform a cross join conditioned on the result from calling haversine():

df1.join(df2, haversine(df1.lat, df1.lon, df2.lat, df2.lon) < 100, 'cross') \
   .select(df1.name, df2.name)

You need a cross join since Spark cannot embed the Python UDF in the join itself. That's expensive, but this is something that PySpark users have to live with.

Here is an example:

>>> df1.show()
+---------+-------------------+--------------------+
|      lat|                lon|                name|
+---------+-------------------+--------------------+
|37.776181|-122.41341399999999|AAE SSFF European...|
|38.959716|        -119.945595|Ambassador Motor ...|
| 37.66169|        -121.887367|Alameda County Fa...|
+---------+-------------------+--------------------+
>>> df2.show()
+------------------+-------------------+-------------------+
|               lat|                lon|               name|
+------------------+-------------------+-------------------+
|       34.19198813|-118.93756299999998|Daphnes Greek Cafe1|
|         37.755557|-122.25036084651899|Daphnes Greek Cafe2|
|38.423435999999995|         -121.41361|       Laguna Pizza|
+------------------+-------------------+-------------------+
>>> df1.join(df2, haversine(df1.lat, df1.lon, df2.lat, df2.lon) < 100, 'cross') \
       .select(df1.name.alias("name1"), df2.name.alias("name2")).show()
+--------------------+-------------------+
|               name1|              name2|
+--------------------+-------------------+
|AAE SSFF European...|Daphnes Greek Cafe2|
|Alameda County Fa...|Daphnes Greek Cafe2|
|Alameda County Fa...|       Laguna Pizza|
+--------------------+-------------------+
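For small samples, the result of the conditioned cross join can be cross-checked without Spark. The following is only a sketch: it reuses the haversine formula and the 100 km cut-off from the answer, while the names `haversine_km`, `pairs_within`, `rows1` and `rows2` are made up for illustration, and the rows are copied from the `show()` output above with the names as displayed.

```python
from itertools import product
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6372.8  # same radius as in the answer's UDF


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    a = (sin(d_lat / 2) ** 2
         + cos(radians(lat1)) * cos(radians(lat2)) * sin(d_lon / 2) ** 2)
    return EARTH_RADIUS_KM * 2 * asin(sqrt(a))


def pairs_within(rows1, rows2, cutoff_km):
    """Brute-force counterpart of the conditioned cross join: test every pair."""
    return [
        (name1, name2)
        for (lat1, lon1, name1), (lat2, lon2, name2) in product(rows1, rows2)
        if haversine_km(lat1, lon1, lat2, lon2) < cutoff_km
    ]


# Hypothetical in-memory copies of df1 and df2 as (lat, lon, name) tuples.
rows1 = [
    (37.776181, -122.41341399999999, "AAE SSFF European..."),
    (38.959716, -119.945595, "Ambassador Motor ..."),
    (37.66169, -121.887367, "Alameda County Fa..."),
]
rows2 = [
    (34.19198813, -118.93756299999998, "Daphnes Greek Cafe1"),
    (37.755557, -122.25036084651899, "Daphnes Greek Cafe2"),
    (38.423435999999995, -121.41361, "Laguna Pizza"),
]

matches = pairs_within(rows1, rows2, 100)
for name1, name2 in matches:
    print(name1, "|", name2)
```

Like the cross join, this tests every pair, so it is only practical for verifying a handful of rows.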
Hristo Iliev
  • You mention "this is something that PySpark users have to live with". Does this apply to every Spark implementation or only in Pyspark? Is there a way, using Spark, to achieve a join of two datasets without the "expensive" cross join? Maybe if we unite the two datasets in one, marking every row with the set they come from? (eg 'A', 'B'). And then somehow iterate? – Ian_Lane Mar 30 '20 at 12:53
  • @Ian_Lane, what is particular about PySpark (and I guess the R API too) is that Python is not a JVM language. The Python code lives in a separate process and talks to the JVM hosting the Spark executor via network sockets. For anything that is outside what the Scala/Java API provides, Spark has to serialise the data, send it to Python where it is deserialised, iterated over, then serialised again, sent back, and deserialised. Contrast that with when the UDF is written in Scala or Java and calls to it get compiled directly in the inner loop by Catalyst's code generator. – Hristo Iliev Mar 30 '20 at 13:01
  • I get it. So if we move inside Scala, can we solve the same problem using a faster approach? – Ian_Lane Mar 30 '20 at 13:41
  • I don't see a faster approach since you have to test all possible pairs, but having the distance UDF written in Scala or Java will allow Spark to call it deep in the join loop. You can also speed up the distance calculation by precomputing a bounding box around each object that is so big that any object outside of it is guaranteed not to be within the cut-off distance and first check if the coordinates of the second object are within the box. This is very simple math and if the object is outside, it will save you computing the trig functions. – Hristo Iliev Mar 30 '20 at 16:12
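The bounding-box pre-check described in the last comment could look roughly like this in plain Python. This is a sketch under assumptions, not the commenter's code: the names `bounding_box_deltas` and `within_cutoff` are invented here, the Earth radius is the one from the answer's UDF, and the longitude half-width uses the spherical bound asin(sin(δ)/cos(lat)) so that the box is never smaller than the cut-off circle.

```python
from math import asin, cos, degrees, pi, radians, sin, sqrt

EARTH_RADIUS_KM = 6372.8  # same radius as in the answer's UDF


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    a = (sin(d_lat / 2) ** 2
         + cos(radians(lat1)) * cos(radians(lat2)) * sin(d_lon / 2) ** 2)
    return EARTH_RADIUS_KM * 2 * asin(sqrt(a))


def bounding_box_deltas(lat, cutoff_km):
    """Half-height and half-width (degrees) of a box enclosing the cut-off circle."""
    delta = cutoff_km / EARTH_RADIUS_KM  # angular radius of the circle in radians
    d_lat = degrees(delta)
    if abs(radians(lat)) + delta >= pi / 2:
        # The circle reaches a pole, so it can span every longitude.
        return d_lat, 180.0
    d_lon = degrees(asin(sin(delta) / cos(radians(lat))))
    return d_lat, d_lon


def within_cutoff(lat1, lon1, lat2, lon2, cutoff_km):
    """Cheap box test first; the trigonometry runs only for points inside the box."""
    d_lat, d_lon = bounding_box_deltas(lat1, cutoff_km)
    if abs(lat2 - lat1) > d_lat:
        return False
    lon_diff = abs(lon2 - lon1) % 360.0
    lon_diff = min(lon_diff, 360.0 - lon_diff)  # handle wrap-around at +/-180
    if lon_diff > d_lon:
        return False
    return haversine_km(lat1, lon1, lat2, lon2) < cutoff_km


# Two pairs taken from the sample data in the answer, with a 100 km cut-off.
near = within_cutoff(37.776181, -122.413414, 37.755557, -122.250361, 100)
far = within_cutoff(37.776181, -122.413414, 34.19198813, -118.937563, 100)
print(near, far)
```

In a real job, `bounding_box_deltas` would be computed once per row of the first dataset rather than once per pair, which is where the saving comes from.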