I have two dataframes, shown below. I want to add a new column to dataframe df_a, taken from column val_1 of dataframe df_b, based on the condition df_a.col_p == df_b.id.

df_a = sqlContext.createDataFrame(
    [(1412, 31, 1), (2422, 21, 1), (4223, 22, 2), (2244, 43, 1),
     (1235, 54, 1), (4126, 12, 5), (2342, 44, 1)],
    ["idx", "col_n", "col_p"])
df_a.show()

+----+-----+-----+
| idx|col_n|col_p|
+----+-----+-----+
|1412|   31|    1|
|2422|   21|    1|
|4223|   22|    2|
|2244|   43|    1|
|1235|   54|    1|
|4126|   12|    5|
|2342|   44|    1|
+----+-----+-----+

df_b = sqlContext.createDataFrame(
    [(1, 1, 1), (2, 1, 1), (3, 1, 2), (4, 1, 1), (5, 2, 1), (6, 2, 2)],
    ["id", "val_1", "val_2"])
df_b.show()

+---+-----+-----+
| id|val_1|val_2|
+---+-----+-----+
|  1|    1|    1|
|  2|    1|    1|
|  3|    1|    2|
|  4|    1|    1|
|  5|    2|    1|
|  6|    2|    2|
+---+-----+-----+

Expected output

+----+-----+-----+-----+
| idx|col_n|col_p|val_1|
+----+-----+-----+-----+
|1412|   31|    1|    1|
|2422|   21|    1|    1|
|4223|   22|    2|    1|
|2244|   43|    1|    1|
|1235|   54|    1|    1|
|4126|   12|    5|    2|
|2342|   44|    1|    1|
+----+-----+-----+-----+

My code

import pyspark.sql.functions as F

cond = (df_a.col_p == df_b.id)
df_a_new = df_a.join(df_b, cond, how='full').withColumn('val_new', F.when(cond, df_b.val_1))
df_a_new = df_a_new.drop(*['id', 'val_1', 'val_2'])
df_a_new = df_a_new.filter(df_a_new.idx.isNotNull())
df_a_new.show()

How can I get the expected output, with the rows kept in their original order?


2 Answers

You can assign an increasing index to df_a and sort by that index after joining. I'd also suggest a left join rather than a full join, so that every row of df_a is kept.

import pyspark.sql.functions as F

df_a_new = df_a.withColumn('index', F.monotonically_increasing_id()) \
               .join(df_b, df_a.col_p == df_b.id, 'left') \
               .orderBy('index') \
               .select('idx', 'col_n', 'col_p', 'val_1')

df_a_new.show()
+----+-----+-----+-----+
| idx|col_n|col_p|val_1|
+----+-----+-----+-----+
|1412|   31|    1|    1|
|2422|   21|    1|    1|
|4223|   22|    2|    1|
|2244|   43|    1|    1|
|1235|   54|    1|    1|
|4126|   12|    5|    2|
|2342|   44|    1|    1|
+----+-----+-----+-----+

You need to create your own indexes (e.g. with monotonically_increasing_id) and sort on them again after joining. There is no way to join while preserving order in Spark: the rows are partitioned before the join and lose their order before being combined. See: Can Dataframe joins in Spark preserve order?
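
For illustration, a minimal sketch of that pattern on the question's df_a and df_b, using zipWithIndex instead of monotonically_increasing_id (my substitution: zipWithIndex yields strictly consecutive indexes, while monotonically_increasing_id is only guaranteed to increase):

# Append a consecutive index to each row of df_a via the RDD API.
df_a_indexed = (
    df_a.rdd.zipWithIndex()                      # (Row, index) pairs
        .map(lambda pair: pair[0] + (pair[1],))  # Row is a tuple, so this appends the index
        .toDF(df_a.columns + ['index'])
)

# Left join, then restore the original order and drop the helper columns.
df_a_new = (
    df_a_indexed.join(df_b, df_a_indexed.col_p == df_b.id, 'left')
                .orderBy('index')
                .select('idx', 'col_n', 'col_p', 'val_1')
)
df_a_new.show()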

  • It is possible to preserve order. See the answer to your referenced question, which uses `monotonically_increasing_id`, as I did in my answer. – mck Jan 04 '21 at 06:44
  • Yeah, I meant there is no direct way to avoid the shuffle; you need to sort after joining. – Itachi Jan 04 '21 at 13:03