
I have a PySpark DataFrame df1:

|id |name  |email            |age|college|
|---|------|-----------------|---|-------|
|12 |Sta   |sta@example.com  |25 |clg1   |
|21 |Danny |dany@example.com |23 |clg2   |
|37 |Elle  |elle@example.com |27 |clg3   |
|40 |Mark  |mark1@example.com|40 |clg4   |
|36 |John  |jhn@example.com  |32 |clg5   |

and a PySpark DataFrame df2:

|id |name  |age|
|---|------|---|
|36 |Sta   |30 |
|12 |raj   |25 |
|29 |jack  |33 |
|87 |Mark  |67 |
|75 |Alle  |23 |
|89 |Jalley|32 |
|55 |kale  |99 |
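
For reproducibility, here is a minimal sketch that builds these sample DataFrames (assuming an existing SparkSession named `spark`):

    df1 = spark.createDataFrame(
        [
            (12, "Sta", "sta@example.com", 25, "clg1"),
            (21, "Danny", "dany@example.com", 23, "clg2"),
            (37, "Elle", "elle@example.com", 27, "clg3"),
            (40, "Mark", "mark1@example.com", 40, "clg4"),
            (36, "John", "jhn@example.com", 32, "clg5"),
        ],
        ["id", "name", "email", "age", "college"],
    )

    df2 = spark.createDataFrame(
        [
            (36, "Sta", 30),
            (12, "raj", 25),
            (29, "jack", 33),
            (87, "Mark", 67),
            (75, "Alle", 23),
            (89, "Jalley", 32),
            (55, "kale", 99),
        ],
        ["id", "name", "age"],
    )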

Now I want to join df2 with df1 to attach the email and college columns to df2, using these conditions:

match on id first; if the id does not match, try matching on name; if that fails too, try matching on age; if none of them match, fill NULL.

In other words, once the first condition matches, the remaining conditions should not be considered; only when a condition fails should the next one be tried, and if none of them match the new columns should be NULL.

For example, df2 should become:

|id |name  |age|email            |college|
|---|------|---|-----------------|-------|
|36 |Sta   |30 |jhn@example.com  |clg5   |
|12 |raj   |25 |sta@example.com  |clg1   |
|29 |jack  |33 |NULL             |NULL   |
|87 |Mark  |67 |mark1@example.com|clg4   |
|75 |Alle  |23 |dany@example.com |clg2   |
|89 |Jalley|32 |jhn@example.com  |clg5   |
|55 |kale  |99 |NULL             |NULL   |

I have tried a lot with the built-in join function but was not able to achieve this. I also tried writing a UDF, but it is very inefficient.

The data is too large to apply a UDF to it, and I am running on a Spark 3.x cluster.
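
A rough sketch of the kind of join I tried (or-ed conditions in a single left join, which creates extra rows whenever more than one condition matches):

    joined = df2.join(
        df1,
        (df2.id == df1.id) | (df2.name == df1.name) | (df2.age == df1.age),
        how="left",
    ).select(df2.id, df2.name, df2.age, df1.email, df1.college)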

  • Post your code. Also [Pandas UDFs](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.pandas_udf.html) are much more efficient than pyspark udfs. edit: Something is drastically wrong with your join conditions. Column(s) used in a join should be able to uniquely identify a single row in both `df1` and `df2`. If not, then you need to define the behavior of how to pick a single row out of multiple matching rows. E.g. when you join on `age`. – Kashyap May 16 '23 at 00:37
  • @Kashyap In the first approach, I am using an `or` condition in the join, but it matches every condition that applies and creates additional rows whenever the id, name, or age matches; we want to stop matching on name and age once the id has matched. – praveen kumar May 16 '23 at 04:39
  • Post your code. Add the expected behavior with some samples for the cases you want to support. Are you assuming you have only 100 rows in df1 where id or name don't match? Otherwise you HAVE to define the behavior (how to pick single row out of multiple matching rows). Add some more realistic sample data in df1 with same age/first-name and provide expected results. – Kashyap May 16 '23 at 15:04

2 Answers


I am not sure if there is any convenient way to perform a conditional join (like join on id if there is a match, then try to join on name, then age). I think you will need to perform three different joins: left join df1 to df2 on id, then the same join on name, then age, and union these three dataframes together (as in this answer). To avoid ambiguity and duplicate column names, we will select the id, name, and age from df2.

For example:

# left join on id only
df2_id_match = df2.join(
    df1, df2.id == df1.id, how='left'
).select(
    df2.id,
    df2.name,
    df2.age,
    df1.email,
    df1.college
)

# left join on name only
df2_name_match = df2.join(
    df1, df2.name == df1.name, how='left'
).select(
    df2.id,
    df2.name,
    df2.age,
    df1.email,
    df1.college
)

# left join on age only
df2_age_match = df2.join(
    df1, df2.age == df1.age, how='left'
).select(
    df2.id,
    df2.name,
    df2.age,
    df1.email,
    df1.college
)

# stack the three results on top of each other
df2_df1_joined = df2_id_match.union(
    df2_name_match
).union(
    df2_age_match
)

df2_df1_joined.show()

+---+------+---+-----------------+-------+
| id|  name|age|            email|college|
+---+------+---+-----------------+-------+
| 36|   Sta| 30|  jhn@example.com|   clg5|
| 12|   raj| 25|  sta@example.com|   clg1|
| 29|  jack| 33|             null|   null|
| 87|  Mark| 67|             null|   null|
| 75|  Alle| 23|             null|   null|
| 89|Jalley| 32|             null|   null|
| 55|  kale| 99|             null|   null|
| 36|   Sta| 30|  sta@example.com|   clg1|
| 12|   raj| 25|             null|   null|
| 29|  jack| 33|             null|   null|
| 87|  Mark| 67|mark1@example.com|   clg4|
| 75|  Alle| 23|             null|   null|
| 89|Jalley| 32|             null|   null|
| 55|  kale| 99|             null|   null|
| 36|   Sta| 30|             null|   null|
| 12|   raj| 25|  sta@example.com|   clg1|
| 29|  jack| 33|             null|   null|
| 87|  Mark| 67|             null|   null|
| 75|  Alle| 23| dany@example.com|   clg2|
| 89|Jalley| 32|  jhn@example.com|   clg5|
+---+------+---+-----------------+-------+
only showing top 20 rows

Then we want to keep the rows with the most information, so we count the number of nulls in each row (as shown here), then perform a window operation partitioned on ['id','name','age'] and keep only the rows with the fewest nulls in each partition (using the method from this answer). Finally, drop duplicates on the subset ['id','name','age'].

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

w = Window.partitionBy('id', 'name', 'age')

result = df2_df1_joined.withColumn(
    'null_count',
    # count how many columns are null in each row
    sum([
        F.isnull(df2_df1_joined[col]).cast(IntegerType())
        for col in df2_df1_joined.columns
    ])
).withColumn(
    'min_null_count',
    # smallest null count within each (id, name, age) group
    F.min('null_count').over(w)
).where(
    F.col('null_count') == F.col('min_null_count')
).drop(
    'min_null_count', 'null_count'
).dropDuplicates(
    ['id', 'name', 'age']
)

result.show()

+---+------+---+-----------------+-------+
| id|  name|age|            email|college|
+---+------+---+-----------------+-------+
| 12|   raj| 25|  sta@example.com|   clg1|
| 29|  jack| 33|             null|   null|
| 36|   Sta| 30|  jhn@example.com|   clg5|
| 55|  kale| 99|             null|   null|
| 75|  Alle| 23| dany@example.com|   clg2|
| 87|  Mark| 67|mark1@example.com|   clg4|
| 89|Jalley| 32|  jhn@example.com|   clg5|
+---+------+---+-----------------+-------+
Derek O

Annotated code: left-join df1 onto df2 once per key in priority order (id, then name, then age), then coalesce the per-key columns so that the first non-null match wins.

from pyspark.sql import functions as F

cols = ['email', 'college']
keys = ['id', 'name', 'age']

for k in keys:
    # drop dupes on the key if there are any
    temp = df1.drop_duplicates([k])

    # Select relevant columns and use an alias to rename, so that
    # these new columns can be easily referenced in df2 after the join
    temp = temp.select([k, *[F.col(c).alias(f'{k}_{c}') for c in cols]])

    # left join with df2 on the key
    df2 = df2.join(temp, on=k, how='left')


# For each column, coalesce the values from the per-key columns
# in priority order (id first, then name, then age)
for c in cols:
    to_drop = [f'{k}_{c}' for k in keys]
    df2 = df2.withColumn(c, F.coalesce(*to_drop).alias(c))
    df2 = df2.drop(*to_drop)

df2.show()

+---+------+---+-----------------+-------+
|age|  name| id|            email|college|
+---+------+---+-----------------+-------+
| 32|Jalley| 89|  jhn@example.com|   clg5|
| 25|   raj| 12|  sta@example.com|   clg1|
| 33|  jack| 29|             null|   null|
| 67|  Mark| 87|mark1@example.com|   clg4|
| 30|   Sta| 36|  jhn@example.com|   clg5|
| 23|  Alle| 75| dany@example.com|   clg2|
| 99|  kale| 55|             null|   null|
+---+------+---+-----------------+-------+
Shubham Sharma