3

In Spark 2.2.0: I'm reading in one file using

spark.read.format("csv").load("filepath").rdd.getNumPartitions

I get 77 partitions for a 350 MB file in one system, and 88 partitions in another. I also get 226 partitions for a 28 GB file, which is roughly 28*1024 MB / 128 MB. The question is, how does Spark CSV Data Source determine this default number of partitions?
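A quick back-of-envelope check of that estimate, just restating the numbers above:

# 28 GB split into ~128 MB chunks
print(28 * 1024 / 128.0)  # 224.0, roughly the 226 partitions observed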

L. Chu
  • Please include Spark version. It looks like you don't use `spark-csv` but built-in Spark reader. [This might be helpful](https://stackoverflow.com/q/38249624/8371915) but won't fully answer your question. – Alper t. Turker May 23 '18 at 20:27
  • Updated question per suggestions! – L. Chu May 23 '18 at 20:37

4 Answers

1

The number of partitions is influenced by multiple factors, typically:

  • spark.default.parallelism
  • number of files that you're reading (if reading from a directory)
  • cluster manager / number of cores (see Spark configuration), which influences spark.default.parallelism

The number of partitions when reading from a text file (and from CSV as well) should be determined as math.min(defaultParallelism, 2), based on CSVDataSource.
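To compare these values in practice, here is a minimal PySpark sketch (the CSV path is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# defaultParallelism usually comes from the cluster manager / total core count
print(spark.sparkContext.defaultParallelism)

# and this is the partition count Spark actually produces for the read
df = spark.read.format("csv").load("/path/to/file.csv")  # placeholder path
print(df.rdd.getNumPartitions())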

JiriS
  • That's what I thought at first, but spark.default.parallelism is set to 100 and, per the docs, it applies mainly to transformations like join. `math.min(defaultParallelism, 2)` is a nice thing to know, but it wouldn't explain how I get numbers greater than 2. Something else must be determining the number of partitions. – L. Chu May 23 '18 at 21:05
  • It might be interesting to have a bit more detail about the systems where you're running those jobs, like which mode you're using and whether you have just one file or multiple. – JiriS May 23 '18 at 21:07
1

When reading CSV files (a single large file or multiple smaller files, compressed or not), I find that spark.sql.files.maxPartitionBytes has a big impact on the number of resulting partitions. Tweaking this value (default of 128 MB, see https://spark.apache.org/docs/latest/sql-performance-tuning.html) was key for me.
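As an illustration, here is a minimal sketch of tweaking this setting before a read (the 64 MB value and the path are arbitrary examples, not recommendations):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Cap the bytes packed into a single input partition (default is 128 MB).
# Smaller values generally yield more, smaller partitions for the same input.
spark.conf.set("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)  # 64 MB, example value

df = spark.read.format("csv").load("/path/to/data")  # placeholder path
print(df.rdd.getNumPartitions())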

ladoe00
0

The number of partitions when reading from any file follows the formula below.

Step 1: Find the file or folder size for the specified path (I tested this locally; adapt it to your storage as needed, e.g. S3 or HDFS).

import os

# Recursively sum the sizes of all files under `path` (local filesystem).
def find_folder_size(path):
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += find_folder_size(entry.path)
    return total

Step 2: Apply the formula

import math

target_partition_size = 200  # target partition size in MB (e.g. 100 or 200)
total_size = find_folder_size(paths)
print('Total size: {}'.format(total_size))

# partitions = total size in MB, divided by the target partition size, rounded up
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
print('Number of partitions: {}'.format(num_partitions))

PARTITION_COLUMN_NAME = ['a', 'c']
df = df.repartition(num_partitions, *PARTITION_COLUMN_NAME)
# or, without partition columns:
df = df.repartition(num_partitions)

We can apply this to either large or small data to get the number of partitions.
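For example, a rough worked illustration of the formula with a hypothetical 28 GB folder and a 200 MB target:

import math

# 28 GB expressed in MB, divided by the 200 MB target, rounded up
print(int(math.ceil(28 * 1024 / 200.0)))  # 144 partitions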

  • The question is not as clear as I would've liked, but it isn't about reaching a target number of partitions via df.repartition. Rather, it's about the mechanism behind how Spark determines the default number of partitions. One would imagine it would be based on file size, but the behavior we saw was that Spark 2.2.0 was giving a varied number of default partitions. Notably, even for small file sizes it was giving a large number of partitions, and the number seemed to differ between files with the same size (it may have been between two different Spark instances on different machines). – L. Chu Mar 09 '20 at 18:21
0

In my experience, it depends on spark.default.parallelism.

Scenario 1: File size: 75 MB, defaultParallelism: 8

>>> sc.defaultParallelism
8
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
8

Scenario 2: File size: 75 MB, defaultParallelism: 10

>>> sc.defaultParallelism
10
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
10

Scenario 3: File size: 75 MB, defaultParallelism: 4

>>> sc.defaultParallelism
4
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
4

Scenario 4: File size: 75 MB, defaultParallelism: 100

>>> sc.defaultParallelism
100
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
18

In scenario 4, it divided the data into the possible number of partitions, i.e. 18.

Based on this, I infer that the initial number depends on the value of spark.default.parallelism.

And if spark.default.parallelism is set to a higher number, it just creates the possible number of partitions based on hashing.
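For context, here is a simplified sketch of how Spark 2.x's file-source planner appears to size input splits, based on spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes and defaultParallelism. It ignores block boundaries and how chunks are packed into partitions, so the counts are only approximate:

import math

def estimate_partitions(file_sizes_bytes, default_parallelism,
                        max_partition_bytes=128 * 1024 * 1024,  # spark.sql.files.maxPartitionBytes
                        open_cost_bytes=4 * 1024 * 1024):       # spark.sql.files.openCostInBytes
    # Each file is padded with an "open cost" before the total is spread over the cores.
    total_bytes = sum(size + open_cost_bytes for size in file_sizes_bytes)
    bytes_per_core = total_bytes / float(default_parallelism)
    # A split is never larger than maxPartitionBytes and never smaller than the open cost.
    max_split_bytes = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))
    # Approximation: each file is cut into chunks of at most max_split_bytes.
    return sum(int(math.ceil(size / float(max_split_bytes))) for size in file_sizes_bytes)

# 75 MB file: with defaultParallelism = 100 the splits shrink to ~4 MB, giving ~19
# partitions (close to the 18 observed above); with 8 it gives 8, matching scenario 1.
print(estimate_partitions([75 * 1024 * 1024], 100))  # 19
print(estimate_partitions([75 * 1024 * 1024], 8))    # 8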