I want to generate a when clause based on the values in a dict. It's very similar to what's being done in "How do I use multiple conditions with pyspark.sql.funtions.when()?"

The difference is that I want to pass a dict of column names and values.

Let's say I have a dict:

{
  'employed': 'Y',
  'athlete': 'N'
}

I want to use that dict to generate the equivalent of:

df.withColumn("call_person", when((col("employed") == "Y") & (col("athlete") == "N"), "Y").otherwise("N"))

So the end result is:

+---+-----------+--------+-------+
| id|call_person|employed|athlete|
+---+-----------+--------+-------+
|  1|     Y     |    Y   |   N   |
|  2|     N     |    Y   |   Y   |
|  3|     N     |    N   |   N   |
+---+-----------+--------+-------+

Note: part of the reason I want to do this programmatically is that my dicts vary in length (the number of conditions differs).

2 Answers

Use the reduce() function to fold one equality condition per dict entry into a single condition:

from functools import reduce
from pyspark.sql.functions import when, col

# dictionary
d = {
  'employed': 'Y',
  'athlete': 'N'
}

# set up the conditions, multiple conditions merged with `&`
cond = reduce(lambda x,y: x&y, [ col(c) == v for c,v in d.items() if c in df.columns ])

# set up the new column
df.withColumn("call_person", when(cond, "Y").otherwise("N")).show()
+---+--------+-------+-----------+
| id|employed|athlete|call_person|
+---+--------+-------+-----------+
|  1|       Y|      N|          Y|
|  2|       Y|      Y|          N|
|  3|       N|      N|          N|
+---+--------+-------+-----------+
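
To see what reduce() builds for dicts of different lengths, here is a minimal pure-Python sketch. The `Cond` class and the `build_condition` helper are hypothetical stand-ins, not part of PySpark: `Cond` only records the expression text, the way `&` on two Column objects yields a combined Column. The `student` key is likewise made up for illustration.

```python
from functools import reduce
from operator import and_


class Cond:
    """Hypothetical stand-in for a Spark Column: it only records expression text."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        # Mirrors how `&` on two Columns produces a new, combined Column.
        return Cond(f"({self.text} & {other.text})")


def build_condition(d):
    """Fold one equality test per dict entry into a single AND-ed condition."""
    parts = [Cond(f"({c} == {v!r})") for c, v in d.items()]
    if not parts:
        # reduce() raises TypeError on an empty sequence with no initial value,
        # so fail early with a clearer message.
        raise ValueError("need at least one condition")
    return reduce(and_, parts)


two = build_condition({"employed": "Y", "athlete": "N"})
three = build_condition({"employed": "Y", "athlete": "N", "student": "N"})
print(two.text)    # ((employed == 'Y') & (athlete == 'N'))
print(three.text)  # (((employed == 'Y') & (athlete == 'N')) & (student == 'N'))
```

The same left-to-right folding applies to the real Column objects above. One thing to watch in the PySpark version: if the `if c in df.columns` filter drops every key, the list is empty and reduce() raises a TypeError, so a guard like the one in this sketch may be worth adding.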
jxc

You can also look up dictionary items directly with a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# lookup dictionary (named `lookup` to avoid shadowing the built-in `dict`)
lookup = {
  'code': 'b',
  'amt': '4'
}
rows = [(1, 'code'), (1, 'amt')]
df = spark.createDataFrame(rows, ['id', 'dict_key'])

# UDF that returns the dictionary value for each key (None if the key is missing)
user_func = udf(lambda x: lookup.get(x), StringType())
newdf = df.withColumn('new_column', user_func(df.dict_key))

>>> newdf.show()
+---+--------+----------+
| id|dict_key|new_column|
+---+--------+----------+
|  1|    code|         b|
|  1|     amt|         4|
+---+--------+----------+

Or broadcast the dictionary:

# share the dictionary with the executors as a broadcast variable
broadcast_dict = spark.sparkContext.broadcast(lookup)

def my_func(key):
    return broadcast_dict.value.get(key)

new_my_func = udf(my_func, StringType())

newdf = df.withColumn('new_column', new_my_func(df.dict_key))
>>> newdf.show()
+---+--------+----------+
| id|dict_key|new_column|
+---+--------+----------+
|  1|    code|         b|
|  1|     amt|         4|
+---+--------+----------+
vikrant rana
    I went ahead and added some results, because this wasn't what I was trying to accomplish. Thanks for replying, and my apologies for not being clearer in the original post. – Braden Wright Aug 17 '19 at 05:30