
I have a Pandas dataframe. I split the string values of two columns into lists, then used zip to pair up the elements and joined each pair with '_'. My data set looks like this:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

I want to combine these two columns into a third column, as shown below, for each row of my dataframe.

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

I have successfully done this in Python using the code below, but the dataframe is quite large and it takes a very long time to run over the whole thing. I want to do the same in PySpark for efficiency. I have read the data into a Spark dataframe successfully, but I'm having a hard time working out how to replicate the Pandas logic with equivalent PySpark functions. How can I get my desired result in PySpark?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
    if isinstance(row['column_1'], str):
        # Split both strings, zip the pieces pairwise and join each pair with '_'
        col_1 = [s.strip() for s in row['column_1'].split(',')]
        col_2 = [s.strip() for s in row['column_2'].split(',')]
        # Write back with df.at, since mutating `row` does not change the dataframe
        df.at[index, 'column_3'] = ['_'.join(pair) for pair in zip(col_1, col_2)]

I have converted the two columns to arrays in PySpark using the code below:

from pyspark.sql.functions import col, split

# split() already returns an array<string> column, and withColumn returns a new
# dataframe, so the result has to be assigned back
crash = crash.withColumn("column_1", split(col("column_1"), r",\s*"))
crash = crash.withColumn("column_2", split(col("column_2"), r",\s*"))

Now all I need is to zip the corresponding elements of the two array columns and join them with '_'. How can I use zip for this? Any help is appreciated.

Falconic
    Why are `df['column_1']` and `df['column_2']` a single string instead of a list of items? What were they originally? – Foxan Ng Jan 21 '19 at 02:22
  • That's how the data is when I read it into the dataframe. – Falconic Jan 21 '19 at 02:25
  • @Falconic so are `abc`, `def`, etc. in a single row or in different rows? And is column 2 similarly a single row? – anky Jan 21 '19 at 02:28
  • @anky_91 this is one row of the dataframe for column_1 and column_2. Each row has multiple items in one column. This is why I split the string and then cast it to a list. – Falconic Jan 21 '19 at 02:34
  • Does this answer your question? [Pyspark: Split multiple array columns into rows](https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows) – Ani Menon May 26 '20 at 05:48

4 Answers


A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
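
For a quick feel for what this produces, here is a minimal illustration on literal arrays (just a sketch, assuming an active SparkSession named spark and Spark 2.4+):

from pyspark.sql.functions import array, arrays_zip, lit

# Pair up two literal arrays; each struct in the result holds the N-th element of each input
spark.range(1).select(
    arrays_zip(array(lit("abc"), lit("def")),
               array(lit("1.0"), lit("2.0"))).alias("zipped")
).show(truncate=False)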

So if you already have two arrays:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

You can just apply it to the result:

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

Now, to combine the results, you can use transform (see How to use transform higher-order function? and TypeError: Column is not iterable - How to iterate over ArrayType()?):

from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Note:

The higher-order function transform and arrays_zip were introduced in Apache Spark 2.4.
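
If you are not sure which Spark version you are running, you can check it from the active session (a quick sketch, assuming the session object is named spark):

# transform and arrays_zip are only available from Spark 2.4 onwards
print(spark.version)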

10465355
  • Thanks user10465355. This solution worked for me, but just a word of caution: it does not handle null values inside the lists very well, so I removed nulls manually from both columns before joining them. Secondly, I had to do every step on my original dataframe; having multiple dataframes with the same column names did not work well with PySpark, and after some debugging it turned out I needed to use the same dataframe throughout the different operations. – Falconic Jan 23 '19 at 00:57

For Spark 2.4+, this can be done using only the zip_with function, which zips and concatenates in a single step:

df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

The higher-order function takes two arrays and merges them element-wise, using the lambda function (x, y) -> concat(x, '_', y).
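
Putting it together with the split step from the question, a minimal end-to-end sketch (assuming the two columns start out as comma-separated strings, as in the original data):

from pyspark.sql.functions import expr, split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')], ['column_1', 'column_2'])
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*"))
    .withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")))

df.select("column_3").show(truncate=False)
+---------------------------+
|column_3                   |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+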

blackbishop

You can also use a UDF to zip the split array columns:

df = spark.createDataFrame([('abc,def,ghi', '1.0,2.0,3.0')], ['col1', 'col2'])
df.show(truncate=False)
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def concat_udf(*args):
    # Zip the arrays element-wise and join each pair with '_'
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf, ArrayType(StringType()))
df = df.withColumn('col3', udf1(F.split(df.col1, ','), F.split(df.col2, ',')))
df.show(1, False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+
Suresh
  • Thanks @suresh. This is definitely a cleaner solution. When I apply it to my own dataframe and run the collect function, I get the error below: `TypeError: zip argument #1 must support iteration`. Any ideas why? – Falconic Jan 21 '19 at 05:10
  • The error is because `zip()` isn't getting an iterable as input. Can you please post your sample input dataframe and its schema? – Suresh Jan 21 '19 at 06:23
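
As the comments above suggest, this UDF assumes both inputs are non-null; if either column is null, zip() receives None and fails with exactly this error. A null-tolerant variant (just a sketch, using the hypothetical names concat_safe and udf_safe) could guard the inputs first:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def concat_safe(col1, col2):
    # If either array is missing, return None instead of letting zip() blow up
    if col1 is None or col2 is None:
        return None
    return ['_'.join(pair) for pair in zip(col1, col2)]

udf_safe = F.udf(concat_safe, ArrayType(StringType()))
df = df.withColumn('col3', udf_safe(F.split(df.col1, ','), F.split(df.col2, ',')))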

For Spark 3.1+, PySpark provides pyspark.sql.functions.zip_with(), which takes a Python lambda function, so it can be done like this:

import pyspark.sql.functions as F

df = df.withColumn("column_3", F.zip_with("column_1", "column_2", lambda x,y: F.concat_ws("_", x, y)))
johnnyasd12