I have a PySpark DataFrame (df) backed by the sample table below (table1):
id, col1, col2, col3
1, abc, null, def
2, null, def, abc
3, def, abc, null

I am trying to create a new column (final) by combining all the columns into an array while ignoring null values. I tried f.array(col1, col2, col3) in PySpark: the values are combined, but the nulls are not dropped. I also tried a UDF that appends only the non-null columns, but it does not work.

import pyspark.sql.functions as f
df = spark.table('table1')
df = df.withColumn('final', f.array('col1', 'col2', 'col3'))

Actual result:
id, col1, col2, col3, final
1, abc, null, def, [abc,,def]
2, null, def, abc, [,def, abc]
3, def, abc, null, [def,abc,]

Expected result:
id, col1, col2, col3, final
1, abc, null, def, [abc,def]
2, null, def, abc, [def, abc]
3, def, abc, null, [def,abc]


The real schema of col1, col2 and col3 is shown below (col1 is actually named applications):


applications: struct (nullable = false)
    applicationid: string (nullable = true)
    createdat: string (nullable = true)
    updatedat: string (nullable = true)
    source_name: string (nullable = true)
    status: string (nullable = true)
    creditbureautypeid: string (nullable = true)
    score: integer (nullable = true)
    applicationcreditreportid: string (nullable = true)
    firstname: string (nullable = false)
    lastname: string (nullable = false)
    dateofbirth: string (nullable = false)
    accounts: array (nullable = true)
        element: struct (containsNull = true)
            applicationcreditreportaccountid: string (nullable = true)
            currentbalance: integer (nullable = true)
            institutionid: string (nullable = true)
            accounttypeid: string (nullable = true)
            dayspastdue: integer (nullable = true)
            institution_name: string (nullable = true)
            account_type_name: string (nullable = true)

Please let me know if the question is unclear or more information is needed. Any help would be appreciated. :)

ZygD
adarsh2109
  • Actually, the version of pyspark probably won't make a difference: [How to remove nulls with array_remove Spark SQL Built-in Function](https://stackoverflow.com/questions/54159964/how-to-remove-nulls-with-array-remove-spark-sql-built-in-function/54176578#54176578) – pault Aug 23 '19 at 15:10

3 Answers


Since Spark 2.4 you can use higher-order functions for this, so no UDF is needed. In PySpark the query can look like this:

result = (
    df
    .withColumn("temp", f.array("col1", "col2", "col3"))
    .withColumn("final", f.expr("FILTER(temp, x -> x is not null)"))
    .drop("temp")
)
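As a hedged variation on the query above, the array construction and the FILTER call can be folded into one SQL expression, which avoids the temporary column and works for any list of column names. The helper names `non_null_array_expr` and `with_final_column` are hypothetical, introduced only for this sketch, and the Spark import is done lazily so the string helper can be used on its own:

```python
def non_null_array_expr(column_names):
    """Build a Spark SQL expression that packs columns into an array and drops nulls."""
    # Backticks quote each column name inside the SQL expression
    quoted = ", ".join("`{}`".format(name) for name in column_names)
    return "filter(array({}), x -> x is not null)".format(quoted)


def with_final_column(df, column_names, target="final"):
    """Add `target` to df as an array of the non-null values of `column_names`."""
    # Imported lazily so the string helper above can be used without Spark installed
    from pyspark.sql import functions as f

    return df.withColumn(target, f.expr(non_null_array_expr(column_names)))
```

Assuming `df` is the DataFrame from the question, `with_final_column(df, ["col1", "col2", "col3"])` would be the call. Like the original query, this relies on higher-order function support, so it needs Spark 2.4 or later.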
David Vrba

Using a UDF

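from pyspark.sql.functions import udf, array
from pyspark.sql.types import ArrayType, StringType

def join_columns(row_list):
    # Keep only the non-null values, preserving column order
    return [cell_val for cell_val in row_list if cell_val is not None]

# Declare the return type; without it the UDF returns a plain string
join_udf = udf(join_columns, ArrayType(StringType()))

df = spark.table('table1')
df = df.withColumn('final', join_udf(array('col1', 'col2', 'col3')))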

This works for any number of columns, not just three; edit the list of columns passed to array.

pault
  • Thanks for your response! It works, but my column schemas are complex; I only used string columns as an example. When I define the function I also have to specify a return type. I have added my column schema to the question. – adarsh2109 Aug 25 '19 at 12:49
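For the struct columns raised in the comment above, one possible approach is to read the element type from the DataFrame schema instead of spelling the struct out by hand. This is only a sketch: the names `drop_nulls` and `with_final_column_udf` are hypothetical, and it assumes every listed column has exactly the same data type as the first one (which array requires anyway).

```python
def drop_nulls(values):
    """Return the input values with None entries removed, order preserved."""
    return [value for value in values if value is not None]


def with_final_column_udf(df, column_names, target="final"):
    """Add `target` to df as an array of the non-null values of `column_names`."""
    # Imported lazily so drop_nulls can be exercised without Spark installed
    from pyspark.sql import functions as f
    from pyspark.sql.types import ArrayType

    # Assumption: all listed columns share the data type of the first column
    element_type = df.schema[column_names[0]].dataType
    drop_nulls_udf = f.udf(drop_nulls, ArrayType(element_type))
    return df.withColumn(target, drop_nulls_udf(f.array(*column_names)))
```

With the schema from the question, `with_final_column_udf(df, ["applications", "col2", "col3"])` would declare the UDF as returning an array of the applications struct type. The column names other than applications are the placeholders used in the question.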

You could define your own UDF as follows:

def only_not_null(*values):
    return [x for x in values if x is not None]  # Keep only the non-null values

And then call:

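from pyspark.sql.types import ArrayType, StringType
df = spark.table('table1')
# Declare the return type; without it the UDF returns a plain string
df = df.withColumn('final', f.udf(only_not_null, ArrayType(StringType()))('col1', 'col2', 'col3'))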
mik1904