
We have a very large PySpark DataFrame on which we need to perform a groupBy operation.

We've tried with

df_gp=df.groupBy('some_column').count()

and it's taking a really long time (it's been running for more than 17hrs with no result).

I also tried with

from pyspark.sql.functions import count
df_gp = df.groupBy('some_column').agg(count('*'))

but as far as I can tell the behaviour is the same.

For more context :

  • we are running this operation in Zeppelin (version 0.8.0), using the %spark2.pyspark interpreter
  • Zeppelin is running in YARN client mode
  • data is stored in Hive (Hive 3.1.0.3.1.0.0-78)
  • the initial DataFrame is created by querying Hive with LLAP:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

req=""" SELECT *
        FROM table
        where isodate='2020-07-27'
    """

df = hive.executeQuery(req)
  • DataFrame size is ~60 million rows, 9 columns
  • other operations performed on the same DataFrame in the same environment, such as count() or cache(), complete in under a minute

I've been reading about Spark's groupBy from different sources, but from what I gathered here, the DataFrame API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large DataFrames.

I get that a groupBy on such a large volume of data can take some time, but this is really too much. I guess some memory parameters may need tuning, or maybe there's something wrong with the way we're executing the groupBy operation?

[EDIT] I forgot to mention that there are some UDFs applied to the DataFrame before the groupBy. I've tried:

  • groupBy on a large DataFrame without UDFs: gives a result in less than a minute
  • groupBy on a sample of the processed DataFrame: same problem as before

So we're thinking the UDFs are the actual cause of the problem, not the groupBy.
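
For the record, here is a rough sketch of how the UDF cost could be separated from the groupBy cost (my_udf and processed_col are placeholders, not our real code):

from pyspark.sql import functions as F

# my_udf stands in for the actual UDFs we apply; it is only a placeholder
my_udf = F.udf(lambda x: x, 'string')

processed = df.withColumn('processed_col', my_udf(F.col('some_column'))).cache()
processed.count()   # forces the UDFs to run once and materialises the result

df_gp = processed.groupBy('some_column').count()
df_gp.show()        # this should now measure only the shuffle/aggregation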

Flxnt

2 Answers


A few myth-busters first:

  1. .groupBy('some_column').count() and .groupBy('some_column').agg(count('*')) are the same (see the snippet after this list)

  2. groupBy does cause a shuffle; what that post meant is that it only shuffles the necessary column data (no extra columns that are not used in the groupBy or agg functions)

    I've been reading about Spark's groupBy on different sources, but from what I gathered here, Dataframe API doesn't need to load or shuffle keys in memory, so it should not be a problem even on large Dataframes.
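
A quick snippet for point 1, using the column name from the question (a sketch, assuming df is the original DataFrame):

from pyspark.sql import functions as F

# Both expressions return one row per key with its count and compile to
# essentially the same aggregation plan
df_gp1 = df.groupBy('some_column').count()
df_gp2 = df.groupBy('some_column').agg(F.count('*').alias('count'))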

Now to your problem

  1. groupBy can take time if a lot of data is shuffled and spark.sql.shuffle.partitions is set low (default 200). In that case one core has to aggregate a big chunk of shuffled data
  2. it can also take a lot of time if the column used in the groupBy has data skew, as that causes a lot of data to go to a single executor core

Solution

  1. increase spark.sql.shuffle.partitions to a higher value (in my experience it should be around <amount_of_data_shuffled_in_gb>/100MB, to ensure each core gets around 100 MB of data to aggregate); see the sketch after this list
  2. skew can be addressed by introducing randomness into the data (salting): https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da
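
A rough sketch of both ideas; the 60 GB figure and the salt factor of 10 are placeholders to adapt to your own job, not values measured from it:

from pyspark.sql import functions as F

# 1. more shuffle partitions, assuming (purely for illustration) ~60 GB of
#    shuffled data: 60 GB / 100 MB per partition ≈ 600
spark.conf.set("spark.sql.shuffle.partitions", 600)

# 2. salting: spread each skewed key over several partitions, aggregate
#    partially, then combine the partial counts
n_salts = 10   # placeholder salt factor, tune to the degree of skew
salted = df.withColumn('salt', (F.rand() * n_salts).cast('int'))
partial = salted.groupBy('some_column', 'salt').count()
df_gp = partial.groupBy('some_column').agg(F.sum('count').alias('count'))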
Samir Vyas
  • Thanks! I've been trying on a small sample of the same dataset (~6000 lines) and the result is the same, but there are some UDFs before `groupBy` that might cause skew – Flxnt Aug 27 '20 at 13:18

It's possible that it's running slow because of the underlying Hive query and not because of the groupBy operation. As you probably know, Spark does lazy evaluation, so the latency could be coming from either of the two. One way to test it is to cache() the DataFrame or call a simple count() before executing the groupBy on it. If you see the same issue, it's the Hive query execution that is slow, and the solution will look different there. You could also try reading the data from a file and see whether you get the same execution times for the groupBy.
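
A minimal sketch of that test, reusing the names from the question (the Parquet path is just a placeholder):

# materialise the Hive/LLAP result once, then time the groupBy on its own
df = hive.executeQuery(req)
df = df.cache()
df.count()      # slow here -> the Hive query is the bottleneck

df_gp = df.groupBy('some_column').count()
df_gp.show()    # slow here on cached data -> the aggregation itself is the problem

# or persist to a file once and re-read it, taking Hive/LLAP out of the picture
df.write.mode('overwrite').parquet('/tmp/some_table_2020-07-27')   # placeholder path
df_from_file = spark.read.parquet('/tmp/some_table_2020-07-27')
df_gp2 = df_from_file.groupBy('some_column').count()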

Reva
  • Thanks for your answer. I probably should have mentioned it in the post, but we did try a `count()` on the DataFrame before executing `groupBy()`; it gives a result in less than a minute. I will try to `cache()` the df and see if it changes anything. – Flxnt Aug 27 '20 at 08:27