14

I have launched my cluster this way:

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar

The first thing I do is read a big text file, and count it:

val file = sc.textFile("/path/to/file.txt.gz")
println(file.count())

When doing this, I see that only one of my nodes is actually reading the file and executing the count (because I only see one task). Is that expected? Should I repartition my RDD afterwards, or when I use map reduce functions, will Spark do it for me?

Stephane Maarek
  • 5,202
  • 9
  • 46
  • 87
  • What's your "defaultMinPartitions"? As the doc clearly says, textFile takes an optional number of partitions parameter, which defaults to that. – The Archetypal Paul Jan 24 '15 at 16:32
  • My defaultMinPartitions is greater than one. It seems that I can't force a specified number of partition, because it's only one text file... running.... val file = sc.textFile("/path/to/file.txt.gz",8) println(file.partitions.length) returns 1 – Stephane Maarek Jan 24 '15 at 17:08
  • Well, it has to do the reading in one place, because that's inherently serial. But I can't see why it would have that optional param if it didn't do _something_. – The Archetypal Paul Jan 24 '15 at 17:29
  • I see... so because count doesn't do much, it keeps it on one worker. But if I run a map or a reduce, it should start spreading the dataset around? – Stephane Maarek Jan 24 '15 at 17:31
  • No idea, sorry, but I'm guessing it should. – The Archetypal Paul Jan 24 '15 at 17:33

1 Answers1

22

It looks like you're working with a gzipped file.

Quoting from my answer here:

I think you've hit a fairly typical problem with gzipped files in that they cannot be loaded in parallel. More specifically, a single gzipped file cannot be loaded in parallel by multiple tasks, so Spark will load it with 1 task and thus give you an RDD with 1 partition.

You need to explicitly repartition the RDD after loading it so that more tasks can run on it parallel.

For example:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3)
println(file.count())

Regarding the comments on your question, the reason setting minPartitions doesn't help here is because a gzipped file is not splittable, so Spark will always use 1 task to read the file.

If you set minPartitions when reading a regular text file, or a file compressed with a splittable compression format like bzip2, you'll see that Spark will actually deploy that number of tasks in parallel (up to the number of cores available in your cluster) to read the file.

Community
  • 1
  • 1
Nick Chammas
  • 11,843
  • 8
  • 56
  • 115
  • Thanks! Would you recommend bzip2 over gzip then? If I load that file frequently, what should be my strategy to optimize every run? – Stephane Maarek Jan 24 '15 at 17:56
  • @Stephane - It depends on how much data is coming in and how much time your cluster spends repartitioning the data. A single gzipped file might be fine. If the one file is too big, you could probably also go with multiple gzipped files (i.e. splitting before compressing) as each gzipped file can be loaded in parallel into the same RDD (one task per file). That's probably the path of least resistance. – Nick Chammas Jan 24 '15 at 18:02
  • very very interesting thanks! So .gz.001 splitted files or bzip2... I'll experiment with both! I think that yes, the big bottleneck is the first repartition, so if I manage to already split my files when they're coming it might save me a little bit of time – Stephane Maarek Jan 24 '15 at 18:05
  • @Stephane, do you know why that limitation exists? It doesn't seem any easier to distribute the reading of a non-gzipped file - in both cases, you need to read the file serially to work out where the next record begins? – The Archetypal Paul Jan 26 '15 at 11:35
  • @Paul, I haven't experimented with bzip2 yet, I'll tell you if parallel reading truly works. I don't know, if the archive is splittable, then I guess you can read it in parallel (block 1 to n, n+1 to 2n, etc...) and then probably send the few missing bytes here and there to make sure every part is correctly formed. I hope that's what Spark does – Stephane Maarek Jan 26 '15 at 14:09
  • (parallel read does work --- but performance isn't improved too much from a repartition after a serial read, I guess it really depends on if the network is a bottleneck) – Stephane Maarek Jan 26 '15 at 16:02
  • @Stephane - What do your task stats look like? When you say "parallel read does work", how are you verifying that that is happening? – Nick Chammas Jan 27 '15 at 00:11
  • I used to see one task before. Now from what I have seen, with the bzip2 archive, as my file was 8GB, and the reading block on hadoop was 64MB, I saw around 125 tasks being created, and split between my various datanodes. If you click in the spark UI to see the tasks stats, you see the details (input, task time, gc, etc), and I saw I had 125 tasks – Stephane Maarek Jan 27 '15 at 10:19
  • do you know other method? I think repartition is too heavy for this purpose, because we don't need to shuffle all of the data but only split a large file into several partitions. – user2848932 Jul 20 '15 at 09:28
  • @user2848932 - If you don't shuffle the data, how will the other workers in your cluster get to share the work of processing it? Without shuffling, only 1 task on 1 worker will be able to process the gzipped file. Shuffling the data sends parts of it to other workers, which is relatively quick compared to how long you'd otherwise be waiting for 1 task to do all the work. – Nick Chammas Jul 20 '15 at 17:32