2

My dataframe

df_a = spark.createDataFrame( [ 
                               (0, ["B","C","D","E"] , [1,2,3,4] ),
                               (1,["E","A","C"] , [1,2,3] ),
                               (2, ["F","A","E","B"] , [1,2,3,4]),
                               (3,["E","G","A"] , [1,2,3 ]),
                               (4,["A","C","E","B","D"] , [1,2,3,4,5])] , ["id","items",'rank'])

and i want my output as :

+---+----+----+
| id|item|rank|
+---+----+----+
|  0|   B|   1|
|  0|   C|   2|
|  0|   D|   3|
|  0|   E|   4|
|  1|   E|   1|
|  1|   A|   2|
|  1|   C|   3|
|  2|   F|   1|
|  2|   A|   2|
|  2|   E|   3|
|  2|   B|   4|
|  3|   E|   1|
|  3|   G|   2|
|  3|   A|   3|
|  4|   A|   1|
|  4|   C|   2|
|  4|   E|   3|
|  4|   B|   4|
|  4|   D|   5|
+---+----+----+

my dataframe has 8 million rows and when i try to zip and explode as below, its extremely slow and job runs forever using 15executors and 25GB memory

zip_udf2 = F.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        StructField("item", StringType()),
        StructField("rank", IntegerType())
        
    ]))
)

(df_a
 .withColumn('tmp', zip_udf2("items", "rank"))
 .withColumn("tmp", F.explode('tmp'))
 .select("id", F.col("tmp.item"), F.col("tmp.rank"))
 .show())

Any alternate methods? i tried rdd.flatMap still didn't make a dent on the performance. number of elements in the arrays in each row varies.

Appden65
  • 89
  • 9

1 Answers1

1

UPDATE

Since you are using Spark 2.3.2 and arrays_zip isn't available, I did some tests comparing which is the best option: udf or posexplode. The quick answer is: posexplode.

(df_a
 .select('id', F.posexplode('items'), 'rank')
 .select('id', F.col('col').alias('item'), F.expr('rank[pos]').alias('rank'))
 .show())

Tests

from pyspark.sql.types import *

import pyspark.sql.functions as F
import time


df_a = spark.createDataFrame([ 
  (0, ["B","C","D","E"] , [1,2,3,4] ),
  (1,["E","A","C"] , [1,2,3] ),
  (2, ["F","A","E","B"] , [1,2,3,4]),
  (3,["E","G","A"] , [1,2,3 ]),
  (4,["A","C","E","B","D"] , [1,2,3,4,5])] , ["id","items",'rank'])


# My solution
def using_posexplode():
  (df_a
   .select('id', F.posexplode('items'), 'rank')
   .select('id', F.col('col').alias('item'), F.expr('rank[pos]').alias('rank'))
   .count())


# Your solution
zip_udf2 = F.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
    StructField("item", StringType()),
    StructField("rank", IntegerType())
  ])))

def using_udf():
  (df_a
   .withColumn('tmp', zip_udf2("items", "rank"))
   .withColumn("tmp", F.explode('tmp'))
   .select("id", F.col("tmp.item"), F.col("tmp.rank"))
   .count())


def time_run_method(iterations, fn):
  t0 = time.time()
  for i in range(iterations):
    fn()
  td = time.time() - t0
  
  print(fn.__name__, "Time to count %d iterations: %s [sec]" % (iterations, "{:,}".format(td)))
  
for function in [using_posexplode, using_udf]:
  time_run_method(iterations=100, fn=function)

using_posexplode Time to count 100 iterations: 24.962905168533325 [sec]
using_udf Time to count 100 iterations: 44.120017290115356 [sec]

OLD

There is no guarantee that only this will solve your entire problem, but one thing to consider is to remove your zip_udf2 and change it to a Spark's native function arrays_zip. Here is an explanation about why we should avoid (when it's possible) UDF.

Kafels
  • 3,864
  • 1
  • 15
  • 32
  • I am using spark2.3.2 and arrays_zip doesn't exist. Hence had to resort to udf – Appden65 Sep 14 '21 at 00:02
  • Thank @kafels. Let me give this a try ! – Appden65 Sep 14 '21 at 00:47
  • Using 2 explode for each of the column produces multiple combination of rows. (df_a .withColumn('item', F.explode('items')) .withColumn('rank', F.explode('rank')) .select('id', 'item', 'rank') .count()) -> 75 rows I actually want item,rank to be zipped and produce only one row per combination – Appden65 Sep 14 '21 at 00:54
  • @Appden65 Check it again. I solved it using posexplode and expr – Kafels Sep 14 '21 at 01:22