
I have a PySpark DataFrame (Spark version < 2.4).

Example dataframe:

column_1 <Array>            | column_2 <Array>                   | column_3 <Array> | join_columns
----------------------------|------------------------------------|------------------|------------------------------------------------------
["2345", "98576", "09857"]  | null                               | ["9857"]         | ["2345", "98576", "09857", "9857"]
null                        | ["87569", "9876"]                  | ["76586"]        | ["87569", "9876", "76586"]
["08798", "07564"]          | ["12345", "5768", "89687", "7564"] | ["7564"]         | ["08798", "07564", "12345", "5768", "89687", "7564"]
["03456", "09867"]          | ["87586"]                          | []               | ["03456", "09867", "87586"]

I would like to combine the three columns column_1, column_2 and column_3 into one column "join_columns" and drop the duplicate values. I tried concat; it combined the three columns, but only when each column holds a single value, maybe because concat only works on strings:

df.withColumn("join_columns", concat(df.s, df.d)).drop_duplicates()

How can I combine the values of array columns? Thank you.
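
For reference, here is a minimal sketch of how the sample DataFrame above can be reproduced (the schema is assumed from the table, with all three columns being nullable arrays of strings):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = StructType([
    StructField("column_1", ArrayType(StringType()), True),
    StructField("column_2", ArrayType(StringType()), True),
    StructField("column_3", ArrayType(StringType()), True),
])

data = [
    (["2345", "98576", "09857"], None, ["9857"]),
    (None, ["87569", "9876"], ["76586"]),
    (["08798", "07564"], ["12345", "5768", "89687", "7564"], ["7564"]),
    (["03456", "09867"], ["87586"], []),
]

df = spark.createDataFrame(data, schema)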

verojoucla

3 Answers


Can you try using the solution below? For Spark 2.4:

import pyspark.sql.functions as F

df = df.withColumn('col12', F.array_union(df.column_1, df.column_2))
df = df.withColumn('join_columns_dup', F.array_union(df.col12, df.column_3))
df = df.withColumn('join_columns', F.array_distinct(df.join_columns_dup))
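
One caveat worth hedging: array_union returns null when either of its inputs is null, so the sample rows where column_1 or column_2 is null would come out as null. A possible guard, sketched under the assumption of Spark 2.4+ (the cast to array<string> is only there to keep the empty-array literal's type in line with the columns):

empty = F.array().cast('array<string>')  # empty array literal with an explicit element type
df = df.withColumn('col12', F.array_union(F.coalesce(df.column_1, empty),
                                          F.coalesce(df.column_2, empty)))
df = df.withColumn('join_columns', F.array_union(df.col12, F.coalesce(df.column_3, empty)))

Since array_union already removes duplicates between its two inputs, the final array_distinct is optional in this variant.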

With Spark < 2.4, you can use a UDF:

import pyspark.sql.types as Types

def array_concat(c1, c2, c3):
    # treat a null column as an empty list, then de-duplicate with set()
    return list(set((list() if c1 is None else c1)
                    + (list() if c2 is None else c2)
                    + (list() if c3 is None else c3)))

arrayConcatUdf = F.udf(array_concat, Types.ArrayType(Types.StringType()))
df = df.withColumn('join_columns', arrayConcatUdf(df.column_1, df.column_2, df.column_3))

Crude, but it works fine with null values as well.

Sagar

In Spark 2.4 you can combine these 3 columns and then use the flatten function:

df.withColumn("join_columns", flatten(array("column1", "column2", "column2")))

In earlier Spark versions you can make a UDF to do this flatten:

from pyspark.sql.functions import array, udf
from pyspark.sql.types import ArrayType, StringType

# flatten the combined array of arrays, skipping null columns
flatten = udf(lambda arr: [e for a in arr if a is not None for e in a],
              ArrayType(StringType()))
df.withColumn("join_columns", flatten(array("column_1", "column_2", "column_3")))
Shadowtrooper

Before Spark 2.4, you can use a udf:

from pyspark.sql.functions import udf

@udf('array<string>')
def array_union(*arr):
    return list(set([e.lstrip('0').zfill(5) for a in arr if isinstance(a, list) for e in a]))

df.withColumn('join_columns', array_union('column_1','column_2','column_3')).show(truncate=False)

Note: we use e.lstrip('0').zfill(5) so that, for each array item, we first strip the leading zeros and then left-pad with zeros if the string is shorter than 5 characters.
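
If you do not want that normalization and just need the plain union of the values, a minimal variation (the name array_union_plain is only illustrative) could be:

@udf('array<string>')
def array_union_plain(*arr):
    # union of all non-null array columns, duplicates removed, values kept as-is
    return list(set(e for a in arr if isinstance(a, list) for e in a))

df.withColumn('join_columns', array_union_plain('column_1', 'column_2', 'column_3')).show(truncate=False)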

jxc
  • do you have a suggestion about this issue https://stackoverflow.com/questions/59055915/convert-empty-array-to-null-pyspark/59056453#59056453 – verojoucla Nov 27 '19 at 08:22
  • @verojoucla, looks fine to me. you can also use a list comprehension: `df.selectExpr([ 'if({0} = array(""), null, `{0}`) AS `{0}`'.format(c) for c in df.columns])`. if the array is actual EMPTY, just change `array("")` to `array()`. – jxc Nov 27 '19 at 12:40
  • but the proposed solution in the question link is not working. – verojoucla Nov 27 '19 at 12:53
  • very good :) ok I have a new challenge here ;) https://stackoverflow.com/questions/59104192/filter-if-string-contain-sub-string-pyspark – verojoucla Nov 29 '19 at 11:56
  • 1
    @verojoucla, added an answer, let me know if it' works. – jxc Nov 29 '19 at 13:36