I have a PySpark DataFrame with two columns (A and B, both of type double) whose values are either 0.0 or 1.0. I am trying to add a new column that is the sum of those two. I followed the examples in Pyspark: Pass multiple columns in UDF:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
sum_cols = F.udf(lambda x: x[0]+x[1], IntegerType())
df_with_sum = df.withColumn('SUM_COL',sum_cols(F.array('A','B')))
df_with_sum.select(['SUM_COL']).toPandas()

This shows a Series of NULLs instead of the results I expect.

I tried each of the following to see if there was an issue with the data types:

sum_cols = F.udf(lambda x: x[0], IntegerType())
sum_cols = F.udf(lambda x: int(x[0]), IntegerType())

I still got NULLs.

I tried removing the array:

sum_cols = F.udf(lambda x: x, IntegerType())
df_with_sum = df.withColumn('SUM_COL',sum_cols(df.A))

This works fine and shows 0/1.

I tried removing the UDF, but leaving the array:

df_with_sum = df.withColumn('SUM_COL', F.array('A','B'))

This works fine and shows a series of arrays of [0.0/1.0, 0.0/1.0].

So the array works fine and the UDF works fine; it is only when I try to pass an array to the UDF that things break down. What am I doing wrong?

eran
  • What are the datatypes of columns A and B? Can you check that and update the question? – Ramesh Maharjan Sep 26 '18 at 04:17
  • @RameshMaharjan Yes, updated! The type is double. – eran Sep 26 '18 at 04:59
  • How is 0/1 a double? Is it 0/1 or 0.1? – Ramesh Maharjan Sep 26 '18 at 05:08
  • @RameshMaharjan It is 0.0 or 1.0 for A and B; the output should be 0, 1 or 2, depending on what the operation is. I showed several examples: in one of them I call int() within the UDF and the UDF return type is IntegerType; in another I don't use a UDF at all, so it is 0.0 or 1.0. – eran Sep 26 '18 at 05:20
  • I am not understanding the form `0.0/1.0`; it should be 0.0 if the datatype is double. If the value is `0.0/1.0`, then the datatype should be StringType, shouldn't it? – Ramesh Maharjan Sep 26 '18 at 07:03
  • Instead of a lambda, if we need to do this in a different function, how can we do that? – Naveen Srikanth May 06 '21 at 19:07
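
Regarding the last comment: F.udf accepts any Python callable, not just a lambda, so a named function works the same way. A minimal sketch, assuming the same df as in the question:

import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

def sum_ab(x):
    # x is the array produced by F.array('A', 'B')
    return x[0] + x[1]

sum_cols = F.udf(sum_ab, DoubleType())
df_with_sum = df.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))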

1 Answer

The problem is that you are trying to return a double from a function that is declared to return an integer. The value does not fit the declared type, and PySpark by default silently produces NULL when the conversion fails:

df_with_doubles = spark.createDataFrame([(1.0,1.0), (2.0,2.0)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0]+x[1], IntegerType())
df_with_sum = df_with_doubles.withColumn('SUM_COL',sum_cols(F.array('A','B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

  SUM_COL
0    None
1    None

However, if you do:

df_with_integers = spark.createDataFrame([(1,1), (2,2)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0]+x[1], IntegerType())
df_with_sum = df_with_integers.withColumn('SUM_COL',sum_cols(F.array('A','B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

   SUM_COL
0        2
1        4

So either cast your columns to IntegerType beforehand (or cast the result inside the UDF), or change the return type of the UDF to DoubleType.
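
For instance, either variant returns the expected sums on the toy DataFrame above (a minimal sketch; only the DoubleType import is added):

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType

df_with_doubles = spark.createDataFrame([(1.0, 1.0), (2.0, 2.0)], ['A', 'B'])

# Option 1: keep IntegerType and cast inside the UDF so the returned value
# matches the declared return type.
sum_cols_int = F.udf(lambda x: int(x[0] + x[1]), IntegerType())

# Option 2: declare DoubleType so the double result passes through unchanged.
sum_cols_dbl = F.udf(lambda x: x[0] + x[1], DoubleType())

df_with_doubles.withColumn('SUM_COL', sum_cols_int(F.array('A', 'B'))).show()
df_with_doubles.withColumn('SUM_COL', sum_cols_dbl(F.array('A', 'B'))).show()

For a simple sum like this one, plain column arithmetic (df.A + df.B) avoids the UDF entirely and keeps the native double type.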

martinarroyo
  • Is there any way to get it to show logs or any hint that the problem is a casting failure vs. something else? – szeitlin Jun 21 '23 at 23:09