
Is there any relationship between the number of elements an RDD contains and its ideal number of partitions?

I have an RDD that has thousands of partitions (because I load it from a source composed of multiple small files, a constraint I can't fix, so I have to deal with it). I would like to repartition it (or use the coalesce method), but I don't know in advance the exact number of events the RDD will contain.
So I would like to do it in an automated way, something that will look like:

val numberOfElements = rdd.count()
val magicNumber = 100000
// coalesce returns a new RDD; the target must be an Int and at least 1
val compacted = rdd.coalesce(math.max(1, (numberOfElements / magicNumber).toInt))

Is there any rule of thumb about the optimal number of partitions of an RDD relative to its number of elements?

Thanks.

zero323
jmvllt

2 Answers


There isn't, because it is highly dependent on the application, resources and data. There are some hard limits (like various 2 GB limits), but the rest you have to tune on a task-by-task basis. Some factors to consider:

  • size of a single row / element
  • cost of a typical operation. If you have small partitions and operations are cheap, then scheduling cost can be much higher than the cost of data processing (a quick way to inspect this is sketched right after this list).
  • cost of processing a whole partition when performing partition-wise operations (sort, for example).
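
One quick way to see whether scheduling overhead is likely to dominate is to look at how many records each partition actually holds. A minimal sketch, assuming rdd is the RDD in question (the 1000-record threshold is purely illustrative):

// Count records per partition without shuffling the data
val counts = rdd
  .mapPartitions(iter => Iterator(iter.size))
  .collect()

// If most partitions are tiny, coalescing is probably worth it
val tiny = counts.count(_ < 1000)
println(s"$tiny of ${counts.length} partitions hold fewer than 1000 records")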

If the core problem here is the number of initial files, then using some variant of CombineFileInputFormat could be a better idea than repartitioning / coalescing. For example:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.lib.CombineTextInputFormat

// pack many small files into fewer, larger input splits
sc.hadoopFile(
  path,
  classOf[CombineTextInputFormat],
  classOf[LongWritable], classOf[Text]
).map(_._2.toString)  // keep only the line content

See also How to calculate the best numberOfPartitions for coalesce?

zero323

While I completely agree with zero323, you can still implement some kind of heuristic. Internally we took the size of the data (stored as compressed Avro key-value files) and computed the number of partitions such that every partition won't be more than 64 MB (totalVolume / 64 MB ~ number of partitions). Once in a while we run an automatic job to recompute the "optimal" number of partitions for each type of input, etc. In our case it's easy to do since the inputs are on HDFS (S3 will probably work too).
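
A rough sketch of that heuristic, assuming the input lives on HDFS so its total (compressed) size can be read through Hadoop's FileSystem API; the path and the 64 MB target below are illustrative:

import org.apache.hadoop.fs.{FileSystem, Path}

val targetPartitionBytes = 64L * 1024 * 1024  // aim for ~64 MB per partition

// total size of the compressed input on HDFS
val fs = FileSystem.get(sc.hadoopConfiguration)
val totalBytes = fs.getContentSummary(new Path("/data/events")).getLength

// at least one partition, otherwise roughly totalBytes / 64 MB
val numPartitions = math.max(1, (totalBytes / targetPartitionBytes).toInt)
val compacted = rdd.coalesce(numPartitions)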

Once again it depends on your computation and your data, so your number might be completely different.

Igor Berman
  • **@zero323 @Igor Berman** how should `partition`s be *weighed* for tuning `Spark`'s performance: by **number of records** or by **number of bytes**? My `Spark` job that reads data *in parallel* from `MySQL` [is failing](https://stackoverflow.com/questions/49149996) and I suspect that the *size of partitions* could be the culprit. [This link](https://www.slideshare.net/hadooparchbook/top-5-mistakes-when-writing-spark-applications-66374492/39) says that `partition`s should be **~ 128 MB** (no mention of the number of rows), but mine (would) reach **~ 10 GB** with **~ 15 M** records (if the read succeeds) – y2k-shubham Mar 23 '18 at 10:57
  • 1
    @y2k-shubham it depends. you can apply either of two. I've seen in some projects by count, and in another project by bytes. 10GB for 1 partition is way too big...regarding optimal size (64MB, 128MB more or less - you need to test, in any case it's below 1GB imo) – Igor Berman Mar 28 '18 at 20:00
  • **@Igor Berman** I acknowledge that **~ 10 GB** is *too big* a size for `partition`s, but it is by virtue of how my `DataFrame` is created. I'm using `Spark` `Jdbc` to read tables from `MySQL`. Based on my `MySQL` *instance size*, I can only *parallelize* the read operation up to **~ 40 connections** (`numPartitions = 40`). As a result, *some* partitions of the created `DataFrame` end up being that big. I can always `repartition` it to a smaller size afterwards, but the partitions would still be that large upon creation. Since I have no control over `MySQL`, I'm unable to come up with a workaround for it. – y2k-shubham Mar 29 '18 at 03:52
  • 1
    @y2k-shubham are you familiar with https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism ? I haven't used jdbc much so I can't give you any advice on this. What happens if you will create 1000 partitions, but will run it with parallelism of 40?(you will have relatively small partitions, but since parallelism is 40(or max cores) you won't excede connections usage). It's not perfect, but mysql is not built for big data processing. You can do it in 2 steps, with parallelism 40 select and store it into hdfs/s3 and then run with normal parallelism – Igor Berman Mar 29 '18 at 11:13
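
For reference, the parallelism options discussed in that last comment look roughly like this in a Spark JDBC read; the URL, table, column and bounds are illustrative placeholders, and the number of connections open at any moment is capped by the cores the job actually runs with:

// Illustrative: split the JDBC read on a numeric column into many partitions
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/mydb")   // placeholder URL
  .option("dbtable", "events")                       // placeholder table
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "id")                   // numeric column to split on
  .option("lowerBound", "1")
  .option("upperBound", "15000000")
  .option("numPartitions", "1000")                   // many small partitions...
  .load()
// ...while concurrent MySQL connections stay bounded by the executor cores (e.g. 40)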