
I have a folder with 14 files in it. I run spark-submit with 10 executors on a cluster that uses YARN as its resource manager.

I create my first RDD as this:

JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);

However, files.getNumPartitions() gives me 7 or 8, randomly. I do not use coalesce/repartition anywhere, so my DAG finishes with 7-8 partitions.

As far as I know, the argument is the "minimum" number of partitions, so why does Spark divide my RDD into only 7-8 partitions?

I also ran the same program with 20 as the minimum number of partitions, and it gave me 11 partitions.

I have seen a topic here, but it was about "more" partitions, which did not help me at all.

Note: In the same program, I read another folder that has 10 files, and Spark creates 10 partitions successfully. I run the problematic transformation above after that successful job has finished.

File sizes: 1) 25.07 KB, 2) 46.61 KB, 3) 126.34 KB, 4) 158.15 KB, 5) 169.21 KB, 6) 16.03 KB, 7) 67.41 KB, 8) 60.84 KB, 9) 70.83 KB, 10) 87.94 KB, 11) 99.29 KB, 12) 120.58 KB, 13) 170.43 KB, 14) 183.87 KB

The files are on HDFS; the block size is 128 MB and the replication factor is 3.

Melih

1 Answer


This would be easier to pin down with the size of each file, but your code is not wrong. This answer is based on the Spark code base.

  • First of all, maxSplitSize is calculated from the total size of the directory and the minimum number of partitions passed to wholeTextFiles.

        def setMinPartitions(context: JobContext, minPartitions: Int) {
          val files = listStatus(context).asScala
          val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
          val maxSplitSize = Math.ceil(totalLen * 1.0 /
            (if (minPartitions == 0) 1 else minPartitions)).toLong
          super.setMaxSplitSize(maxSplitSize)
        }
        // file: WholeTextFileInputFormat.scala
    


  • The splits (partitions in Spark) are then extracted from the source according to maxSplitSize.

        inputFormat.setMinPartitions(jobContext, minPartitions)
        val rawSplits = inputFormat.getSplits(jobContext).toArray // The number of splits is decided here
        val result = new Array[Partition](rawSplits.size)
        for (i <- 0 until rawSplits.size) {
          result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
        }
        // file: WholeTextFileRDD.scala
    


More information on how the files are read and the splits are prepared is available in the CombineFileInputFormat#getSplits method.
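To make the effect of maxSplitSize concrete, here is a rough sketch in plain Java that applies the formula from setMinPartitions to the file sizes listed in the question and then packs the files into splits. The packing rule is an assumption: it takes the files in the listed order and closes a split as soon as its accumulated size reaches maxSplitSize, which is a simplified model of what CombineFileInputFormat#getSplits does and ignores node and rack locality. The class and method names (SplitEstimate, countSplits, toBytes) are hypothetical and are not part of Spark or Hadoop.

```java
import java.util.Arrays;

// Simplified model for illustration only; not Spark's or Hadoop's actual code.
class SplitEstimate {

    // File sizes from the question, in KB, in the order they are listed there.
    static final double[] SIZES_KB = {
        25.07, 46.61, 126.34, 158.15, 169.21, 16.03, 67.41,
        60.84, 70.83, 87.94, 99.29, 120.58, 170.43, 183.87
    };

    // Convert the rounded KB figures to approximate byte lengths.
    static long[] toBytes(double[] sizesKb) {
        return Arrays.stream(sizesKb).mapToLong(kb -> Math.round(kb * 1024)).toArray();
    }

    // Same formula as setMinPartitions: ceil(totalLen / minPartitions).
    static long maxSplitSize(long[] fileLens, int minPartitions) {
        long totalLen = Arrays.stream(fileLens).sum();
        return (long) Math.ceil(totalLen * 1.0 / (minPartitions == 0 ? 1 : minPartitions));
    }

    // Assumption: whole files are added to the current split in the given order,
    // and the split is closed once its size reaches maxSplitSize.
    static int countSplits(long[] fileLens, int minPartitions) {
        long maxSize = maxSplitSize(fileLens, minPartitions);
        int splits = 0;
        long current = 0;
        for (long len : fileLens) {
            current += len;
            if (current >= maxSize) {
                splits++;
                current = 0;
            }
        }
        if (current > 0) {
            splits++; // leftover files form one last, smaller split
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] lens = toBytes(SIZES_KB);
        System.out.println("minPartitions=10 -> " + countSplits(lens, 10) + " splits");
        System.out.println("minPartitions=20 -> " + countSplits(lens, 20) + " splits");
    }
}
```

Under these assumptions the model gives 8 splits for a minimum of 10 and 11 splits for a minimum of 20, in line with the numbers reported in the question. Because whole files are combined until the threshold is reached, most splits end up larger than maxSplitSize, so fewer splits than the requested minimum are produced. The real order in which files are combined depends on block locality, which may be why the count varies between 7 and 8.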

Note:

I refer to Spark partitions as MapReduce splits here, because Spark borrows its input and output formats from MapReduce.

mrsrinivas