
Let's say I have two DataFrames:

headers = ["id", "info"]

a = sc.parallelize([(1, "info1"), (2, "info2"), (3, "info3")]).toDF(headers)
b = sc.parallelize([(2, "info2new")]).toDF(headers)

And I want to keep the rows from a, but overwrite any row of a whose id also appears in b. So the desired output is

+---+--------+
| id|    info|
+---+--------+
|  1|   info1|
|  2|info2new|
|  3|   info3|
+---+--------+

Now, I could convert my DataFrames to RDDs and use subtractByKey, but that forces me to map everything to (key, value) tuples and then convert back to DataFrames afterwards. Meh.
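
For concreteness, that detour looks roughly like this (a sketch, assuming the a and b DataFrames above; unionAll is the pre-Spark-2.0 name for the DataFrame union method):

# key both DataFrames by id
kv_a = a.rdd.map(lambda r: (r["id"], r["info"]))
kv_b = b.rdd.map(lambda r: (r["id"], r["info"]))

# drop the rows of a whose key also appears in b, then re-add b's rows
kept = kv_a.subtractByKey(kv_b).toDF(headers)
result = kept.unionAll(b)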

I looked around and found the except and subtract functions for Spark DataFrames, but those require the rows to be exact duplicates, which mine are not.
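
For example (a quick sketch): because (2, "info2") and (2, "info2new") are not equal rows, subtract removes nothing here:

a.subtract(b).show()  # still shows all three rows of a, including the stale (2, "info2")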

The way I am considering doing this now is like so:

from pyspark.sql.functions import col

# suffix the column names so the join output is unambiguous
a1 = a.select(*(col(x).alias(x + '_old') for x in a.columns))
b1 = b.select(*(col(x).alias(x + '_new') for x in b.columns))
x = a1.join(b1, a1['id_old'] == b1['id_new'], "outer")

Then I would register x as a temp table and write a SQL query that returns the old data unless the new data is not null, something like the query below. But I don't think that's particularly clean either!
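
Spelled out, that query could use coalesce, which returns its first non-null argument (a sketch, assuming a pre-2.0 sqlContext is in scope):

x.registerTempTable("x")
result = sqlContext.sql("""
    SELECT coalesce(id_new, id_old) AS id,
           coalesce(info_new, info_old) AS info
    FROM x
""")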

Any good ideas?

Katya Willard

1 Answer


I'm not sure it's the best way, but you could do a left outer join with a UDF and a bit of cleanup.

Scala:

import org.apache.spark.sql.functions.udf

// UDF that keeps the old info unless a new value exists
val newInfo = udf[String, String, String]((infoA, infoB) => {
  if (infoB == null)
    infoA
  else
    infoB
})

// join -> add col("newInfo") -> drop info cols -> rename "newInfo"
a.join(b, Seq("id"), "left_outer")
  .withColumn("newInfo", newInfo(a("info"), b("info")))
  .drop(a("info"))
  .drop(b("info"))
  .withColumnRenamed("newInfo", "info")
  .show()
Yuan JI
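
For reference, since the question is in PySpark: the same left outer join can be written without a UDF by using coalesce, which returns its first non-null argument (a sketch based on the a and b DataFrames from the question, not part of the original answer):

from pyspark.sql.functions import coalesce, col

# rename b's info column so both values survive the join
b1 = b.withColumnRenamed("info", "info_new")

result = (a.join(b1, "id", "left_outer")
           .select("id", coalesce(col("info_new"), col("info")).alias("info")))
result.show()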