
I have a DataFrame of products (DF), but some rows have no description. I also have an Excel file with descriptions for some of the products (loaded as Map). Using PySpark, I would like to fill the missing values in DF with those from Map, while leaving the rows that already have a description untouched.

DF
Id | Desc
01 | 'desc1'
02 | null
03 | 'desc3'
04 | null

Map
Key | Value
2   | 'desc2'
4   | 'desc4'

Output
Id | Desc
1  | 'desc1'
2  | 'desc2'
3  | 'desc3'
4  | 'desc4'

Thanks in advance

Daniel Zapata
Possible duplicate of [How to update a pyspark dataframe with new values from another dataframe?](https://stackoverflow.com/questions/50295783/how-to-update-a-pyspark-dataframe-with-new-values-from-another-dataframe) – pault Aug 22 '19 at 16:28

4 Answers

You'll want to make sure the DF.Id field and the Map.Key field have the same type and values (currently they don't appear to, given the leading 0 in Id), then do a left join, and then select the desired columns with coalesce(). My PySpark is a bit rusty, so I'll provide the solution in Scala. The logic should be the same.

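import org.apache.spark.sql.functions.coalesce
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
    (1, "desc1"),
    (2, null),
    (3, "desc3"),
    (4, null)
).toDF("Id", "Desc")

val map = Seq(
    (2, "desc2"),
    (4, "desc4")
).toDF("Key", "Value")

df.show()
map.show()

// Left join keeps every row of df; coalesce takes Desc when present, otherwise Value.
df.join(map, df("Id") === map("Key"), "left")
  .select(
      df("Id"),
      coalesce(df("Desc"), $"Value").as("Desc")
      )
  .show()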

Yields:

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2| null|
|  3|desc3|
|  4| null|
+---+-----+

+---+-----+
|Key|Value|
+---+-----+
|  2|desc2|
|  4|desc4|
+---+-----+

+---+-----+
| Id| Desc|
+---+-----+
|  1|desc1|
|  2|desc2|
|  3|desc3|
|  4|desc4|
+---+-----+
Travis Hegner

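In PySpark, with the help of a UDF:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Assumes an active SparkSession with its SparkContext available as `sc` (as in the pyspark shell).
schema = StructType([StructField("Index", IntegerType(), True),
                    StructField("Desc", StringType(), True)])

DF = sc.parallelize([(1, "desc1"), (2, None), (3, "desc3"), (4, None)]).toDF(schema)

myMap = {
    2: "desc2",
    4: "desc4"
}

# Broadcast the dictionary so each executor receives a single read-only copy.
myMapBroadcasted = sc.broadcast(myMap)

@udf(StringType())
def fillNone(Index, Desc):
    # Only look up the map when the description is missing; otherwise keep Desc as is.
    if Desc is None:
        if Index in myMapBroadcasted.value:
            return myMapBroadcasted.value[Index]
    return Desc

DF.withColumn('Desc', fillNone(col('Index'), col('Desc'))).show()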
LizardKing

It's hard to know the cardinality of the datasets you've provided. Some questions whose answers might change the solution:

  1. If "DF" and "Map" have overlapping Desc values, how should we prioritize which table has the "right" description?
  2. Does the final dataframe that you are looking to create need to be fully inclusive of a list of IDs or descriptions? Does either of these dataframes have the full list? This could also change the solution.

I've made some assumptions so that you can determine for yourself what is the right approach here:

  • I'm assuming that "DF" contains the whole list of IDs
  • I'm assuming that "Map" only has a subset of IDs and is not wholly inclusive of the broader set of IDs that exist within "DF"

I'm using PySpark here:

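from pyspark.sql.functions import coalesce
DF = DF.na.drop() # we'll eliminate the missing values from the parent dataframe
DF_Output = DF.join(Map.withColumnRenamed("Key", "Id"), on = "Id", how = 'outer') \
              .select("Id", coalesce("Desc", "Value").alias("Desc")) # merge both description columns into one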
shadow_dev

We can divide DF into two dataframes, operate on them separately, and then union them:

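import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
    (1, "desc1"),
    (2, null),
    (3, "desc3"),
    (4, null)
).toDF("Id", "Desc")

val Map = Seq(
    (2, "desc2"),
    (4, "desc4")
).toDF("Key", "Value")

// Split DF into the rows that need a description and the rows that already have one.
val nullDF = df.where(df("Desc").isNull)
val nonNullDF = df.where(df("Desc").isNotNull)

// Left join so that rows whose Id has no entry in Map are kept (with a null Desc) instead of being dropped.
val joinedWithKeyDF = nullDF.drop("Desc").join(Map, nullDF("Id")===Map("Key"), "left").withColumnRenamed("Value", "Desc").drop("Key")

// union matches columns by position; both sides are (Id, Desc) here.
val outputDF = joinedWithKeyDF.union(nonNullDF)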

Amita