4

Suppose I have an input file of size 100MB. It contains a large number of points (lat-long pairs) in CSV format. What should I do to split the input file into ten 10MB files in Apache Spark, or how do I customize the split?

Note: I want to process a subset of the points in each mapper.

Chandan

2 Answers

5

Spark's abstraction doesn't provide an explicit way to split the data. However, you can control the parallelism in several ways.

Assuming you use YARN, an HDFS file is automatically split into HDFS blocks, and those blocks are processed concurrently while a Spark action is running.
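Typically each HDFS block becomes one partition of the resulting RDD. If you want more partitions at read time, a minimal sketch (assuming a JavaSparkContext named ctx and a hypothetical HDFS path) is to pass the minPartitions argument of textFile():

JavaRDD<String> lines = ctx.textFile("hdfs:///path/to/points.csv", 10); // minPartitions hint = 10
System.out.println(lines.partitions().size());                          // usually at least 10 for a splittable text file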

Apart from HDFS parallelism, consider using a partitioner with PairRDD. A PairRDD is an RDD of key-value pairs, and a partitioner manages the mapping from a key to a partition. The default partitioner reads spark.default.parallelism. The partitioner helps to control the distribution of the data, as well as its locality, in PairRDD-specific operations such as reduceByKey.
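A minimal sketch of this (reusing the lines RDD from the sketch above, keying each point by an illustrative field of the CSV line, and assuming Java 8 lambdas plus org.apache.spark.HashPartitioner and scala.Tuple2 on the classpath):

JavaPairRDD<String, Integer> pairs = lines
        .mapToPair(line -> new Tuple2<>(line.split(",")[0], 1)) // key by the first CSV field (illustrative choice)
        .partitionBy(new HashPartitioner(10));                  // spread the keys over 10 partitions
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); // honours the partitioner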

Take a look at the following documentation about Spark data parallelism.

http://spark.apache.org/docs/1.2.0/tuning.html

suztomo
  • 1
    Scenario: I have 50 points and a target point p0, and I have to find the closest point to p0 among those 50 points. So I decided to break the 50 points into 5 groups of 10 points and run a closest-neighbour search on each group of 10 points in parallel. From my limited MapReduce knowledge I know that, without customization, each mapper takes a single line, so each mapper gets a single point to operate on, not 10 points. How do I solve this problem? (One way to do this is sketched after these comments.) – Chandan Dec 23 '14 at 15:15
  • 2
    Suztomo - There is also the `minPartitions` parameter on [`textFile()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile) that gives some control over how many partitions to load a file into. @Chandan - If your RDD doesn't have enough partitions, try explicitly repartitioning it with `RDD.repartition(N)` before running your computation. More, smaller partitions will give each task (I don't think we talk about "mappers" in Spark) less work to do. – Nick Chammas Dec 24 '14 at 05:35
  • 1
    `JavaRDD lines = ctx.textFile("/home/hduser/Spark_programs/file.txt").cache(); lines.repartition(2); List partitions = lines.partitions(); System.out.println(partitions.size());` It gives only one partition. – Chandan Dec 24 '14 at 09:26
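For the nearest-neighbour scenario in the comments above, a minimal sketch (Spark 1.x Java API with Java 8 lambdas, a hypothetical target point p0Lat/p0Lon, hypothetical "lat,long" CSV lines, and java.util.ArrayList/List imported) could use mapPartitions so that each task sees all the points of its partition rather than a single line:

final double p0Lat = 12.97, p0Lon = 77.59;              // hypothetical target point p0
JavaRDD<String> localClosest = lines.repartition(5).mapPartitions(iter -> {
    String best = null;
    double bestDist = Double.MAX_VALUE;
    while (iter.hasNext()) {                             // every point of this partition
        String point = iter.next();
        String[] parts = point.split(",");
        double dLat = Double.parseDouble(parts[0]) - p0Lat;
        double dLon = Double.parseDouble(parts[1]) - p0Lon;
        double d = dLat * dLat + dLon * dLon;            // squared planar distance is enough for comparison
        if (d < bestDist) { bestDist = d; best = point; }
    }
    List<String> out = new ArrayList<>();
    if (best != null) out.add(best);
    return out;                                          // Spark 1.x mapPartitions expects an Iterable
});
// localClosest holds at most one candidate per partition; collecting it and comparing
// the few survivors on the driver gives the globally closest point.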
1

After searching through the Spark API, I found the partitions() method, which returns the list of partitions of a JavaRDD. At the time of JavaRDD creation we repartitioned it to the desired number of partitions, as suggested by @Nick Chammas. Note that repartition() returns a new RDD rather than modifying the one it is called on, which is why calling lines.repartition(2) on its own (as in my earlier comment) still showed a single partition.

JavaRDD<String> lines = ctx.textFile("/home/hduser/Spark_programs/file.txt").repartition(5); // keep the repartitioned RDD
List<Partition> partitions = lines.partitions(); // org.apache.spark.Partition
System.out.println(partitions.size());           // prints 5
Chandan