I have a Spark DataFrame that needs new columns added based on a broadcast dictionary.
Input Spark DataFrame:
df_k_col1  cust_grp_map
Col1       1
Col2       2
Col3       3
Col5       5
Using the nested dict below:
cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}
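One way to avoid UDFs entirely might be to turn the dictionary into a small lookup DataFrame and left-join it, letting Spark broadcast the small side. The sketch below assumes `cust_grp_map` in `df_k` holds integer ids; `dict_to_rows` and `join_cust_grp` are hypothetical helper names, and pyspark is imported inside `join_cust_grp` so the pure-Python flattening can be checked on its own.

```python
from typing import Dict, List, Tuple

# The nested mapping from the question, keyed by integer group id.
cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}


def dict_to_rows(nested: Dict) -> List[Tuple[int, str, str]]:
    """Hypothetical helper: flatten {'cust_grp_map': {id: {...}}} into (id, name, type) rows."""
    return [(grp_id, attrs.get('cust_grp_name'), attrs.get('cust_type'))
            for grp_id, attrs in nested['cust_grp_map'].items()]


def join_cust_grp(spark, df_k, nested):
    """Hypothetical helper: left-join df_k to a lookup DataFrame built from the dict.

    Assumes df_k.cust_grp_map is an integer column; pyspark is imported lazily.
    """
    from pyspark.sql import functions as F

    lookup = spark.createDataFrame(
        dict_to_rows(nested), ['cust_grp_map', 'cust_grp_name', 'cust_type'])
    # F.broadcast hints Spark to ship the small lookup table to every executor,
    # which plays the same role as broadcasting the dict itself.
    return df_k.join(F.broadcast(lookup), on='cust_grp_map', how='left')
```

A left join keeps every row of `df_k`; ids missing from the dictionary would simply get nulls in the two new columns.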
Expected output:
df_k_col1  cust_grp_map  cust_grp_name  cust_type
Col1       1             cg1            ct1
Col2       2             cg2            ct2
Col3       3             cg3            ct3
Col5       5             cg5
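The expected output can be stated as a plain-Python reference, which may be handy for checking whichever Spark approach is used. This is a sketch, not Spark code: `enrich` is a hypothetical helper, and unknown ids are assumed to yield None for both new fields, mirroring a left join. Note that the blank `cust_type` for Col5 is the empty string from the dictionary, not a null.

```python
from typing import Dict, List, Optional, Tuple

# Inner mapping from the question (group id -> attributes).
GRP_MAP: Dict[int, Dict[str, str]] = {
    1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
    2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
    3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
    4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
    5: {'cust_grp_name': 'cg5', 'cust_type': ''},
}

# Input rows from the question: (df_k_col1, cust_grp_map)
INPUT_ROWS = [('Col1', 1), ('Col2', 2), ('Col3', 3), ('Col5', 5)]


def enrich(rows, grp_map) -> List[Tuple[str, int, Optional[str], Optional[str]]]:
    """Hypothetical helper: append cust_grp_name and cust_type to each input row."""
    out = []
    for col1, grp_id in rows:
        # Missing ids fall back to an empty dict, so both lookups return None.
        attrs = grp_map.get(grp_id, {})
        out.append((col1, grp_id, attrs.get('cust_grp_name'), attrs.get('cust_type')))
    return out
```

A Spark result could be compared against this with something like `sorted(tuple(r) for r in result_df.collect())`, after selecting the columns in the same order.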
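I tried a UDF approach and an itertools.chain/create_map approach, but ran into problems with both. The UDF attempt:
from pyspark.sql import functions as F

def get_customer_group(df_k, data):
    # Broadcast the whole nested dict once; executors read it via b.value
    b = spark.sparkContext.broadcast(data)

    def get_df_k_cust_grp(mapping_data, key):
        def get_val(x):
            # Index the broadcast value (don't call it); .get returns None for unknown ids
            return mapping_data.value['cust_grp_map'].get(x, {}).get(key)
        return F.udf(get_val)  # F.udf is called, not subscripted; default return type is string

    df_k_custgrps = df_k.withColumn("cust_grp_name", get_df_k_cust_grp(b, "cust_grp_name")(F.col("cust_grp_map")))
    return df_k_custgrps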
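A sketch of the broadcast-UDF idea that fills both new columns, assuming `cust_grp_map` holds integer ids (a string column would not match the integer dict keys). It broadcasts only the inner dict, reads `b.value` inside the UDF so each executor uses its broadcast copy, and passes the function to `F.udf(...)` with an explicit `StringType()`. `lookup` and `add_cust_grp_columns` are hypothetical names; pyspark is imported inside the helper so the pure lookup can be tested without Spark.

```python
from typing import Any, Dict, Optional

# Inner mapping from the question (group id -> attributes).
GRP_MAP: Dict[int, Dict[str, str]] = {
    1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
    2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
    3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
    4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
    5: {'cust_grp_name': 'cg5', 'cust_type': ''},
}


def lookup(grp_map: Dict[Any, Dict[str, str]], grp_id: Any, key: str) -> Optional[str]:
    """Return one attribute for a group id, or None if the id or key is missing."""
    return grp_map.get(grp_id, {}).get(key)


def add_cust_grp_columns(spark, df_k, nested):
    """Hypothetical helper: broadcast the inner dict and add one UDF column per field."""
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    b = spark.sparkContext.broadcast(nested['cust_grp_map'])
    for key in ('cust_grp_name', 'cust_type'):
        # key=key binds the current loop value; b.value is read on the executor.
        get_val = F.udf(lambda x, key=key: lookup(b.value, x, key), StringType())
        df_k = df_k.withColumn(key, get_val(F.col('cust_grp_map')))
    return df_k
```

Binding `key=key` as a default argument matters here: without it, every lambda would see the last value of the loop variable.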
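And the chain method:
from itertools import chain
# str(k) avoids the int + str TypeError; the lookup key "<id>cust_grp_name" is built from existing columns
cust_grp_mapper = F.create_map([F.lit(i) for i in chain(*{str(k) + x: y for k, v in cust_grp_dict['cust_grp_map'].items() for x, y in v.items()}.items())])
df_k_custgrps = df_k.withColumn('cust_grp_name', cust_grp_mapper[F.concat(F.col('cust_grp_map').cast('string'), F.lit('cust_grp_name'))])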
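An alternative to concatenating string keys is to build one literal map per output field, keyed directly by the integer group id. The sketch below assumes `cust_grp_map` is an integer column; `map_entries` and `add_cols_with_maps` are hypothetical names. `map_entries` produces the alternating key/value list that `F.create_map` takes, using `itertools.chain.from_iterable`.

```python
from itertools import chain
from typing import Any, Dict, List

cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}


def map_entries(nested: Dict, field: str) -> List[Any]:
    """Hypothetical helper: flatten {id: {field: value}} into [id1, value1, id2, value2, ...]."""
    return list(chain.from_iterable(
        (grp_id, attrs.get(field)) for grp_id, attrs in nested['cust_grp_map'].items()))


def add_cols_with_maps(df_k, nested):
    """Hypothetical helper: one literal map column per field, indexed by cust_grp_map."""
    from pyspark.sql import functions as F

    for field in ('cust_grp_name', 'cust_type'):
        mapper = F.create_map([F.lit(v) for v in map_entries(nested, field)])
        # Look up each row's cust_grp_map value in the literal map.
        df_k = df_k.withColumn(field, mapper[F.col('cust_grp_map')])
    return df_k
```

Keeping the keys as integers means no casting or string building is needed on the lookup side, as long as the column type matches the dict keys.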
Thanks in advance for your help.