
I have a DataFrame that needs new columns added based on a broadcast dictionary.

Input Spark DF:

df_k_col1   cust_grp_map
Col1            1       
Col2            2
Col3            3
Col5            5

using the nested dict below -

cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}

I am expecting the output below.

Expected output:
df_k_col1   cust_grp_map    cust_grp_name   cust_type
Col1            1               cg1             ct1
Col2            2               cg2             ct2
Col3            3               cg3             ct3
Col5            5               cg5             

I tried using a UDF and the chain method but am running into issues.

from pyspark.sql import functions as F

def get_customer_group(df_k, data):
    def get_df_k_cust_grp(mapping_data, key):
        def get_val(x):
            # mapping_data.value is the broadcast dict; x is the cust_grp_map key
            return mapping_data.value[x].get(key)

        return F.udf(get_val)

    b = spark.sparkContext.broadcast(data)

    df_k_custgrps = df_k.withColumn(
        "cust_grp_name",
        get_df_k_cust_grp(b, "cust_grp_name")(F.col("cust_grp_map")),
    )

    return df_k_custgrps

and the chain method:

from itertools import chain

cust_grp_mapper = F.create_map(
    [F.lit(i) for i in chain(*{k + x: y
                               for k, v in cust_grp_dict['cust_grp_map'].items()
                               for x, y in v.items()}.items())])
df_k_custgrps = df_k.withColumn('cust_grp_name', cust_grp_mapper[F.concat('grp', 'name')])
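For reference, the map-lookup idea can work if each target column gets its own `create_map` keyed directly on the integer ids, rather than one map keyed on concatenated strings. This is a minimal sketch, not the asker's code: the helper name `attr_pairs` is mine, and the Spark calls (shown commented, since they need a running session and `df_k` in scope) assume the dict structure above.

```python
from itertools import chain

cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}

def attr_pairs(attr):
    # Flatten {id: {attr: val, ...}} into [id1, val1, id2, val2, ...],
    # the alternating key/value layout that F.create_map expects.
    return list(chain.from_iterable(
        (k, v[attr]) for k, v in cust_grp_dict['cust_grp_map'].items()))

# With a SparkSession and df_k in scope, each attribute becomes its own map
# column, indexed by the integer cust_grp_map value (no string concatenation):
#   from pyspark.sql import functions as F
#   name_map = F.create_map([F.lit(x) for x in attr_pairs('cust_grp_name')])
#   type_map = F.create_map([F.lit(x) for x in attr_pairs('cust_type')])
#   df_k_custgrps = (df_k
#       .withColumn('cust_grp_name', name_map[F.col('cust_grp_map')])
#       .withColumn('cust_type', type_map[F.col('cust_grp_map')]))

print(attr_pairs('cust_grp_name'))
```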

Thanks for your help in advance

NNM

1 Answer


You can iterate through the dict to build a lookup DataFrame with the columns you want to map, then join on the shared column:

from pyspark.sql import Row

df.join(
    spark.createDataFrame([Row(**{'cust_grp_map': k, **v})
                           for k, v in cust_grp_dict.get("cust_grp_map").items()]),
    on='cust_grp_map',
).show()

+------------+---------+-------------+---------+
|cust_grp_map|df_k_col1|cust_grp_name|cust_type|
+------------+---------+-------------+---------+
|           1|     Col1|          cg1|      ct1|
|           2|     Col2|          cg2|      ct2|
|           3|     Col3|          cg3|      ct3|
|           5|     Col5|          cg5|         |
+------------+---------+-------------+---------+

Related read: What does ** (double star/asterisk) and * (star/asterisk) do for parameters?
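If `createDataFrame` trips over mixed inferred types while building the lookup rows (a "can not merge type" error usually means the same field carries two different Python types across rows), passing an explicit schema sidesteps inference entirely. A minimal sketch: the DDL schema string is my assumption from the dict shown above, and the Spark calls are commented because they need a running session.

```python
cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}

# Plain dicts work as rows too, and make the key/value types easy to inspect
# before Spark ever sees them:
rows = [{'cust_grp_map': k, **v}
        for k, v in cust_grp_dict['cust_grp_map'].items()]

# With a SparkSession in scope, an explicit DDL schema avoids type inference:
#   lookup_df = spark.createDataFrame(
#       rows, schema='cust_grp_map int, cust_grp_name string, cust_type string')
#   df_k.join(lookup_df, on='cust_grp_map').show()

print(rows[0])
```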

anky
  • I tried with below- df_k.join(spark.createDataFrame([Row(**{'cust_grp_map': k, **v}) for k,v in cust_grp_dict.get("cust_grp_map").items()]),F.col('cust_grp_map')==F.col('cust_grp_map')) ERROR- TypeError: field cust_grp_map: Can not merge type and – NNM Aug 30 '21 at 20:01
  • seems to be an issue creating the dataframe with dictionary df_cgm = spark.createDataFrame([Row(**{'cust_grp_map': k, **v}) for k,v in cust_grp_dict.get("cust_grp_map").items()]) even this results in the same error – NNM Aug 30 '21 at 21:07
  • @NNM I will debug tomorrow as it's quite late here. But you can cast the customer grp map col to integer, because it seems the keys are integers whereas the values in the dataframe are strings, or vice versa – anky Aug 30 '21 at 21:30
  • Thanks for the help @anky, I was able to get it working. My dictionary structure was causing the issues. It's working now. – NNM Aug 31 '21 at 15:13