df1:

+--------------------+--------------+----------------------+
|event_id            |action_id     |cancellation_action_id|
+--------------------+--------------+----------------------+
|a                   |actionIdUnique|null                  |
|c                   |ActionId002   |null                  |
+--------------------+--------------+----------------------+

df2:

+--------------------+--------------+----------------------+
|            event_id|     action_id|cancellation_action_id|
+--------------------+--------------+----------------------+
|a                   |actionIdUnique|                  null|
|b                   |   ActionId004|           ActionId002|
|c                   |   ActionId002|                  null|
+--------------------+--------------+----------------------+

df1 is basically df2 where cancellation_action_id is null.
My goal is to keep the rows from df1 whose action_id value does not appear in any cancellation_action_id.

Desired output:

+--------------------+--------------+----------------------+
|event_id            |action_id     |cancellation_action_id|
+--------------------+--------------+----------------------+
|a                   |actionIdUnique|null                  |   
+--------------------+--------------+----------------------+

event_id c is removed because its action_id (ActionId002) equals the cancellation_action_id of event_id b. I feel there are two ways to do this: with a join or with a window function (a rough sketch of the window route is at the end of this question).
I tried a left anti join, but I don't understand why the joined DataFrame is not equal to the expected one.

df3 = df1.join(df2, df1("action_id") === df2("cancellation_action_id"), "leftanti")

My result:

+--------------------+--------------+----------------------+
|event_id            |action_id     |cancellation_action_id|
+--------------------+--------------+----------------------+
|a                   |actionIdUnique|null                  |
|c                   |ActionId002   |null                  |
+--------------------+--------------+----------------------+

I don't understand why the last row (event_id c) is not removed.
Both DataFrames come from the same original DataFrame, so they have the same schema.
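
For reference, the window-function route mentioned above would look roughly like this (an untested sketch in PySpark rather than Scala; note that Window.partitionBy() with no keys moves everything into a single partition, so it only suits small data):

import pyspark.sql.functions as F
from pyspark.sql import Window

# Untested sketch: gather every non-null cancellation_action_id into one
# global set, then keep only the null-cancellation rows whose action_id
# is not in that set.
w = Window.partitionBy()  # no keys: one global partition (small data only)

result = (
    df2.withColumn("cancelled", F.collect_set("cancellation_action_id").over(w))
       .filter(F.col("cancellation_action_id").isNull()
               & ~F.expr("array_contains(cancelled, action_id)"))
       .drop("cancelled")
)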

maxime G

2 Answers

This is a known issue in Spark when both sides of a join come from the same original DataFrame: since they share the same lineage, the column references in the join condition can end up resolving against the same side, so the condition never matches anything. We avoid it by doing the following: df = df.toDF(*df.columns), which re-creates the columns and breaks the shared lineage. You need to do that for every frame that will be joined.

Below is an example in Python, but I think the same can be done in Scala:

In [89]: import pyspark.sql.functions as F
In [90]: df = df.toDF(*df.columns)
In [91]: df.show()
+--------+--------------+----------------------+
|event_id|     action_id|cancellation_action_id|
+--------+--------------+----------------------+
|       a|ActionIdUnique|                  null|
|       b|   ActionId004|           ActionId002|
|       c|   ActionId002|                  null|
+--------+--------------+----------------------+

In [92]: df1 = df.filter(F.col('cancellation_action_id').isNull())
In [93]: df1 = df1.toDF(*df1.columns)
In [94]: df1.show()
+--------+--------------+----------------------+
|event_id|     action_id|cancellation_action_id|
+--------+--------------+----------------------+
|       a|ActionIdUnique|                  null|
|       c|   ActionId002|                  null|
+--------+--------------+----------------------+

In [95]: df_res = df1.join(df, df1['action_id'] == df['cancellation_action_id'], 'leftanti')
In [96]: df_res.show()
+--------+--------------+----------------------+
|event_id|     action_id|cancellation_action_id|
+--------+--------------+----------------------+
|       a|ActionIdUnique|                  null|
+--------+--------------+----------------------+
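
For the Scala code in the question, the equivalent lineage-breaking step should be df1.toDF(df1.columns: _*) (and likewise for df2) before the join; toDF re-creates the columns, so I expect it to behave the same way as the Python version above.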
busfighter

This happens due to an incorrect comparison of columns containing nulls. Aliasing both frames and spelling out the condition with qualified names gives the expected result:

import pyspark.sql.functions as F

df3 = df1.alias('df1').join(
    df2.alias('df2'),
    F.expr('df1.action_id == df2.cancellation_action_id'),
    "leftanti"
)
df3.show()
+--------------+--------+----------------------+
|     action_id|event_id|cancellation_action_id|
+--------------+--------+----------------------+
|actionIdUnique|       a|                  null|
+--------------+--------+----------------------+

More details: PySpark: Handling NULL in Joins

Comparisons involving nulls in Spark are not obvious:

df2.filter(F.col('cancellation_action_id') == 'ActionId002').show()

+-----------+--------+----------------------+
|  action_id|event_id|cancellation_action_id|
+-----------+--------+----------------------+
|ActionId004|       b|           ActionId002|
+-----------+--------+----------------------+

But

df2.filter(~(F.col('cancellation_action_id') == 'ActionId002')).show()

+---------+--------+----------------------+
|action_id|event_id|cancellation_action_id|
+---------+--------+----------------------+
+---------+--------+----------------------+

Correct:

df2.filter(~F.col('cancellation_action_id').eqNullSafe('ActionId002')).show()

+--------------+--------+----------------------+
|     action_id|event_id|cancellation_action_id|
+--------------+--------+----------------------+
|actionIdUnique|       a|                  null|
|   ActionId002|       c|                  null|
+--------------+--------+----------------------+
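
To see the three-valued logic directly, you can put both operators side by side (reusing df2 from above; eq and eq_null_safe are just aliases for this demo):

df2.select(
    'event_id',
    (F.col('cancellation_action_id') == 'ActionId002').alias('eq'),
    F.col('cancellation_action_id').eqNullSafe('ActionId002').alias('eq_null_safe')
).show()

+--------+----+------------+
|event_id|  eq|eq_null_safe|
+--------+----+------------+
|       a|null|       false|
|       b|true|        true|
|       c|null|       false|
+--------+----+------------+

The plain == yields null, not false, when the column is null, and filter drops both false and null rows; that is why the negated filter above returned nothing, while eqNullSafe treats null as an ordinary comparable value.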