
I have a very simple word-count-like program that generates (Long, Double) counts like this:

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.HashPartitioner

val lines = sc.textFile(directory)

lines.repartition(600).mapPartitions { lineIterator =>
    // Generate iterator of (Long, Double) counts
}
.reduceByKey(new HashPartitioner(30), (v1, v2) => v1 + v2)
.saveAsTextFile(outDir, classOf[GzipCodec])

My problem: The last of the 30 partitions never gets written.

Here are a few details:

  • My input is 5 GB gzip-compressed and I expect about 1 billion unique Long keys.
  • I run on a single machine with 32 cores and 1.5 TB of RAM. Input and output come from a local disk with 2 TB free. Spark is assigned all of the RAM and happily uses it; this application occupies about 0.5 TB.

I can observe the following:

  • For 29 of the 30 partitions, the reduce and repartition (caused by the HashPartitioner) take about 2 hours. The last one does not finish, not even after a day. Two to four threads stay at 100%.
  • No error or warning appears in the log
  • Spark occupies about 100GB in /tmp which aligns with what the UI reports for shuffle write.
  • In the UI I can see the number of "shuffle read records" growing very, very slowly for the remaining task. After one day it is still an order of magnitude away from what all the finished tasks show.

The end of the log looks like this:

15/08/03 23:26:43 INFO SparkHadoopWriter: attempt_201508031748_0002_m_000020_748: Committed
15/08/03 23:26:43 INFO Executor: Finished task 20.0 in stage 2.0 (TID 748). 865 bytes result sent to driver
15/08/03 23:27:50 INFO FileOutputCommitter: Saved output of task 'attempt_201508031748_0002_m_000009_737' to file:/output-dir/_temporary/0/task_201508031748_0002_m_000009
15/08/03 23:27:50 INFO SparkHadoopWriter: attempt_201508031748_0002_m_000009_737: Committed
15/08/03 23:27:50 INFO Executor: Finished task 9.0 in stage 2.0 (TID 737). 865 bytes result sent to driver
15/08/04 02:44:54 INFO BlockManager: Removing broadcast 3
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_3_piece0
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_3_piece0 of size 2009 dropped from memory (free 611091153849)
15/08/04 02:44:54 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_3
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_3 of size 3336 dropped from memory (free 611091157185)
15/08/04 02:44:54 INFO BlockManager: Removing broadcast 4
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_4_piece0
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_4_piece0 of size 2295 dropped from memory (free 611091159480)
15/08/04 02:44:54 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_4
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_4 of size 4016 dropped from memory (free 611091163496)

Imagine the first five lines repeated for the 28 other partitions within a two-minute time frame.

I have tried several things:

  • Spark 1.3.0 and 1.4.0
  • nio instead of netty
  • flatMap instead of mapPartitions
  • Just 30 instead of 600 input partitions (a rough sketch of these last two switches follows below)
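
For reference, here is a minimal sketch of how those last two variations could be wired up: the nio block-transfer service and a 30-way repartition. The SparkConf wiring, the local[32] master, and the app name are assumptions for illustration, directory is the same placeholder as in the question, and spark.shuffle.blockTransferService is the Spark 1.3/1.4-era setting that switches between netty and nio.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup for the single 32-core machine (not the asker's actual configuration).
val conf = new SparkConf()
  .setAppName("long-double-counts")                    // assumed app name
  .setMaster("local[32]")                              // assumed master for one 32-core box
  .set("spark.shuffle.blockTransferService", "nio")    // "netty" is the default in Spark 1.3/1.4
val sc = new SparkContext(conf)

// "Just 30 instead of 600 input partitions": repartition(30) in place of repartition(600).
val lines = sc.textFile(directory).repartition(30)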

Still, I never get the last 1/30 of my data out of Spark. Has anyone ever observed something similar? These two posts here and here seem to describe similar problems, but offer no solution.

UPDATE

The task that never finishes is always the first task of the reduceByKey + saveAsTextFile stage. I have also removed the HashPartitioner and even tried on a bigger cluster with 400 cores and 6000 partitions. Only 5999 finish successfully; the last one runs forever.

The UI shows for all tasks something like Shuffle Read Size / Records: 20.0 MB / 1954832, but for the first task it shows (at the moment) Shuffle Read Size / Records: 150.1 MB / 711836

The numbers are still growing...

xhi

1 Answer


It might be that your keys are very skewed. Depending on how they are distributed (or if you have a null or default key), a significant amount of the data might be going to a single executor, which is no different from running on your local machine (plus the overhead of a distributed platform). It might even be causing that machine to swap to disk and become intolerably slow.
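
One quick way to check that hypothesis is to sample the keyed data before the reduce and look at how often the heaviest keys occur. This is only a sketch: pairs stands in for the asker's RDD[(Long, Double)] coming out of mapPartitions, and the sample fraction and top-20 cutoff are arbitrary.

  import org.apache.spark.rdd.RDD

  // Rough skew check: sample ~0.1% of the pairs and list the 20 most frequent keys in the sample.
  def topSampledKeys(pairs: RDD[(Long, Double)]): Array[(Long, Long)] =
    pairs
      .sample(withReplacement = false, fraction = 0.001)
      .map { case (k, _) => (k, 1L) }
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)   // heaviest keys first
      .take(20)

If a handful of keys dominates the sample, the stuck task is plausibly the partition those keys hash to.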

Try using aggregateByKey instead of reduceByKey, since it will attempt to compute partial sums distributed across the executors instead of shuffling the entire (potentially large) set of key-value pairs to a single executor. And maybe avoid fixing the number of output partitions to 30, just in case.
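
A rough sketch of that swap, assuming the same (Long, Double) pairs, outDir and GzipCodec as in the question; pairs is again a placeholder for the keyed RDD, and no explicit partition count is forced.

  import org.apache.hadoop.io.compress.GzipCodec

  // Sum per key with aggregateByKey: a 0.0 zero value, per-partition partial sums (seqOp),
  // then a merge of partial sums across partitions (combOp).
  val summed = pairs.aggregateByKey(0.0)(
    (acc, v) => acc + v,   // seqOp: fold a value into the running per-partition sum
    (a, b) => a + b        // combOp: merge partial sums from different partitions
  )
  summed.saveAsTextFile(outDir, classOf[GzipCodec])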


Edit: It is hard to diagnose a problem that only manifests as "it just does not finish". One thing you can do is introduce a timeout:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  // timeout is a scala.concurrent.duration.Duration; Await.result throws a TimeoutException once it elapses
  val result = Await.result(Future {
    // Your normal computation
  }, timeout)

That way, whichever task is taking too long, you can detect it and gather some metrics on the spot.

Daniel Langdon
  • Wait, it's all on this one 32-core machine and I can tell that disk and memory are not an issue. Yes, it's likely that there is a set of keys (say a few hundred) that accounts for most of the tuples, but I would expect the HashPartitioner to distribute them evenly. There is definitely not just one very frequent key. I'll try your aggregateByKey suggestion anyway! – xhi Aug 05 '15 at 06:45
  • Give it a try, but you have a weird issue. It could even be a particular data condition that causes an infinite loop? I edited my answer with one other option. Good luck! – Daniel Langdon Aug 05 '15 at 12:40
  • Tried the future wrapping; it was not very enlightening, but thanks for your help! Also tried aggregateByKey... Updated the original question. – xhi Aug 07 '15 at 14:17
  • Mmm. You should at least be able to get a sample of the offending input and try it independently? Your code that generates the pairs must be caught in an infinite loop of some kind? – Daniel Langdon Aug 07 '15 at 14:28
  • But the map finishes - it's the reduce and write that never finishes. I can see 1585510308 records (101.7GB) shuffle write from map and 1573553370 (101.6GB) shuffle read from reduce. I don't see how there could be a hidden huge amount of data. – xhi Aug 07 '15 at 15:14
  • Dunno. My only guess was a skewed key that does not fit in any single reducer (even if the cluster has more resources). If aggregateByKey did not help, I'm not sure what else could, as it would clearly shuffle at most 6000 counts to the final reducer. – Daniel Langdon Aug 07 '15 at 16:33