
I am trying to join two dataframes in PySpark. I want my inner join to match rows even when the join columns contain NULLs. I can see that Scala has the <=> operator for this, but <=> does not work in PySpark.

from pyspark.sql import Row

userLeft = sc.parallelize([
Row(id=u'1', 
    first_name=u'Steve', 
    last_name=u'Kent', 
    email=u's.kent@email.com'),
Row(id=u'2', 
    first_name=u'Margaret', 
    last_name=u'Peace', 
    email=u'marge.peace@email.com'),
Row(id=u'3', 
    first_name=None, 
    last_name=u'hh', 
    email=u'marge.hh@email.com')]).toDF()

userRight = sc.parallelize([
Row(id=u'2', 
    first_name=u'Margaret', 
    last_name=u'Peace', 
    email=u'marge.peace@email.com'),
Row(id=u'3', 
    first_name=None, 
    last_name=u'hh', 
    email=u'marge.hh@email.com')]).toDF()

Current working version:

userLeft.join(userRight, (userLeft.last_name==userRight.last_name) & (userLeft.first_name==userRight.first_name)).show()

Current Result:

    +--------------------+----------+---+---------+--------------------+----------+---+---------+
    |               email|first_name| id|last_name|               email|first_name| id|last_name|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+
    |marge.peace@email...|  Margaret|  2|    Peace|marge.peace@email...|  Margaret|  2|    Peace|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+

Expected Result:

    +--------------------+----------+---+---------+--------------------+----------+---+---------+
    |               email|first_name| id|last_name|               email|first_name| id|last_name|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+
    |  marge.hh@email.com|      null|  3|       hh|  marge.hh@email.com|      null|  3|       hh|
    |marge.peace@email...|  Margaret|  2|    Peace|marge.peace@email...|  Margaret|  2|    Peace|
    +--------------------+----------+---+---------+--------------------+----------+---+---------+
orNehPraka

2 Answers


For PySpark < 2.3.0, you can still build the <=> operator with an expression column, like this:

import pyspark.sql.functions as F
df1.alias("df1").join(df2.alias("df2"), on = F.expr('df1.column <=> df2.column'))

For PySpark >= 2.3.0, you can use Column.eqNullSafe, or IS NOT DISTINCT FROM in a SQL expression.
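
A minimal sketch of the eqNullSafe variant, again using the question's dataframes:

userLeft.join(
    userRight,
    userLeft.first_name.eqNullSafe(userRight.first_name)
    & userLeft.last_name.eqNullSafe(userRight.last_name)
).show()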

Marcos Pindado

Use another value instead of null:

userLeft = userLeft.na.fill("unknown")
userRight = userRight.na.fill("unknown")

userLeft.join(userRight, ["last_name", "first_name"]).show()

    +---------+----------+--------------------+---+--------------------+---+
    |last_name|first_name|               email| id|               email| id|
    +---------+----------+--------------------+---+--------------------+---+
    |    Peace|  Margaret|marge.peace@email...|  2|marge.peace@email...|  2|
    |       hh|   unknown|  marge.hh@email.com|  3|  marge.hh@email.com|  3|
    +---------+----------+--------------------+---+--------------------+---+
MaFF
  • I tried this approach. For string and date columns I was able to pick a placeholder that distinguishes null values, like "NULLCUSTOM" for strings and "8888-01-01" for dates. But I couldn't think of a value for integer or float columns. Do you have any idea? – orNehPraka Sep 05 '17 at 20:05
  • `float("inf")` it will be cast as `long` if column is of type `int` or `long` it's actually not infinity it's `9223372036854775807` – MaFF Sep 05 '17 at 20:17
  • or `-1` for id columns – MaFF Sep 05 '17 at 20:47
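
Following up on the comments, na.fill also accepts a dict mapping column names to fill values, so each numeric column can get its own sentinel. A minimal sketch, assuming a hypothetical numeric column named "age":

userLeft = userLeft.na.fill({"first_name": "unknown", "age": -1})   # "age" is hypothetical; the example data has only string columns
userRight = userRight.na.fill({"first_name": "unknown", "age": -1})

userLeft.join(userRight, ["last_name", "first_name"]).show()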