
The "old" SparkContext.hadoopFile takes a minPartitions argument, which is a hint for the number of partitions:

def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

But there is no such argument on SparkContext.newAPIHadoopFile:

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)]

In fact, mapred.InputFormat.getSplits takes a hint argument for the number of splits, while mapreduce.InputFormat.getSplits only takes a JobContext. Is there a way to influence the number of splits through the new API?

I have tried setting mapreduce.input.fileinputformat.split.maxsize and fs.s3n.block.size on the Configuration object, but they had no effect. I am trying to load a 4.5 GB file from s3n, and it gets loaded in a single task.
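
For reference, this is roughly what my attempt looks like (a sketch only: sc is my SparkContext, and the input format and s3n path are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Copy the SparkContext's Hadoop configuration and override the split-related settings.
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)
conf.set("fs.s3n.block.size", (64 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile(
  "s3n://bucket/big-file",   // placeholder path for the 4.5 GB file
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)

println(rdd.partitions.length)  // still prints 1 -- the whole file ends up in a single task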

https://issues.apache.org/jira/browse/HADOOP-5861 is relevant, but it suggests that I should already see more than one split, since the default block size is 64 MB.

– Daniel Darabos

What about `"mapred.min.split.size"`? I would imagine the new API would still respect that if they didn't give a new way. – aaronman Aug 22 '14 at 14:19

1 Answer


The function newAPIHadoopFile allows you to pass in a Configuration object, and on that Configuration you can set mapred.max.split.size.

Even though this property is in the mapred namespace, since there is seemingly no new-API equivalent, I would imagine the new API will still respect it.
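
Untested, but a minimal sketch of what I mean (the input format and path are placeholders borrowed from the question):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Cap the split size at 64 MB (the value is in bytes) and pass the conf to newAPIHadoopFile.
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("mapred.max.split.size", (64 * 1024 * 1024).toString)

val rdd = sc.newAPIHadoopFile(
  "s3n://bucket/big-file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)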

– aaronman