
I have two Spark DataFrames loaded from CSV, of the form:

mapping_fields (the df with mapped names):

new_name old_name
A        aa
B        bb
C        cc

and

aa bb cc dd
1  2  3  43
12 21 4  37

to be transformed into :

A  B  C D
1  2  3 
12 21 4

As dd didn't have any mapping in the original table, the D column should have all null values.

How can I do this without converting mapping_fields into a dictionary and checking individually for mapped names? (That would mean I have to collect mapping_fields and check, which rather contradicts my use case of handling all the datasets in a distributed way.)
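For reference, both frames are loaded with something along these lines (the file paths here are just placeholders, and df is simply what I call the data frame below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# paths are placeholders for the actual CSV locations
mapping_fields = spark.read.csv("mapping_fields.csv", header=True, inferSchema=True)
df = spark.read.csv("data.csv", header=True, inferSchema=True)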

Thanks!


2 Answers


With melt borrowed from here (a minimal sketch of such a helper is shown after the output below) you could:

from pyspark.sql import functions as f

mapping_fields = spark.createDataFrame(
    [("A", "aa"), ("B", "bb"), ("C", "cc")],
    ("new_name", "old_name"))
df = spark.createDataFrame(
    [(1, 2, 3, 43), (12, 21, 4, 37)],
    ("aa", "bb", "cc", "dd"))

(melt(df.withColumn("id", f.monotonically_increasing_id()),
       id_vars=["id"],  value_vars=df.columns, var_name="old_name")
    .join(mapping_fields, ["old_name"], "left_outer")
    .withColumn("value", f.when(f.col("new_name").isNotNull(), col("value")))
    .withColumn("new_name", f.coalesce("new_name", f.upper(col("old_name"))))
    .groupBy("id")
    .pivot("new_name")
    .agg(f.first("value"))
    .drop("id")
    .show())

+---+---+---+----+
|  A|  B|  C|  DD|
+---+---+---+----+
|  1|  2|  3|null|
| 12| 21|  4|null|
+---+---+---+----+
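melt isn't built into PySpark here; it's the pandas-style helper from the linked answer. If you don't want to follow the link, a minimal sketch of such a helper could look like this:

from pyspark.sql.functions import array, col, explode, lit, struct

def melt(df, id_vars, value_vars, var_name="variable", value_name="value"):
    """Unpivot a DataFrame from wide to long format (pandas-style melt)."""
    # one struct per melted column, holding (column name, column value)
    vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    # explode into one row per (id column, melted column name, value)
    exploded = df.withColumn("_vars_and_vals", explode(vars_and_vals))
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return exploded.select(*cols)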

but nothing in your description justifies this melt-and-pivot approach. Because the number of columns is fairly limited, I'd rather:

mapping = dict(
    mapping_fields
        .filter(f.col("old_name").isin(df.columns))
        .select("old_name", "new_name").collect())

df.select([
    # unmapped columns become typed null columns; mapped ones keep their values
    (f.lit(None).cast(t) if c not in mapping else f.col(c)).alias(mapping.get(c, c.upper()))
    for (c, t) in df.dtypes])

+---+---+---+----+
|  A|  B|  C|  DD|
+---+---+---+----+
|  1|  2|  3|null|
| 12| 21|  4|null|
+---+---+---+----+

At the end of the day you should use distributed processing when it provides performance or scalability improvements. Here it would do the opposite and make your code overly complicated.

To ignore non-matching columns:

(melt(df.withColumn("id", f.monotonically_increasing_id()),
       id_vars=["id"],  value_vars=df.columns, var_name="old_name")
    .join(mapping_fields, ["old_name"])
    .groupBy("id")
    .pivot("new_name")
    .agg(f.first("value"))
    .drop("id")
    .show())

or

df.select([
    f.col(c).alias(mapping.get(c))
    for (c, t) in df.dtypes if c in mapping])
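Either way the unmapped dd column is simply dropped, so the result should look something like:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  2|  3|
| 12| 21|  4|
+---+---+---+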

I tried with a simple for loop; hope this helps too.

from pyspark.sql import functions as F
l1 = [('A','aa'),('B','bb'),('C','cc')]
l2 = [(1,2,3,43),(12,21,4,37)]

df1 = spark.createDataFrame(l1,['new_name','old_name'])
df2 = spark.createDataFrame(l2,['aa','bb','cc','dd'])

>>> df1.show()
+--------+--------+
|new_name|old_name|
+--------+--------+
|       A|      aa|
|       B|      bb|
|       C|      cc|
+--------+--------+
>>> df2.show()
+---+---+---+---+
| aa| bb| cc| dd|
+---+---+---+---+
|  1|  2|  3| 43|
| 12| 21|  4| 37|
+---+---+---+---+

When you need the unmapped column kept with null values:

>>> cols = df2.columns

>>> for i in cols:
...     val = df1.where(df1['old_name'] == i).first()
...     if val is not None:
...         df2 = df2.withColumnRenamed(i, val['new_name'])
...     else:
...         df2 = df2.withColumn(i, F.lit(None))
>>> df2.show()
+---+---+---+----+
|  A|  B|  C|  dd|
+---+---+---+----+
|  1|  2|  3|null|
| 12| 21|  4|null|
+---+---+---+----+

When we need only the mapped columns, change the else part:

else:
  df2 = df2.drop(i)

>>> df2.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  2|  3|
| 12| 21|  4|
+---+---+---+

Note that this rebinds df2 to the transformed DataFrame, though, so the name df2 no longer refers to the original frame.
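If you want to keep the original df2 around, you can run the same loop against a separate variable instead (renamed below is just an arbitrary name, and this sketch uses the drop variant for unmapped columns):

# work on a separate variable so df2 keeps pointing at the original frame
renamed = df2
for i in df2.columns:
    val = df1.where(df1['old_name'] == i).first()
    if val is not None:
        renamed = renamed.withColumnRenamed(i, val['new_name'])
    else:
        renamed = renamed.drop(i)

renamed.show()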
