
I'm having some trouble with a PySpark DataFrame. Specifically, I'm trying to create a new column for a DataFrame that is the result of coalescing two of the DataFrame's existing columns.

E.g.

this_dataframe = this_dataframe.withColumn('new_max_price', coalesce(this_dataframe['max_price'],this_dataframe['avg(max_price)']).cast(FloatType()))

The problem with this code is that the new column still contains null values in certain rows. Specifically, I'm running this code:

this_dataset.where(col("new_max_price").isNull()).count()

This returns a positive count, so while the code runs, it does not produce the intended results.
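
One way to narrow this down is to count rows where both source columns are null, since coalesce has nothing to fall back on in those rows. A quick diagnostic sketch, using the DataFrame and column names from the snippet above:

from pyspark.sql.functions import col

# Rows where both inputs are null: coalesce must return null here
this_dataframe.where(col("max_price").isNull() & col("avg(max_price)").isNull()).count()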

I found some similar questions (such as Selecting values from non-null columns in a PySpark DataFrame), but for some reason I'm unable to replicate their results.

Here's some code I have that is based on the aforementioned link:

from pyspark.sql.functions import col, udf

def coalesce_columns(c1, c2):
    # Return the first argument that is not None (mirrors SQL COALESCE)
    if c1 is not None and c2 is not None:
        return c1
    elif c1 is None:
        return c2
    else:
        return c1

# Note: without an explicit return type, the UDF returns strings
coalesceUDF = udf(coalesce_columns)
max_price_col = [coalesceUDF(col("max_price"), col("avg(max_price)")).alias("competitive_max_price")]
this_dataset.select(max_price_col).show()

When I execute the last line to check that my results are correct, I receive an error:

AttributeError: 'unicode' object has no attribute 'isNull'

So the question is: how can I use a Spark SQL function to create a column that is the result of coalescing two PySpark DataFrame columns? If that is impossible, what kind of UDF can I use to create a column that I can append to another DataFrame?

BMac
  • No `isNull` in your code. Double check if this is really what you use. –  Nov 01 '16 at 22:10

1 Answer


I think that coalesce is actually doing its work, and the root of the problem is that you have null values in both columns, which results in a null after coalescing. Here's an example that may help.

from pyspark.sql import Row
from pyspark.sql.types import FloatType
from pyspark.sql.functions import coalesce, col

data = [Row(a="3.07",b="3.05"),
        Row(a="3.06",b="3.06"),
        Row(a="3.09",b=None),
        Row(a=None,b=None),
        Row(a=None,b="3.06"),
        Row(a=None,b=None)
       ]

# sqlContext is available by default in the PySpark shell
df = sqlContext.createDataFrame(data)

tmp = df.withColumn('c', coalesce(df['a'],df['b']).cast(FloatType()))

tmp.where(col("c").isNotNull()).show()


+----+----+----+
|   a|   b|   c|
+----+----+----+
|3.07|3.05|3.07|
|3.06|3.06|3.06|
|3.09|null|3.09|
|null|3.06|3.06|
+----+----+----+
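
If you need a non-null result even when both columns are null, one option (a sketch, assuming a default of 0.0 is acceptable for your data) is to pass a literal as a final fallback to coalesce:

from pyspark.sql.functions import lit

# lit('0.0') is an assumed placeholder default, used only when a and b are both null
tmp2 = df.withColumn('c', coalesce(df['a'], df['b'], lit('0.0')).cast(FloatType()))
tmp2.where(col("c").isNull()).count()  # returns 0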
Luis
  • I have another question https://stackoverflow.com/questions/39344250/spark-assign-value-if-null-to-column-python thanks in advance! – jgtrz May 22 '20 at 22:27