64

I'm working with Apache Spark on a cluster using HDFS. As far as I understand, HDFS distributes files across the data nodes. So if I put a "file.txt" on the filesystem, it will be split into partitions. Now I'm calling

rdd = SparkContext().textFile("hdfs://.../file.txt") 

from Apache Spark. Does rdd now automatically have the same partitions as "file.txt" on the filesystem? What happens when I call

rdd.repartition(x)

where x is greater than the number of partitions used by HDFS? Will Spark physically rearrange the data on HDFS to work locally?

Example: I put a 30GB text file on HDFS, which distributes it across 10 nodes. Will Spark a) use the same 10 partitions, and b) shuffle 30GB across the cluster when I call repartition(1000)?

– Degget

4 Answers

103

When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile() it would be TextInputFormat in Hadoop, which would return you a single partition for a single HDFS block (but the split between partitions would be done on line boundaries, not the exact block boundaries), unless you have a compressed text file. In the case of a compressed file you would get a single partition for the whole file (as compressed text files are not splittable).

When you call rdd.repartition(x) it would perform a shuffle of the data from the N partitions you have in rdd to the x partitions you want to have; the partitioning would be done on a round-robin basis.

If you have a 30GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128MB) it would be stored in 235 blocks, which means that the RDD you read from this file would have 235 partitions. When you call repartition(1000) your RDD would be marked for repartitioning, but in fact it would be shuffled to 1000 partitions only when you execute an action on top of this RDD (lazy execution).
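
A minimal PySpark sketch of the behaviour described above (the HDFS path is a placeholder and the file is assumed to be an uncompressed text file; the second argument to textFile is the minPartitions hint discussed in the comments below):

    from pyspark import SparkContext

    sc = SparkContext(appName="partition-demo")

    # Placeholder path; substitute your own uncompressed text file on HDFS.
    rdd = sc.textFile("hdfs:///path/to/file.txt")

    # Roughly one partition per HDFS block / input split.
    print(rdd.getNumPartitions())

    # repartition() only records the shuffle in the lineage; nothing moves yet.
    repartitioned = rdd.repartition(1000)

    # The shuffle to 1000 partitions happens when an action runs (lazy execution).
    print(repartitioned.count())

    # Alternatively, ask Hadoop's TextInputFormat for more splits up front,
    # which avoids a Spark-side shuffle.
    rdd400 = sc.textFile("hdfs:///path/to/file.txt", minPartitions=400)
    print(rdd400.getNumPartitions())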

– 0x0FFF
  • So, am I correct in taking from this that `rdd = SparkContext().textFile("hdfs://.../file.txt")` will result in an RDD that has the same number of partitions as the number of blocks the file is stored in on HDFS? And it is only possible to get it into more partitions if you repartition it with `rdd.repartition(x)`? – monster Mar 26 '15 at 12:19
  • Not exactly. Ideally you would get the same number of partitions as the number of blocks you see in HDFS. But if the lines in your file are too long (longer than the block size), the number of partitions would be smaller. The preferred way to change the number of partitions in this case is to pass it directly to the call `rdd = SparkContext().textFile("hdfs://.../file.txt", 400)`, where 400 is the number of partitions. In this case the partitioning into 400 splits would be done by the Hadoop TextInputFormat, not Spark, and it would work much faster. Spark's `repartition()` would shuffle the data across the cluster, which is not really efficient – 0x0FFF Mar 26 '15 at 12:24
  • Thanks for your great question and answer! Could you please explain the case of "partitioning in 400 splits would be done by the Hadoop TextInputFormat"? – Marco Dinh Jun 13 '15 at 02:25
  • 0x0FFF: if the input file is a SequenceFile[BytesWritable, BytesWritable], I have to use hadoopFile or sequenceFile (I cannot use textFile). The problem is that BytesWritable is not serializable, so I convert it to an array of bytes (byte[]), but then it returns the error "Default partitioner cannot partition array keys". I already looked at the code, and Spark does not support array keys. So do you have any solution? – Marco Dinh Jun 13 '15 at 02:36
  • First question: just read the code of TextInputFormat, it is simple. The underlying data partitioning is abstracted from it, so it just works with an input stream of data – 0x0FFF Jun 13 '15 at 10:24
  • Second question - you are free to write your own partitioner. If you really need to group data based on the byte sequence key, you have to do it. You can simply xor, use CRC32 or MurMur or CityHash or one of the many others – 0x0FFF Jun 13 '15 at 10:26
  • @0x0FFF I know this is a little late, but when you say: "If you have a 30GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128MB) it would be stored in 235 blocks, which means that the RDD you read from this file would have 235 partitions." Does this mean that you have 235 RDDs for the entire data set (one for each of the 235 blocks of data)? –  Oct 08 '16 at 22:28
  • So if I do `df.repartition(500)` on this `DataFrame` having 235 `partition`s (created the way described in the answer) before invoking any `action` on it, would `Spark`'s *lazy evaluation* cause the `DataFrame` to be read with parallelism of 500 directly? – y2k-shubham Apr 13 '18 at 14:30
  • I am reading from the local file system. The content of the file is => this is input file. When I check the number of partitions with => sc.textFile("file:///home/sukumaar/data/input").getNumPartitions, it shows '3'. Why is this happening? BTW my machine has 4 cores. – nomadSK25 Oct 19 '19 at 19:11
  • @0x0FFF : Can you answer this question https://stackoverflow.com/q/60991846/1559331 – dileepVikram Apr 02 '20 at 18:33
  • @nomadSK25 what is your file size? – Neethu Lalitha Nov 07 '21 at 02:11
21

When reading non-bucketed HDFS files (e.g. parquet) with spark-sql, the number of DataFrame partitions df.rdd.getNumPartitions depends on these factors:

  • spark.default.parallelism (roughly translates to #cores available for the application)
  • spark.sql.files.maxPartitionBytes (default 128MB)
  • spark.sql.files.openCostInBytes (default 4MB)

A rough estimation of the number of partitions is:

  • If you have enough cores to read all your data in parallel (i.e. at least one core for every 128MB of your data),

    AveragePartitionSize ≈ max(4MB, TotalDataSize/#cores)
    NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

  • If you don't have enough cores,

    AveragePartitionSize ≈ 128MB
    NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

The exact calculation is slightly more involved and can be found in the code base for FileSourceScanExec; refer here.
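
A rough sketch of that calculation in plain Python (the constants mirror the defaults listed above; the real FileSourceScanExec also adds openCostInBytes per file and packs files into partitions greedily, so treat the numbers as estimates):

    # Approximate split-size logic used when spark-sql reads non-bucketed files.
    def estimate_num_partitions(total_bytes, num_cores,
                                max_partition_bytes=128 * 1024 * 1024,  # spark.sql.files.maxPartitionBytes
                                open_cost_in_bytes=4 * 1024 * 1024):    # spark.sql.files.openCostInBytes
        bytes_per_core = total_bytes / num_cores
        # Target split size: capped at 128MB, floored at 4MB, otherwise bytes-per-core.
        max_split_bytes = min(max_partition_bytes,
                              max(open_cost_in_bytes, bytes_per_core))
        return int(total_bytes / max_split_bytes) + 1

    # 30GB read by an application with 400 cores: ~77MB splits -> ~400 partitions.
    print(estimate_num_partitions(30 * 1024**3, 400))

    # The same 30GB with only 100 cores: split size capped at 128MB -> ~240 partitions.
    print(estimate_num_partitions(30 * 1024**3, 100))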

– Apoorve Dave
  • A side note: it is possible (and highly likely) that multiple _part files_ (e.g. part-0001.blah.snappy.parquet) end up in the same RDD partition of the DataFrame. This depends on the individual file sizes and *maxSplitBytes* (= max RDD partition size). E.g. three 30MB part files are read into the same RDD partition if maxSplitBytes is 100MB – Apoorve Dave Dec 20 '18 at 20:53
  • how can I check the totalDataSize of a parquet file? – SarahData Feb 22 '19 at 21:37
  • totalDataSize = sum of size of all part-xxx.blah.parquet files. – Apoorve Dave Feb 26 '19 at 20:51
  • Yes, but how can I sum them, if not manually? Via the command line using `dfs` maybe? – SarahData Feb 27 '19 at 11:00
18

Here is a snapshot of how blocks in HDFS are loaded into Spark workers as partitions.

In this image, 4 HDFS blocks are loaded as Spark partitions into the memory of 3 workers.

[Image: dataset in HDFS broken into partitions]


Example: I put a 30GB text file on HDFS, which distributes it across 10 nodes.

Will Spark

a) use the same 10 partitions?

Spark loads the same 10 HDFS blocks into the workers' memory as partitions. I assume the block size of the 30 GB file would have to be 3 GB to get 10 partitions/blocks (with the default configuration of one partition per block).

b) shuffle 30GB across the cluster when I call repartition(1000)?

Yes, Spark shuffles the data among the worker nodes in order to create 1000 partitions in the workers' memory.

Note:

HDFS block      -> Spark partition : one block maps to one partition (by default)
Spark partition -> Worker          : one or many partitions can be present in one worker
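
A small sketch (placeholder HDFS path) showing that repartition(1000) only adds a shuffle stage to the lineage, which runs when an action is triggered:

    from pyspark import SparkContext

    sc = SparkContext(appName="shuffle-demo")

    # Placeholder path; any HDFS text file works here.
    rdd = sc.textFile("hdfs:///path/to/file.txt")

    shuffled = rdd.repartition(1000)

    # The lineage shows the extra shuffle introduced by repartition();
    # data only physically moves between workers once an action runs.
    print(shuffled.toDebugString().decode())
    print(shuffled.getNumPartitions())  # 1000
    print(shuffled.count())             # action: triggers the shuffle
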
– mrsrinivas
  • Does a Spark partition live in memory or on disk? Can a block size be 3 GB? The HDFS block size is 128MB. – Kannan Oct 15 '17 at 07:03
  • @Kannan: We can set a block/partition size of our choice, but we should be clear about the number and why we chose it. A partition can be present in memory and/or on disk, but the Spark framework's choice will be memory by default. Check this [answer for more on RDD and memory](https://stackoverflow.com/a/40733821/1592191) – mrsrinivas Oct 15 '17 at 08:54
  • After reading this I came to the conclusion that data is stored as blocks in HDFS (10 blocks in total), but when repartitioning it gets loaded into 1000 partitions in cluster memory after an action is performed (the DAG is executed), right? – vijayraj34 Apr 30 '18 at 05:23
  • Yes, while repartitioning it gets shuffled into 1000 partitions. – mrsrinivas Apr 30 '18 at 08:37
  • If partitions are each stored in the workers' memory, why is HDFS used at all, since workers have memory too? @mrsrinivas – supernatural Dec 08 '19 at 16:45
  • @mrsrinivas can you answer this question https://stackoverflow.com/q/60991846/1559331 – dileepVikram Apr 02 '20 at 18:38
8

In addition to @0x0FFF's answer: if you take a file from HDFS as input, for example rdd = SparkContext().textFile("hdfs://.../file.txt"), then rdd.getNumPartitions() will give Max(2, number of HDFS blocks). I did a lot of experiments and found this result. Again, you can explicitly do rdd = SparkContext().textFile("hdfs://.../file.txt", 400) to get 400 partitions, or you can re-partition with rdd.repartition(x), or decrease to 10 partitions with rdd.coalesce(10).
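
A short sketch of the two ways of changing the partition count mentioned above (placeholder path; textFile's default minPartitions is min(defaultParallelism, 2), which is consistent with the Max(2, number of HDFS blocks) observation):

    from pyspark import SparkContext

    sc = SparkContext(appName="coalesce-demo")

    # Placeholder path; textFile's default minPartitions is min(defaultParallelism, 2).
    rdd = sc.textFile("hdfs:///path/to/file.txt")
    print(rdd.getNumPartitions())

    # Increasing the partition count requires a shuffle ...
    more = rdd.repartition(400)

    # ... while decreasing can merge existing partitions without a full shuffle.
    fewer = rdd.coalesce(10)

    # 400, and at most 10 (coalesce cannot increase the count without a shuffle).
    print(more.getNumPartitions(), fewer.getNumPartitions())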

– ChikuMiku
  • I've always felt it's not necessary to partition a dataset into more pieces than the block count, so I felt the default partition scheme is already doing a perfect job. When do we need to repartition into more partitions than the block count? – zinking Aug 24 '17 at 09:19
  • Well, maybe when you need a larger number of partitions than what you get from the block count. @zinking – ChikuMiku Aug 24 '17 at 14:16
  • Right, like one block of the file being split into lines and processed by different processors? And does that incur other overheads? – zinking Aug 25 '17 at 04:39