
I have two DataFrames, DF1 and DF2, in PySpark, and I want to produce the output shown below:

DF1

Id|field_A   |field_B   |field_C |field_D
1 |cat       |12        |black   |1
2 |dog       |128       |white   |2

DF2

Id|field_A|field_B|field_C
1 |cat    |13     |blue

Output required:

DF3

Id|field_A|field_B|field_C|field_D
1 |cat    |13     |blue   |1
2 |dog    |128    |white  |2

I tried to do this with a join, but none of the following join types gives the required output:

'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross'

DF3 = DF2.join(DF1, DF1.ID == DF2.ID,"leftouter")
  • could you please format the code properly so I can try to help – thePurplePython May 09 '19 at 17:49
  • Possible duplicate of [update a dataframe column with new values](https://stackoverflow.com/questions/49442572/update-a-dataframe-column-with-new-values) – pault May 09 '19 at 17:59

1 Answer

First, give each DataFrame an alias so that its columns can be referenced unambiguously after the join:

val a = df1.as("a")
val b = df2.as("b")

Build a sequence of the columns that are not updated:

val columnsNotUpdated =
        Seq(col("a.Id").as("Id"), col("a.field_D").as("field_D"))

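Build an array of the columns that are updated. For each one, use when to check whether the row matched a row in dataframe 'b' (that is, b.Id is not null): if it did, select the value from dataframe 'b'; otherwise keep the value from dataframe 'a'. The alias goes on the whole when/otherwise expression so that the output column keeps its original name: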

val columnsUpdated = a.columns
        .filter(x => !Array("Id", "field_D").contains(x))
        .map(x =>
          when(col("b.Id").isNotNull, col(s"b.$x"))
            .otherwise(col(s"a.$x"))
            .as(x))

Finally, join with 'left_outer' and select the columns:

a.join(b, col("a.Id") === col("b.Id"), "left_outer")
        .select(columnsNotUpdated ++ columnsUpdated: _*)

The complete code:

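import org.apache.spark.sql.functions.{col, when}

// Alias both DataFrames so columns can be referenced as a.<name> and b.<name>
val a = df1.as("a")
val b = df2.as("b")

// Columns that are always taken from df1
val columnsNotUpdated =
  Seq(col("a.Id").as("Id"), col("a.field_D").as("field_D"))

// Columns that take the value from df2 when a matching Id exists there
val columnsUpdated = a.columns
  .filter(x => !Array("Id", "field_D").contains(x))
  .map(x =>
    when(col("b.Id").isNotNull, col(s"b.$x"))
      .otherwise(col(s"a.$x"))
      .as(x))

a.join(b, col("a.Id") === col("b.Id"), "left_outer")
  .select(columnsNotUpdated ++ columnsUpdated: _*)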
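
The code above is Scala, while the question uses PySpark. As a way to check the rule it applies without needing Spark at all, here is a minimal plain-Python sketch of the same logic, run on the sample rows from the question. It is only a model of the join, not Spark code: merge_updates, df1_rows, df2_rows and df3_rows are hypothetical names made up for this illustration, and rows are represented as plain dictionaries.

```python
# Plain-Python model of the left-outer "update" join; no Spark required.
# merge_updates and the row dictionaries are illustrative names, not Spark APIs.


def merge_updates(base_rows, update_rows, key="Id"):
    """Return copies of base_rows, overridden by the matching update row.

    Mirrors the join logic: every base row is kept (left outer join); if an
    update row with the same key exists, its values replace the base values
    for the columns it carries, otherwise the base row is kept unchanged.
    """
    # Index the update rows by key so each lookup is a dictionary access.
    updates_by_key = {row[key]: row for row in update_rows}

    merged = []
    for row in base_rows:
        match = updates_by_key.get(row[key])
        if match is None:
            # No match: the "otherwise" branch, keep the base values.
            merged.append(dict(row))
        else:
            # Match: the "when" branch, update values win; columns missing
            # from the update row (here field_D) keep their base values.
            merged.append({**row, **match})
    return merged


# Sample data from the question (DF1 and DF2).
df1_rows = [
    {"Id": 1, "field_A": "cat", "field_B": 12, "field_C": "black", "field_D": 1},
    {"Id": 2, "field_A": "dog", "field_B": 128, "field_C": "white", "field_D": 2},
]
df2_rows = [{"Id": 1, "field_A": "cat", "field_B": 13, "field_C": "blue"}]

df3_rows = merge_updates(df1_rows, df2_rows)
for row in df3_rows:
    print(row)
```

With the question's data, the row with Id 1 picks up field_B = 13 and field_C = blue from DF2, while the row with Id 2 and the field_D column are left as they were in DF1, which is the DF3 the question asks for.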