
I have two spark dataframes:

Dataframe A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

and dataframe B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

Dataframe B can contain duplicate, updated, and new rows relative to dataframe A. I want to write an operation in Spark that creates a new dataframe containing the rows from dataframe A together with the updated and new rows from dataframe B.

I started by creating a hash column computed from only the columns that are not updatable; this serves as the unique id. So let's say col_1 and col_2 can change value (can be updated), but col_3,..,col_n are unique. I created the hash column as hash(col_3,..,col_n):

from pyspark.sql.functions import col, hash

A = A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B = B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

Now I want to write some Spark code that selects the rows from B whose hash is not in A (i.e. new and updated rows) and combines them with the rows from A into a new dataframe. How can I achieve this in PySpark?

Edit: Dataframe B can have extra columns compared to dataframe A, so a union is not possible.

Sample input and output:

Dataframe A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

Dataframe B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

Result: Dataframe C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+
  • Are you looking for something like [this answer](https://stackoverflow.com/a/49452175/5858851)? Instead of hashing, the better approach would be a join on the unique id. – pault May 11 '18 at 15:44
  • Possible duplicate of [update a dataframe column with new values](https://stackoverflow.com/questions/49442572/update-a-dataframe-column-with-new-values) – pault May 11 '18 at 15:47
  • It's not similar to that answer because I also need to keep the new rows coming from dataframe B. – djWann May 11 '18 at 15:53
  • I need a hash column as I don't have a unique id column. – djWann May 11 '18 at 15:53
  • You can join on multiple columns which should be equivalent to hashing, but it's hard to tell based on your question. Can you provide a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) with some small sample input/desired output? – pault May 11 '18 at 15:55
  • @pault Unfortunately I cannot union because dataframe B can contain new columns as well, which can be set to null in A. Is there another operation I can use there? – djWann May 11 '18 at 16:07
  • it's very difficult to answer your question without an example. Please see the post I linked on how to [create good reproducible apache spark dataframe examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples). I am fairly certain that I have a solution for your problem, but your requirements are unclear so some sample input/output is important to clear things up. – pault May 11 '18 at 16:15
  • @pault I've updated the question – djWann May 11 '18 at 16:23

3 Answers


This is closely related to [update a dataframe column with new values](https://stackoverflow.com/questions/49442572/update-a-dataframe-column-with-new-values), except that you also want to add the rows from DataFrame B. One approach would be to first do what is outlined in the linked question and then union the result with DataFrame B and drop duplicates.

For example:

import pyspark.sql.functions as f

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

Or, more generically, using a list comprehension if you have a lot of columns to replace and you don't want to hard-code them all:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *(
            ['col_1'] +
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] +
            ['b.col_3']
        )
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
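
If you also don't want to hard-code 'b.col_3', the extra columns of dfB can be derived from the two schemas; a small sketch (extra_cols is an illustrative name, not from the original answer):

# columns of dfB that dfA does not have, qualified with the 'b' alias
extra_cols = ['b.{}'.format(c) for c in dfB.columns if c not in dfA.columns]

which can then be used in place of ['b.col_3'] in the select above.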
pault

I would opt for a different solution, which I believe is less verbose, more generic, and does not involve listing columns. First, identify the subset of dfA that will be updated (replaceDf) by performing an inner join with dfB on keyCols (a list of the key columns). Then subtract this replaceDf from dfA and union the result with dfB.

    replaceDf = dfA.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
    resultDf = dfA.subtract(replaceDf).union(dfB)
    resultDf.show()

Even though there will be different columns in dfA and dfB, you can still overcome this by obtaining the list of columns from both DataFrames and taking their union. Then, instead of select('a.*'), prepare a select expression that lists the columns of dfA that exist in dfB, plus "null as colname" for those that do not exist in dfB; a sketch of this alignment follows below.
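
For illustration, a minimal sketch of that column alignment on the sample data, assuming keyCols = ['col_1']; the names all_cols, aligned_a and aligned_b are made up for this example:

import pyspark.sql.functions as f

keyCols = ['col_1']  # assumed key column(s) for the sample data

# All columns appearing in either DataFrame, keeping dfA's order first.
all_cols = dfA.columns + [c for c in dfB.columns if c not in dfA.columns]

# Pad each DataFrame with nulls for the columns it lacks, so that
# union() (which matches by position) lines up; Spark widens the
# NullType column to the other side's type on union.
aligned_a = dfA.select([f.col(c) if c in dfA.columns else f.lit(None).alias(c) for c in all_cols])
aligned_b = dfB.select([f.col(c) if c in dfB.columns else f.lit(None).alias(c) for c in all_cols])

replaceDf = aligned_a.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
resultDf = aligned_a.subtract(replaceDf).union(aligned_b)
resultDf.sort('col_1').show()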

bazinac

If you want to keep only unique values and require strictly correct results, then a union followed by dropDuplicates should do the trick:

columns_which_dont_change = [...]  # the columns that uniquely identify a row
old_df.union(new_df).dropDuplicates(subset=columns_which_dont_change)
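
Two caveats worth noting: union requires both frames to have the same columns (see the null-padding sketch in the previous answer), and dropDuplicates keeps an arbitrary row among duplicates, so it is not guaranteed that the updated row from new_df is the one that survives. A usage sketch on the padded sample frames, reusing the illustrative aligned_a / aligned_b names from above:

# which of the duplicate rows per col_1 survives is not deterministic
aligned_a.union(aligned_b).dropDuplicates(subset=['col_1']).sort('col_1').show()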