-2

I'm running a PySpark process that works without issues. The first step of the process is to apply a specific UDF to the dataframe. This is the function:

import html2text

class Udfs(object):
    def __init__(self):
        # Configure html2text to strip links and images from the converted text
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        # Convert raw HTML to plain text; return a marker string if parsing fails
        try:
            texto = self.h2t.handle(raw_text)
        except Exception:
            texto = "PARSE HTML ERROR"
        return texto

Here is how I apply the UDF:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs

udfs = Udfs()
# Wrap the instance method as a Spark UDF that returns a string column
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))

It processes approximately 29 million rows and 300 GB of data. The problem is that some tasks take too long to process. The average task times are:

[screenshot: average task times]

Other tasks have finished with durations of more than 1 hour.

But some tasks take far too long to process:

[screenshot: task times]

The process runs on AWS EMR in a cluster with 100 nodes, each with 32 GB of RAM and 4 CPUs. Spark speculation is also enabled.
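
Speculation is turned on with the standard setting (something like the following; the multiplier and quantile values shown are just the Spark defaults, not tuned values):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.speculation", "true")            # re-launch slow-running tasks on other executors
    .config("spark.speculation.multiplier", "1.5")  # Spark default: a task is slow at 1.5x the median
    .config("spark.speculation.quantile", "0.75")   # Spark default: speculate after 75% of tasks finish
    .getOrCreate()
)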

Where is the problem with these tasks? Is it a problem with the UDF? Is it a threading problem?

Shadowtrooper
  • 1,372
  • 15
  • 28
  • What's your number of partitions? Did you try repartitioning or changing the number of partitions of your DataFrame? Maybe your partitions are unbalanced: did you perform an action before calling your UDF that could have unbalanced your partitions? – linog Apr 11 '20 at 19:09
  • What do you mean by unbalanced? How can I balance the dataframe? Before running the process I did a repartition with 80,000 partitions. – Shadowtrooper Apr 11 '20 at 19:14
  • By unbalanced I mean that your 29 million rows are not split uniformly between partitions. You can find some pointers [here](https://stackoverflow.com/questions/33505050/how-to-know-when-to-repartition-coalesce-rdd-with-unbalanced-partitions-without). I also think that's far too many partitions. If I were you, I would try with fewer partitions. – linog Apr 11 '20 at 19:16

2 Answers

4

My intuition is that you are using too many partitions. I would make a first attempt by significantly reducing their number. You can find this interesting post on the subject.

If your partitions are balanced, you have 29 million rows / 80k partitions ≈ 362 rows per partition on average. I suspect that's not enough: you are spending a lot of time scheduling tasks rather than executing them.

The situation becomes worse if your partitions are not balanced (see here). That typically creates bottlenecks, which is what seems to be happening in your case. There are several options; a short sketch follows this list:

  • You can coalesce your data to a lower number of partitions. That's better than using repartition because it avoids a full shuffle.
  • repartitionByRange if you want your data to be split based on some columns. Your partitions will not be as balanced as with coalesce or repartition, but it can be useful if later on you need to perform operations on those splitting columns.
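
As a rough illustration (the column name "id" and the partition count 2000 below are assumptions, not values from your job), the two options look like this:

# Option 1: reduce the partition count without a full shuffle
df_small = df.coalesce(2000)

# Option 2: range-partition on a chosen column (here an assumed "id" column)
df_ranged = df.repartitionByRange(2000, "id")

# Then apply the UDF as in the question
df_out = df_ranged.withColumn("texto", extract_text_udf("html_raw"))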

You can also change the default partition settings via spark.sql.shuffle.partitions and spark.default.parallelism.
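
For example (the value 2000 is purely illustrative, and spark.default.parallelism must be set before the SparkContext starts, e.g. through spark-submit):

# At runtime, for shuffle-producing operations (the default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# At submit time, since spark.default.parallelism cannot be changed on a running context:
# spark-submit --conf spark.default.parallelism=2000 ...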

This is a guess based on my experience. Finding the adequate number of partitions is hard, but it's worth it. Let me know if this helped or if you still experience bottlenecks.

linog
  • 5,786
  • 3
  • 14
  • 28
0

I found the solution using repartitionByRange on the first dataframe. Using it with the correct id column and number of partitions balances the number of rows in each partition.
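
A minimal sketch of what this looks like (the "id" column and the partition count are illustrative, not my exact values):

# Range-partition on an id column so rows are spread evenly across partitions,
# then apply the UDF as before
df = df.repartitionByRange(2000, "id")
df = df.withColumn("texto", extract_text_udf("html_raw"))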

Shadowtrooper
  • 1,372
  • 15
  • 28