The "old" SparkContext.hadoopFile
takes a minPartitions
argument, which is a hint for the number of partitions:
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
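
For example, with the old API I can ask for more partitions up front. A minimal sketch, assuming sc is my SparkContext; the path and the value 100 are just placeholders:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Old API: the minPartitions hint is passed alongside the InputFormat and key/value classes
val rdd = sc.hadoopFile(
  "s3n://my-bucket/big-file.txt",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  100)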
But there is no such argument on SparkContext.newAPIHadoopFile:
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)]
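
The closest equivalent call through the new API only lets me pass a Configuration. Again a sketch, assuming sc is my SparkContext and the path is a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// New API: no minPartitions parameter; only the Configuration is passed through
val conf = new Configuration(sc.hadoopConfiguration)
val rdd = sc.newAPIHadoopFile(
  "s3n://my-bucket/big-file.txt",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)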
In fact, mapred.InputFormat.getSplits takes a hint argument, but mapreduce.InputFormat.getSplits only takes a JobContext. How can I influence the number of splits through the new API?
I have tried setting mapreduce.input.fileinputformat.split.maxsize and fs.s3n.block.size on the Configuration object, but they had no effect. I am trying to load a 4.5 GB file from s3n, and it gets loaded in a single task.
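
Roughly what I tried, as a sketch; the 64 MB value stands in for the sizes I experimented with:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration(sc.hadoopConfiguration)
// Cap the split size and the reported s3n block size at 64 MB,
// hoping the 4.5 GB file would come back as roughly 70 splits
conf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)
conf.set("fs.s3n.block.size", (64 * 1024 * 1024).toString)
// conf is then passed to newAPIHadoopFile as in the call above,
// but the resulting RDD still ends up with a single partition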
https://issues.apache.org/jira/browse/HADOOP-5861 is relevant, but it suggests that I should already see more than one split, since the default block size is 64 MB.