coordinates = df0.registerTempTable('coordinates')  #5555 rows
metro_table = df1.registerTempTable('metro_table')  #272 rows

I'm trying to find the closest metro station to each point. To do that I wrote the SQL query below, but I don't understand why Spark does not accept the INNER JOIN. My second question is how to return only the first row: I used `fetch first 1 rows only`, but it gives me an indentation error.

query = "SELECT uuid,\
            latitude,\
            longitude,\
            p.station_id,\
            p.xlat,\
            p.xlong,\
            p.type_train,\
            p.id_transport,\
              6371000* DEGREES(ACOS(COS(RADIANS(p.xlat))\
                     * COS(RADIANS(latitude))\
                     * COS(RADIANS(p.xlong) - RADIANS(longitude))\
                     + SIN(RADIANS(p.xlat))\
                     * SIN(RADIANS(latitude)))) AS distance_in_meters\
     FROM coordinates\
     CROSS JOIN (\
         SELECT id AS id_transport,\
                station_id,\
                xlat,\
                xlong,\
                type_train\
         FROM metro_table\
         fetch first 1 rows only\   # Doesn't work in Spark
        ) AS p ON 1=1\
     ORDER BY distance_in_meters"

# Run query
df = sqlContext.sql(query)   #1510960 rows

Using PySpark, as an attempt to get the same effect as `fetch first 1 rows only`:

w = Window.partitionBy(['uuid', 'latitude', 'longitude']).orderBy('distance_in_meters')
df.select('uuid', 'latitude', 'longitude', xlat, xlong, F.min('distance_in_meters').over(w)).count()  #1510960 rows

Metro
+---+----------+----------------+----------+----------+
|id |xlong     |xlat            |station_id|type_train|
+---+----------+----------------+----------+----------+
|1  |-73.668172|45.5552769999931|1         |métro     |
|2  |-73.668486|45.5542469999931|2         |métro     |
|3  |-73.668225|45.5556069999931|3         |métro     |
|4  |-73.667407|45.5561219999931|4         |métro     |
+---+----------+----------------+----------+----------+

Coordinates
+-----+---------+----------+
|uuid | latitude| longitude|
+-----+---------+----------+
|1009 | 45.53175| -73.62613|
|1009 | 45.53163| -73.62546|
+-----+---------+----------+

After CROSS JOIN
 +----+--------+---------+----------+----------------+----------+----------+------------------+
 |uuid|latitude|longitude|station_id|            xlat|     xlong|type_train|distance_in_meters|
 +----+--------+---------+----------+----------------+----------+----------+------------------+
 |1009|45.53175|-73.62613|         2|45.5542469999931|-73.668486|     metro|237197.13838255248|
 |1009|45.53163|-73.62546|         2|45.5542469999931|-73.668486|     metro|240044.33000560844|
 |1009|45.53175|-73.62613|         1|45.5552769999931|-73.668172|     metro| 240121.5093484111|
 |1009|45.53175|-73.62613|         4|45.5561219999931|-73.667407|     metro|240897.59082511123|
 |1009|45.53175|-73.62613|         3|45.5556069999931|-73.668225|     metro|241622.85492502493|
 |1009|45.53163|-73.62546|         1|45.5552769999931|-73.668172|     metro|242937.79388593792|
 |1009|45.53163|-73.62546|         4|45.5561219999931|-73.667407|     metro| 243679.8807249287|
 |1009|45.53163|-73.62546|         3|45.5556069999931|-73.668225|     metro| 244431.2963545028|
 +----+--------+---------+----------+----------------+----------+----------+------------------+

Desired results
+----+--------+---------+----------+----------------+----------+----------+------------------+
|uuid|latitude|longitude|station_id|            xlat|     xlong|type_train|distance_in_meters|
+----+--------+---------+----------+----------------+----------+----------+------------------+
|1009|45.53175|-73.62613|         2|45.5542469999931|-73.668486|     metro|237197.13838255248|
|1009|45.53163|-73.62546|         2|45.5542469999931|-73.668486|     metro|240044.33000560844|
+----+--------+---------+----------+----------------+----------+----------+------------------+
adil blanco
  • Spark is not PostgreSQL. I'd say it is a duplicate of [Find maximum row per group in Spark DataFrame](https://stackoverflow.com/q/35218882/6910411), but there are of course more efficient approximations (if distances are small enough). – zero323 Apr 19 '18 at 16:36
  • Thank you for your answer, but we can pass SQL queries with sqlContext(). I don't understand why you referred me to this link; it's not what I'm looking for. – adil blanco Apr 19 '18 at 16:45
  • `"fetch first 1 rows only"` is not valid spark-sql syntax (as the previous commenter noted, Spark is not Postgres). You can use `LIMIT 1`, which would be equivalent, but I am not sure that it will give you the correct answer (even in Postgres). Without an ordering, you will just get an arbitrary record, though sometimes you may get the correct answer by luck. – pault Apr 19 '18 at 18:40
  • Furthermore in your sample spark select statement, why would you expect anything less than the number of rows in your dataframe as the result? You need to be using a `join` or `crossJoin`. It's not clear to me what your desired output is. Can you please try to provide a [mcve] with a small (like 5-10 rows) sample dataset and the desired output? – pault Apr 19 '18 at 18:42
  • @pault Thank you. What I am looking for is, for each point (coordinate), the minimal distance to the nearest station. The coordinates dataframe has 5555 rows and the station dataframe has 272, so what I need is the minimal distance for each point. Please check the drawing. – adil blanco Apr 19 '18 at 18:53
  • @pault I think it works using LIMIT(1). I have to verify the results. – adil blanco Apr 19 '18 at 19:18
  • @adilblanco `LIMIT 1` and `fetch first 1 rows` may return without error, but almost surely the result will be incorrect. – pault Apr 19 '18 at 19:21
  • @pault Suppose I want to do it with a window, as in my last two lines of code. How can I do that? Is what I wrote correct? Thank you for your help. – adil blanco Apr 19 '18 at 19:22
  • @pault LIMIT(1) doesn't work. It picks a station at random, not the nearest station :( – adil blanco Apr 19 '18 at 19:55
  • @adilblanco I told you this would happen and I will reiterate that the same will be true for `fetch first` as well. Please provide a [mcve] with some sample data and the desired output. See [this link](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) for more details. If you added `.distinct()` at the end of your `select` it might work, but that may not be the most efficient solution. – pault Apr 19 '18 at 20:18
  • @pault Please check the question; I added data for one user and 4 metro stations. For your information, I'm using a very large dataset. – adil blanco Apr 19 '18 at 22:02

1 Answer

Create metro station dataframe

metro = [{'id': 1, 'xlong': -73.668172, 'xlat': 45.5552769999931, 'station_id': 1, 'type_train': 'metro'},
         {'id': 2, 'xlong': -73.668486, 'xlat': 45.5542469999931, 'station_id': 2, 'type_train': 'metro'},
         {'id': 3, 'xlong': -73.668225, 'xlat': 45.5556069999931, 'station_id': 3, 'type_train': 'metro'},
         {'id': 4, 'xlong': -73.667407, 'xlat': 45.5561219999931, 'station_id': 4, 'type_train': 'metro'}]
metroDF = spark.createDataFrame(metro)

Create coordinates dataframe

coord = [{'uuid': 1009, 'latitude': 45.53175, 'longitude': -73.62613},
         {'uuid': 1009, 'latitude': 45.53163, 'longitude': -73.62546}]
coordDF = spark.createDataFrame(coord)

Create tables

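# registerTempTable returns None and is deprecated since Spark 2.0;
# createOrReplaceTempView registers the same temporary views.
coordDF.createOrReplaceTempView('coordinates')
metroDF.createOrReplaceTempView('metro_table')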

Join dataframes using CROSS JOIN

query = "SELECT uuid,\
        latitude,\
        longitude,\
        p.station_id,\
        p.xlat,\
        p.xlong,\
        p.type_train,\
          6371000* DEGREES(ACOS(COS(RADIANS(p.xlat))\
                 * COS(RADIANS(latitude))\
                 * COS(RADIANS(p.xlong) - RADIANS(longitude))\
                 + SIN(RADIANS(p.xlat))\
                 * SIN(RADIANS(latitude)))) AS distance_in_meters\
    FROM coordinates\
    CROSS JOIN (\
    SELECT id AS id_transport,\
            station_id,\
            xlat,\
            xlong,\
            type_train\
     FROM metro_table\
    ) AS p ON 1=1\
 ORDER BY distance_in_meters"

Execute the query

df = spark.sql(query)

Finally, select the nearest station for each point

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Rank the stations for each point by distance and keep only the closest one.
w = Window.partitionBy(['uuid', 'latitude', 'longitude']).orderBy('distance_in_meters')
dfTop = df.withColumn("rn", row_number().over(w)).where(col('rn') == 1).drop("rn")

dfTop.show()

+----+--------+---------+----------+----------------+----------+----------+------------------+
|uuid|latitude|longitude|station_id|            xlat|     xlong|type_train|distance_in_meters|
+----+--------+---------+----------+----------------+----------+----------+------------------+
|1009|45.53163|-73.62546|         2|45.5542469999931|-73.668486|     metro|240044.33000560844|
|1009|45.53175|-73.62613|         2|45.5542469999931|-73.668486|     metro|237197.13838255248|
+----+--------+---------+----------+----------------+----------+----------+------------------+
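
As a sanity check on what the window step does, the same "keep the row with the smallest distance for each point" logic can be sketched in plain Python on the eight cross-join rows from the question. This is only an illustration, not part of the Spark job: the helper name `nearest_per_point` is hypothetical, and the distances are copied verbatim from that output.

```python
# Rows copied from the cross-join output in the question:
# (uuid, latitude, longitude, station_id, distance_in_meters)
rows = [
    (1009, 45.53175, -73.62613, 2, 237197.13838255248),
    (1009, 45.53163, -73.62546, 2, 240044.33000560844),
    (1009, 45.53175, -73.62613, 1, 240121.5093484111),
    (1009, 45.53175, -73.62613, 4, 240897.59082511123),
    (1009, 45.53175, -73.62613, 3, 241622.85492502493),
    (1009, 45.53163, -73.62546, 1, 242937.79388593792),
    (1009, 45.53163, -73.62546, 4, 243679.8807249287),
    (1009, 45.53163, -73.62546, 3, 244431.2963545028),
]


def nearest_per_point(rows):
    """Keep, for each (uuid, latitude, longitude), the row with the smallest distance.

    Hypothetical helper mirroring row_number() == 1 over a window that is
    partitioned by the point and ordered by distance.
    """
    nearest = {}
    for row in rows:
        key = row[:3]  # the partition key: (uuid, latitude, longitude)
        if key not in nearest or row[4] < nearest[key][4]:
            nearest[key] = row
    return sorted(nearest.values())


for row in nearest_per_point(rows):
    print(row)
```

Both points end up paired with station 2, which agrees with the two rows returned by `dfTop.show()`.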
adil blanco
  • The distance function did not work for me. I used this instead: acos(sin(pi()*user.lat/180.0)*sin(pi()*p.xlat/180.0)+cos(pi()*user.lat/180.0)*cos(pi()*p.xlat/180.0)*cos(pi()*p.xlong/180.0-pi()*user.lon/180.0))*6371000 AS distance_in_meters – SimonVonDerGoltz Jul 31 '18 at 10:26
  • The only (functional) difference between the distance functions is that OP converts back into Degrees before multiplying by 6371000 (which is incorrect). The RADIANS function works fine, there's no need to explicitly multiply by PI() and divide by 180.0 each time. – nick_j_white May 12 '23 at 10:21
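
The point in the last comment can be checked outside Spark with a minimal plain-Python sketch. It assumes the same spherical law of cosines and the same Earth radius (6371000 m) as the query; the function names are hypothetical. ACOS already returns the central angle in radians, so multiplying by the radius gives meters directly, while wrapping it in DEGREES inflates the result by a factor of 180/π.

```python
import math

EARTH_RADIUS_M = 6371000  # same constant as in the query


def central_angle(lat1, lon1, lat2, lon2):
    """Central angle in radians between two points (spherical law of cosines)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlon = math.radians(lon2) - math.radians(lon1)
    cos_angle = (math.cos(phi1) * math.cos(phi2) * math.cos(dlon)
                 + math.sin(phi1) * math.sin(phi2))
    # Clamp so that rounding error cannot push acos outside its domain.
    return math.acos(max(-1.0, min(1.0, cos_angle)))


def distance_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters: radius times the angle in radians."""
    return EARTH_RADIUS_M * central_angle(lat1, lon1, lat2, lon2)


def distance_with_degrees(lat1, lon1, lat2, lon2):
    """The query's variant, which converts the angle to degrees first."""
    return EARTH_RADIUS_M * math.degrees(central_angle(lat1, lon1, lat2, lon2))


point = (45.53175, -73.62613)               # first coordinate in the sample data
station_2 = (45.5542469999931, -73.668486)  # metro station 2 in the sample data

correct = distance_m(*point, *station_2)
inflated = distance_with_degrees(*point, *station_2)
print(correct, inflated, inflated / correct)
```

For this pair the corrected distance comes out near 4.14 km, whereas the DEGREES variant reproduces the roughly 237 km figure shown in the tables. Since the scaling factor is constant, the ranking of stations (and therefore the nearest station) is unaffected; only the reported distance is wrong.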