4

Can someone explain openCostInBytes in Apache Spark to me? I can see the definition in the documentation, but I don't understand how exactly it affects reading files. Should I really care about this, and if yes, how should I tune it?

  • 1
    You can mostly ignore it; it usually has a low impact on performance unless you have a very pathological use case, and the default tuning works OK. https://stackoverflow.com/questions/69034543/number-of-tasks-while-reading-hdfs-in-spark – rluta Feb 04 '22 at 14:33

2 Answers

6

spark.sql.files.openCostInBytes will affect how many partitions the input data will be read into. The exact calculation can be found in FilePartition.scala.

As the code exists at the time of this answer, the calculation is the following:

def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / minPartitionNum

  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

So that last line is the interesting one. We take the minimum of:

  • defaultMaxSplitBytes, which comes from spark.sql.files.maxPartitionBytes and is by default 128 * 1024 * 1024
  • the max of:
    • openCostInBytes, which comes from spark.sql.files.openCostInBytes and is by default 4 * 1024 * 1024 (4 MB)
    • bytesPerCore, which is totalBytes / minPartitionNum (note that totalBytes adds openCostInBytes to the size of each file). In the default case, minPartitionNum comes from spark.default.parallelism, which is equal to your total number of cores (a small standalone sketch of this calculation follows the list)
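
To make this interplay concrete, here is a minimal standalone sketch of the calculation, assuming the default config values. This is plain Scala rather than Spark's own code; totalBytes and numCores are made-up inputs:

object MaxSplitBytesSketch {
  // Defaults of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes
  val defaultMaxSplitBytes: Long = 128L * 1024 * 1024
  val openCostInBytes: Long = 4L * 1024 * 1024

  // Mirrors the min/max logic above; totalBytes and numCores are hypothetical inputs
  def maxSplitBytes(totalBytes: Long, numCores: Int): Long = {
    val bytesPerCore = totalBytes / numCores
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }
}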

Now that we know this, we can try to understand the 3 possible outcomes of this calculation (taking into account the default values of the parameters):

  • If the result is the value of defaultMaxSplitBytes, this is because bytesPerCore is larger than the other values. This only happens when we're handling BIG files: so big that if we split the data fairly over all the cores, each slice would be bigger than defaultMaxSplitBytes. So here we are limiting the size of each partition.
  • If the result is the value of bytesPerCore, then it was smaller than 128 MB but larger than 4 MB. In this case, we split the data fairly over all of the cores.
  • If the result is the value of openCostInBytes, then bytesPerCore was smaller than 4 MB. Since each partition has an opening cost, we want to limit the number of partitions that get created. So in this case, we are limiting the number of partitions created.

From this, we see that this value only has an effect if your data is small relative to your cluster (i.e. if your data size divided by the number of cores in the cluster is small).
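
Plugging some made-up numbers into the sketch above (assuming an 8-core cluster) illustrates the three cases:

MaxSplitBytesSketch.maxSplitBytes(10L * 1024 * 1024 * 1024, 8) // 10 GB  -> 134217728 (the 128 MB cap)
MaxSplitBytesSketch.maxSplitBytes(400L * 1024 * 1024, 8)       // 400 MB -> 52428800  (50 MB per core)
MaxSplitBytesSketch.maxSplitBytes(8L * 1024 * 1024, 8)         // 8 MB   -> 4194304   (the 4 MB floor)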

Hope this helps!

Koedlt
  • 4,286
  • 8
  • 15
  • 33
2

This is useful only when bytes per core is small. The default is 4 MB.
Example: say you have 1 MB per core. Spark would then create a 1 MB partition on each core. But 1 MB is tiny in the big data world, so Spark calculates the partition size using the formula max(openCostInBytes, bytes/core), in this case max(4 MB, 1 MB), and creates 4 MB partitions. This means some cores will go idle. Is that a bad thing? Not necessarily. There are costs associated with using a larger number of nodes and cores, e.g. opening connections, shuffling data between executors, and losing sequential disk access. Reading/writing 4 MB of data could be faster than reading/writing 1 MB of data on four different nodes/executors.
So when dealing with a large number of small files, keep this value slightly above the average size of the small files, so that each whole file gets read sequentially from disk into a single partition.
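
For example, here is a hedged sketch of that tuning. The ~5 MB average file size, the 6 MB setting, the path, and the app name are assumptions for illustration; spark.sql.files.openCostInBytes is the actual config key:

import org.apache.spark.sql.SparkSession

// Hypothetical scenario: a directory of many small Parquet files averaging ~5 MB each.
// We set the open cost slightly above the average file size, per the advice above.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("small-files-open-cost") // made-up app name
  .config("spark.sql.files.openCostInBytes", 6L * 1024 * 1024)
  .getOrCreate()

val df = spark.read.parquet("/data/many-small-files") // hypothetical path
println(df.rdd.getNumPartitions) // compare with a run using the default 4 MB value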

Apurva Singh
  • 4,534
  • 4
  • 33
  • 42