0

I'm trying to tune the performance of spark, by the use of partitioning on a spark dataframe. Here is the code:

file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
    .where((func.col("organization") == organization)) 
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()

During the execution of first() I check the job stages in Spark UI and here what I find: Job details stage 7 details

  • Why there is no repartition step in the stage?
  • Why there is also stage 8? I only requested one action of first(). Is it because of the shuffle caused by the repartition?
  • Is there a way to change the repartition of the parquet files without having to occur to such operations? As initially when I read the df you can see that it's partitioned over 43k partitions which is really a lot (compared to its size when I save it to a csv file: 4 MB with 13k rows) and creating problems in further steps, that's why I wanted to repartition it.
  • Should I use cache() after repartition? df = df.repartition(10).cache()? As when I executed df.first() the second time, I also get a scheduled stage with 43k partitions, despite df.rdd.getNumPartitions() which returned 10. EDIT: the number of partitions is just to try. my questions are directed to help me understand how to do the right repartition.

Note: initially the Dataframe is read from a selection of parquet files in Hadoop.

I already read this as reference How does Spark partition(ing) work on files in HDFS?

SarahData
  • 769
  • 1
  • 12
  • 38
  • What is your spark.default.parallelism? And how many partition are in your parquet file? – RanP Feb 25 '19 at 12:36
  • I didn't understand the second questions.. if you mean the size of my parquet file, I don't know how to check that. Otherwise the default.parallelism is not set, thus using the default one.. `total number of cores on all executor nodes or 2, whichever is larger` and the allocation of CPU cores is dynamic. – SarahData Feb 25 '19 at 13:30
  • You can see the number of partitions in your parquet file by the number of "partXXX" file in the file directory in your hdfs. This is starting number of partitions you will have after reading the file. You can always do a rdd.coalesce(10) after reading the file. – RanP Feb 25 '19 at 14:59
  • do you know any command line that can help me count files "partXXX" number? I did a small google search and couldn't find one. – SarahData Feb 28 '19 at 10:03
  • If the parquet file is 50 partitions, you'll have files part-0000 till part-0049 . So you just ls the directory and sort by file name. – RanP Feb 28 '19 at 11:26
  • There's no `repartition` because it's been subsumed into the WholeStageCodegen optimisation, see https://stackoverflow.com/a/45937695/1335793 Not sure about your context for running `first()` a second time, is that in a notebook? It's 43K because that's how many tasks have been issued, probably because the initial 3.1TB of data consists of a lot of files. Parquet can help with skipping files at the source that don't match your organization filter, but if there's too many files it still has to inspect the footer of each one, bigger files can help. Should you `cache()`? Always try things out. – Davos Aug 05 '19 at 11:41

1 Answers1

0
  • Whenever there is shuffling, there is a new stage. and the
    repartition causes shuffling that"s why you have two stages.
  • the caching is used when you'll use the dataframe multiple times to avoid reading it twice.

Use coalesce instead of repartiton. I think it causes less shuffling since it only reduces the number of partitions.

firsni
  • 856
  • 6
  • 12
  • the size of that data is the initial one. not the `df` one, as it's selected (selecting some rows). I'm trying to partition the selected rows, not the 3.1 TB of data.. – SarahData Feb 25 '19 at 14:15
  • your answer doesn't answer my questions unfortunately. I want to use `repartition` not `coalesce` for this test case. If the shuffling occurred because of the repartition, why I don't get the data repartitioned as requested in my second action performed on the DataFrame? – SarahData Feb 25 '19 at 14:58