
I have a PySpark dataframe A that has 3 columns:

Lat lon zip
-69 40  trp
-69 41  nxt

I have another PySpark dataframe B that has the same columns, but different values:

Lat lon zip
-68 43  trp
-89 45  trp

I want to create latitude/longitude pairs for each record of A, matched to the records of B with the same zip.

So the output RDD pairs would be like:

(([-69,40],[-68,43]),
 ([-69,40],[-89,45]))

Can I do this without looping? I was trying, unsuccessfully, to replicate the .map used here: Spark cartesian product.

– muni

1 Answer

A cartesian product is a join without any join key: it pairs every row of one dataframe with every row of the other. If you have two dataframes A and B with nA and nB rows respectively, you end up with a dataframe of nA x nB rows.

What you are looking for is a regular inner join with join key zip:

A = spark.createDataFrame([[-69,40,"trp"],[-69,41,"nxt"]], ["Lat","lon","zip"])
B = spark.createDataFrame([[-68,43,"trp"],[-89,45,"trp"]], ["Lat","lon","zip"])
A.join(B, "zip").show()

    +---+---+---+---+---+
    |zip|Lat|lon|Lat|lon|
    +---+---+---+---+---+
    |trp|-69| 40|-68| 43|
    |trp|-69| 40|-89| 45|
    +---+---+---+---+---+
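For comparison, here is a minimal sketch of the cartesian product itself (assuming Spark 2.1+, where crossJoin is available). With the sample data it yields 2 x 2 = 4 rows instead of the 2 matched rows above:

# A join without a key: every row of A paired with every row of B
A.crossJoin(B).show()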

Note: be careful with column name ambiguity: after the join, both Lat and lon appear twice. You can, for instance, put Lat and lon in a struct for each dataframe before joining them:

import pyspark.sql.functions as psf
A = A.select("zip", psf.struct("Lat", "lon").alias("A"))
B = B.select("zip", psf.struct("Lat", "lon").alias("B"))

df = A.join(B, "zip")
df.show()
df.printSchema()

    +---+--------+--------+
    |zip|       A|       B|
    +---+--------+--------+
    |trp|[-69,40]|[-68,43]|
    |trp|[-69,40]|[-89,45]|
    +---+--------+--------+

    root
     |-- zip: string (nullable = true)
     |-- A: struct (nullable = false)
     |    |-- Lat: long (nullable = true)
     |    |-- lon: long (nullable = true)
     |-- B: struct (nullable = false)
     |    |-- Lat: long (nullable = true)
     |    |-- lon: long (nullable = true)
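
To get the pair RDD asked for in the question, a minimal sketch maps over df.rdd (the struct fields are accessible as Row attributes; collect() assumes the result fits in driver memory):

# Turn each joined row into the ([A.Lat, A.lon], [B.Lat, B.lon]) pair
# requested in the question.
pairs = df.rdd.map(lambda r: ([r.A.Lat, r.A.lon], [r.B.Lat, r.B.lon]))
pairs.collect()
# [([-69, 40], [-68, 43]), ([-69, 40], [-89, 45])]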
– MaFF
  • OK. Is it suitable to create a dataframe if each zip has, say, some 1000 points and I am joining with a dataframe that has some 1M points? Or are there alternative ways, like RDDs or arrays of tuples, that take less memory? – muni Nov 16 '17 at 14:32
  • You can `broadcast` the small dataframe when joining; this copies it to every node so the large dataframe does not need to be shuffled: `A.join(psf.broadcast(B), "zip")` – MaFF Nov 16 '17 at 19:20
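
A minimal sketch of the broadcast join suggested in the comment above (psf.broadcast is pyspark.sql.functions.broadcast; the hint marks the small dataframe for replication to every executor):

import pyspark.sql.functions as psf

# Mark B (the small dataframe) to be broadcast to every executor,
# so the join runs map-side without shuffling the large dataframe A.
df = A.join(psf.broadcast(B), "zip")
df.show()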