7

df1 has fields id and json; df2 has fields idand json

df1.count() => 1200; df2.count() => 20

df1 has all the rows. df2 has an incremental update with just 20 rows.

My goal is to update df1 with the values from df2. All the ids of df2 are in df1. But df2 has updated values(in the json field) for those same ids.

Resulting df should have all the values from df1 and updated values from df2.

What is the best way to do this? - With the least number of joins and filters.

Thanks!

koiralo
  • 22,594
  • 6
  • 51
  • 72
suprita shankar
  • 1,554
  • 2
  • 16
  • 47

2 Answers2

8

You can achieve this using one left join.

Create Example DataFrames

Using the sample data provided by @Shankar Koirala in his answer.

data1 = [
  (1, "a"),
  (2, "b"),
  (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
  (1, "x"), 
  (2, "y")
]

df2 = sqlCtx.createDataFrame(data2, ["id", "value"])

Do a left join

Join the two DataFrames using a left join on the id column. This will keep all of the rows in the left DataFrame. For the rows in the right DataFrame that don't have a matching id, the value will be null.

import pyspark.sql.functions as f
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
         f.col('l.value').alias('left_value'),
         f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#|  1|         a|          x|
#|  3|         c|       null|
#|  2|         b|          y|
#+---+----------+-----------+

Select the desired data

We will use the fact that the unmatched ids have a null to select the final columns. Use pyspark.sql.functions.when() to use the right value if it is not null, otherwise keep the left value.

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+

You can sort this output if you want the ids in order.


Using pyspark-sql

You can do the same thing using a pyspark-sql query:

df1.registerTempTable('df1')
df2.registerTempTable('df2')

query = """SELECT l.id, 
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value 
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query.replace("\n", "")).show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+
pault
  • 41,343
  • 15
  • 107
  • 149
0

I would like to provide a slightly more general solution. What happens if the input data has 100 columns instead of 2? We would spend too much time making a coalesce of those 100 columns to keep the values on the right side of the left join. Another way to solve this problem would be to "delete" the updated rows from the original df and finally make a union with the updated rows.

data_orginal = spark.createDataFrame([
    (1, "a"),
    (2, "b"),
    (3, "c")
], ("id", "value"))

data_updated = spark.createDataFrame([
    (1, "x"),
    (2, "y")
], ("id", "value"))

data_orginal.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+

data_updated.show()
+---+-----+
| id|value|
+---+-----+
|  1|    x|
|  2|    y|
+---+-----+

data_orginal.createOrReplaceTempView("data_orginal")
data_updated.createOrReplaceTempView("data_updated")

src_data_except_updated = spark.sql(f"SELECT * FROM data_orginal WHERE id not in (1,2)")
result_data = src_data_except_updated.union(data_updated)

result_data.show()
+---+-----+
| id|value|
+---+-----+
|  3|    c|
|  1|    x|
|  2|    y|
+---+-----+

Notice that the query

SELECT * FROM data_orginal WHERE id not in (1,2)

could be generated automatically:

ids_collect = spark.sql(f"SELECT id FROM data_updated").collect()
ids_list = [f"{x.id}" for x in ids_collect]
ids_str = ",".join(ids_list)
query_get_all_except = f"SELECT * FROM data_original WHERE id not in ({ids_str})"
alvaro nortes
  • 570
  • 4
  • 10