5

I have a PySpark DataFrame with missing values:

from pyspark.sql import Row

tbl = sc.parallelize([
        Row(first_name='Alice', last_name='Cooper'),             
        Row(first_name='Prince', last_name=None),
        Row(first_name=None, last_name='Lenon')
    ]).toDF()
tbl.show()

Here's the table:

  +----------+---------+
  |first_name|last_name|
  +----------+---------+
  |     Alice|   Cooper|
  |    Prince|     null|
  |      null|    Lenon|
  +----------+---------+

I would like to create a new column as follows:

  • if first name is None, take the last name
  • if last name is None, take the first name
  • if they are both present, concatenate them
  • we can safely assume that at least one of them is present

I can construct a simple function:

def combine_data(row):
    if row.last_name is None:
        return row.first_name
    elif row.first_name is None:
        return row.last_name
    else:
        return '%s %s' % (row.first_name, row.last_name)
tbl.map(combine_data).collect()

I do get the correct result, but I can't append it to the table as a column: tbl.withColumn('new_col', tbl.map(combine_data)) results in AssertionError: col should be Column

What is the best way to convert the result of map to a Column? Is there a preferred way to deal with null values?

David D

2 Answers

8

As always, it is best to operate directly on the native column representation instead of fetching the data into Python:

from pyspark.sql.functions import concat_ws, coalesce, lit, trim

def combine(*cols):
    return trim(concat_ws(" ", *[coalesce(c, lit("")) for c in cols]))

tbl.withColumn("foo", combine("first_name", "last_name"))
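
To see why this covers the null cases, the per-row logic can be mimicked in plain Python. This is only an illustrative sketch, not Spark code: `combine_values` is a hypothetical helper that imitates what `coalesce(c, lit(""))`, `concat_ws(" ", ...)` and `trim` do to a single row, assuming `None` stands in for a SQL null.

```python
def combine_values(*values):
    """Plain-Python imitation of trim(concat_ws(" ", coalesce(c, ""), ...)) for one row."""
    # coalesce(c, lit("")): replace each missing value with an empty string
    filled = [v if v is not None else "" for v in values]
    # concat_ws(" ", ...): join the values with a single space
    joined = " ".join(filled)
    # trim(...): drop the leading or trailing space left by an empty side
    return joined.strip(" ")


rows = [("Alice", "Cooper"), ("Prince", None), (None, "Lenon")]
combined = [combine_values(first, last) for first, last in rows]
print(combined)
```

A missing name becomes an empty string, so the join leaves one stray space on that side, and the trim removes it.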
zero323
  • Thank you. Your answer addresses directly the example I gave, but the answer by @alberto-bonsanto is easier for me to read and is easier for me to modify for my actual needs – David D Mar 24 '16 at 05:40
4

You just need to use a UDF that receives two columns as arguments.

from pyspark.sql.functions import col, udf
from pyspark.sql import Row

tbl = sc.parallelize([
        Row(first_name='Alice', last_name='Cooper'),             
        Row(first_name='Prince', last_name=None),
        Row(first_name=None, last_name='Lenon')
    ]).toDF()

tbl.show()

def combine(c1, c2):
  if c1 is not None and c2 is not None:
    return c1 + " " + c2
  elif c1 is None:
    return c2
  else:
    return c1

combineUDF = udf(combine)

expr = [c for c in ["first_name", "last_name"]] + [combineUDF(col("first_name"), col("last_name")).alias("full_name")]

tbl.select(*expr).show()

#+----------+---------+------------+
#|first_name|last_name|   full_name|
#+----------+---------+------------+
#|     Alice|   Cooper|Alice Cooper|
#|    Prince|     null|      Prince|
#|      null|    Lenon|       Lenon|
#+----------+---------+------------+
Alberto Bonsanto
  • [This](http://stackoverflow.com/questions/35066231/stack-overflow-while-processing-several-columns-with-a-udf) is the reason why sometimes it is better to use `select` instead of `withColumn` – Alberto Bonsanto Mar 23 '16 at 16:00