
I have a PySpark dataframe to which I want to add another column: using the value from the Section_1 column as the key, I want to look up the corresponding value in a Python dictionary and fill the new column with that value, like below.

Original dataframe

DataId  | ObjId     | Name        | Object | Section_1
My data | Data name | Object name | rd.111 | rd.123

Python Dictionary

object_map= {'rd.123' : 'rd.567'}

Where Section_1 has a value of rd.123, I will search the dictionary for the key 'rd.123', get back its value 'rd.567', and place that in the new column.

Desired DataFrame

DataId  | ObjId     | Name        | Object | Section_1 | Section_2
My data | Data name | Object name | rd.111 | rd.123    | rd.567
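
For reference, here is a minimal reproducible version of the dataframe and dictionary (the column split is my best guess at the layout above; output is the dataframe name used in my code below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Single-row sample dataframe matching the layout above (column split assumed).
output = spark.createDataFrame(
    [("My data", "Data name", "Object name", "rd.111", "rd.123")],
    ["DataId", "ObjId", "Name", "Object", "Section_1"],
)

object_map = {'rd.123': 'rd.567'}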

Right now I get this error with my current code, and I don't really know what I did wrong as I am not too familiar with pyspark:

There is an incorrect call to a Column object in your code. Please review your code.

Here is the code I am currently using, where object_map is the python dictionary:

test_df = output.withColumn('Section_2', object_map.get(output.Section_1.collect()))

1 Answer


You can try this (adapted from this answer with added null handling):

from itertools import chain
from pyspark.sql.functions import create_map, lit, when

object_map = {'rd.123': 'rd.567'}

# Build a literal map expression from the Python dictionary.
mapping_expr = create_map([lit(x) for x in chain(*object_map.items())])

# Rows with a null Section_1 simply get a null Section_2.
df1 = df.filter(df['Section_1'].isNull()).withColumn('Section_2', lit(None))

# For the remaining rows, look Section_1 up in the map expression.
df2 = df.filter(df['Section_1'].isNotNull()).withColumn(
    'Section_2',
    when(
        df['Section_1'].isNotNull(),
        mapping_expr[df['Section_1']]
    )
)

result = df1.unionAll(df2)
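
A quick sanity check on the sample row (assuming df is the single-row example from the question, called output in the question's own code); any Section_1 value missing from object_map simply comes back as null from the map lookup:

result.select('Section_1', 'Section_2').show()
# Expected for the sample row: Section_1 = rd.123, Section_2 = rd.567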
  • I think this is really close, but still gives me the issue of null values – MNM Apr 23 '21 at 14:17
  • That was my original issue, and I saw it at the top of your code. I'm not very used to pyspark and the imports – MNM Apr 23 '21 at 14:17
  • If you have null entries, you cannot map that to anything using the dictionary. – mck Apr 23 '21 at 14:18
  • I do have null entries in the dataframe. I see, so could I use an if/else like in Python to map this? – MNM Apr 23 '21 at 14:23
  • Use `when` in Spark, not if/else, just like in my code, which checks that the column is not null. – mck Apr 23 '21 at 14:23
  • So what do you want to be the expected output when section_1 is null entry? – mck Apr 23 '21 at 14:24
  • Null again. That would be fine. – MNM Apr 23 '21 at 14:27
  • test_df = output.withColumn('Child_2',when(output['Child_1'].isNotNull(),mapping_expr[output['Child_1']]).otherwise(None)) – MNM Apr 23 '21 at 14:27
  • Then my code should already do this. The default for otherwise is None. – mck Apr 23 '21 at 14:28
  • I tried that, but it still gave me an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Cannot use null as map key. – MNM Apr 23 '21 at 14:28
  • Could you start from the beginning, to ensure that you don't have any old dataframes that contain the old code which does not handle nulls? – mck Apr 23 '21 at 14:29
  • Avoid doing `df = df.withColumn(...)`. Use a new variable for each transformed dataframe. – mck Apr 23 '21 at 14:29
  • I redid my code and cleaned it up, but I still get the Java null error. Should I switch from a dict to something else, like an array? – MNM Apr 23 '21 at 14:34
  • It is close, but it gave me this error: Union On Different Number Of Columns – MNM Apr 23 '21 at 14:57
  • But I think that actually works; I just need to work a little on joining the two datasets. Thank you, I appreciate it – MNM Apr 23 '21 at 14:58
  • That worked perfectly, thank you for walking me through this. I really appreciate it. – MNM Apr 23 '21 at 15:18
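
As the comments suggest, the same mapping can also be written as a single withColumn with when/otherwise instead of the filter-and-union; a minimal sketch using the question's column names (this mirrors the one-liner from the comments, with otherwise(None) made explicit; whether it avoids the "Cannot use null as map key" error likely depends on the stale-dataframe issue mentioned above):

from itertools import chain
from pyspark.sql.functions import create_map, lit, when

object_map = {'rd.123': 'rd.567'}
mapping_expr = create_map([lit(x) for x in chain(*object_map.items())])

# Null (or unmapped) Section_1 values fall through to otherwise(None).
test_df = output.withColumn(
    'Section_2',
    when(output['Section_1'].isNotNull(), mapping_expr[output['Section_1']])
    .otherwise(None)
)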