
I am working with Spark and PySpark, and I am trying to achieve the equivalent of the following pseudocode:

df = df.withColumn('new_column',
    IF fruit1 IS NULL OR fruit2 IS NULL THEN 3
    ELSE IF fruit1 == fruit2 THEN 1
    ELSE 0)

I am trying to do this in PySpark but I'm not sure about the syntax. Any pointers? I looked into expr() but couldn't get it to work.

Note that df is a pyspark.sql.dataframe.DataFrame.

Asked by user2205916

3 Answers


There are a few efficient ways to implement this. Let's start with the required imports:

from pyspark.sql.functions import col, expr, when

You can use the Hive IF function inside expr:

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

or when + otherwise:

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)

Finally, you could use the following trick:

from pyspark.sql.functions import coalesce, lit

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))
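Why the trick works: in Spark SQL, = returns NULL (not false) when either operand is NULL, casting that NULL to int keeps it NULL, and coalesce then substitutes 3. A minimal plain-Python model of those semantics is below, with None standing in for SQL NULL. The helper names (sql_eq, cast_int, coalesce, new_column_3) are hypothetical, and the code does not call Spark.

```python
from typing import Optional


def sql_eq(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    # Models SQL '=': if either side is NULL (None), the result is NULL
    if a is None or b is None:
        return None
    return a == b


def cast_int(v: Optional[bool]) -> Optional[int]:
    # Models CAST(bool AS INT): true -> 1, false -> 0, NULL stays NULL
    return None if v is None else int(v)


def coalesce(*values):
    # Models COALESCE: first non-NULL argument, or NULL if all are NULL
    return next((v for v in values if v is not None), None)


def new_column_3(fruit1: Optional[str], fruit2: Optional[str]) -> Optional[int]:
    # coalesce((fruit1 == fruit2).cast("int"), lit(3))
    return coalesce(cast_int(sql_eq(fruit1, fruit2)), 3)


rows = [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]
for f1, f2 in rows:
    print(f1, f2, new_column_3(f1, f2))
```

For the sample rows this gives 0, 3, 3, 1, 3, matching the new_column_3 column in the result table.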

With example data:

df = sc.parallelize([
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])

you can use this as follows:

(df
    .withColumn("new_column_1", new_column_1)
    .withColumn("new_column_2", new_column_2)
    .withColumn("new_column_3", new_column_3)
    .show())

and the result is:

+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple|           0|           0|           0|
|  kiwi|  null|           3|           3|           3|
|  null|banana|           3|           3|           3|
| mango| mango|           1|           1|           1|
|  null|  null|           3|           3|           3|
+------+------+------------+------------+------------+
Answered by zero323
    In Spark 2.2+, the function 'col' did not work for me. Using the column names directly, without quotes, worked. For example: new_column_1 = expr(" col_1 + int(col_2/15) ") – smishra Aug 15 '19 at 15:03

You'll want to use a UDF, as below:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def func(fruit1, fruit2):
    # Spark passes SQL NULL values to a Python UDF as None
    if fruit1 is None or fruit2 is None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0

# Wrap the Python function as a UDF that returns an integer column
func_udf = udf(func, IntegerType())
df = df.withColumn('new_column', func_udf(df['fruit1'], df['fruit2']))
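Because func is an ordinary Python function, one option is to sanity-check it in plain Python on the question's sample values before wrapping it with udf, since Spark hands SQL NULL to a Python UDF as None. This is a sketch of that check, not part of the original answer; the samples and results names are hypothetical.

```python
def func(fruit1, fruit2):
    # Same logic as the UDF body: NULL check first, then equality
    if fruit1 is None or fruit2 is None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0


samples = [("orange", "apple"), ("kiwi", None), (None, "banana"),
           ("mango", "mango"), (None, None)]
results = [func(a, b) for a, b in samples]
print(results)  # [0, 3, 3, 1, 3]
```

Keep in mind that a Python UDF moves every row between the JVM and a Python worker, so the built-in when/otherwise or expr versions are usually faster for simple logic like this.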
Answered by David
    I got a couple errors from this solution, @David. First one was solved with `from pyspark.sql.types import StringType`. Second one is: `TypeError: 'int' object is not callable`, which I'm not sure how to resolve. Note that `df` is a `pyspark.sql.dataframe.DataFrame`. – user2205916 Oct 20 '16 at 19:03
    @user2205916 I had a couple of typos. In the line `def func(...` I had `fruit 1` (with a space) instead of `fruit1`. In the line starting `func_udf =...` I had `StringType` instead of `IntegerType`. Try it with the updated code and let me know if you still have issues – David Oct 20 '16 at 19:17
    I get the same error message. Also, I think a paren is missing at the end of `df = . . .` – user2205916 Oct 20 '16 at 19:24
    Ugh another typo, 2nd to last line should be `func_udf = udf(func, IntegerType())` – David Oct 20 '16 at 19:32
    Have to run, but this is close (typos notwithstanding). If it still isn't working, make sure you don't have a situation like this http://stackoverflow.com/questions/9767391/typeerror-int-object-is-not-callable – David Oct 20 '16 at 19:40

The withColumn function in PySpark lets you create a new column from a conditional expression. Combine it with the when and otherwise functions and you have a working if-then-else structure.

For this you need to import the Spark SQL functions module (pyspark.sql.functions), because the code below will not work without its col() and when() functions.

First we declare the new column, 'new_column', and put the condition fruit1 == fruit2 inside when(), returning 1 if it is true. If it is not true, control passes to otherwise(), which holds a second when() that uses isNull() to check whether fruit1 or fruit2 is null and returns 3 if so. If that is also false, the inner otherwise() returns 0.

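from pyspark.sql import functions as F

# Equal -> 1; otherwise, either value null -> 3; otherwise -> 0.
# otherwise(0) must belong to the inner when(): otherwise() can only be
# applied once to a column built by when().
df = df.withColumn('new_column',
    F.when(F.col('fruit1') == F.col('fruit2'), 1)
    .otherwise(
        F.when(F.col('fruit1').isNull() | F.col('fruit2').isNull(), 3)
        .otherwise(0)))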
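Checking equality first is safe here because when() treats a NULL condition like false: if either fruit is null, fruit1 == fruit2 evaluates to NULL, so evaluation falls through to the null check. Below is a plain-Python model of the evaluation order described above, with None standing in for SQL NULL. The function name nested_when is hypothetical and no Spark is involved.

```python
from typing import Optional


def nested_when(fruit1: Optional[str], fruit2: Optional[str]) -> int:
    # In Spark, fruit1 == fruit2 is NULL when either side is NULL
    eq = None if fruit1 is None or fruit2 is None else fruit1 == fruit2
    if eq is True:  # outer when(): only a true condition matches
        return 1
    # outer otherwise() -> inner when(): null check
    if fruit1 is None or fruit2 is None:
        return 3
    return 0  # inner otherwise(0)


print([nested_when(a, b) for a, b in
       [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]])  # [0, 3, 3, 1, 3]
```

Without an otherwise(0) on the inner when(), rows with two different non-null fruits would get null instead of 0, because when() returns null when no branch matches.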
Answered by Nidhi