9

If I understand correctly, when a reduce task goes about gathering its input shuffle blocks ( from outputs of different map tasks ) it first keeps them in memory ( Q1 ). When the amount of shuffles-reserved memory of an executor ( before the change in memory management ( Q2 ) ) is exhausted, the in-memory data is "spilled" to disk. if spark.shuffle.spill.compress is true then that in-memory data is written to disk in a compressed fashion.

My questions:

Q0: Is my understanding correct?

Q1: Is the gathered data inside the reduce task always uncompressed?

Q2: How can I estimate the amount of executor memory available for gathering shuffle blocks?

Q3: I've seen the claim "shuffle spill happens when your dataset cannot fit in memory", but to my understanding as long as the shuffle-reserved executor memory is big enough to contain all the ( uncompressed ) shuffle input blocks of all its ACTIVE tasks, then no spill should occur, is that correct?

If so, to avoid spills one needs to make sure that the ( uncompressed ) data which ends up in all parallel reduce-side tasks is less than the executor's shuffle-reserved memory part?

Harel Gliksman
  • 734
  • 1
  • 7
  • 19
  • Which exact version of Spark prior to 1.6 are you using? What's the reason why you cannot use 1.6.1? – Sim Jun 16 '16 at 02:38
  • Most of my questions ( all except the specific size of the shuffle-relevant-memory ) are general and agnostic to a specific version. I am using 1.3.1 in production as this is what is available on EMR AMIs. Version 1.6.1 is available on new version EMR (release label) which I experiment but are not yet production. – Harel Gliksman Jun 16 '16 at 03:08
  • I would strongly advise you to switch to 1.6.1. 1.3.1 is ancient by Spark standards and has a lot of problems; just take a look at Spark's JIRA. The performance gains are likely going to exceed anything you can do by manually tweaking 1.3.1. – Sim Jun 16 '16 at 03:13
  • Again, the purpose of the question is to understand what exactly is "spill" in Spark as I couldn't find a valid resource that provides a clear explanation... the questions I raised are "holes" in my personal understanding. I think it might be relevant to any Spark user using any Spark version. – Harel Gliksman Jun 16 '16 at 03:38
  • 1
    The question is absolutely not relevant for any user using any Spark version as the internals of memory management vary a lot. If you are trying to understand the basics of what spills are, read this http://stackoverflow.com/questions/32210011/spark-difference-between-shuffle-write-shuffle-spill-memory-shuffle-spill – Sim Jun 16 '16 at 03:47

1 Answers1

6

There are differences in memory management in before and after 1.6. In both cases, there are notions of execution memory and storage memory. The difference is that before 1.6 it's static. Meaning there is a configuration parameter that specifies how much memory is for execution and for storage. And there is a spill, when either one is not enough.

One of the issues that Apache Spark has to workaround is a concurrent execution of:

  • different stages that are executed in parallel
  • different tasks like aggregation or sorting.

  1. I'd say that your understanding is correct.

  2. What's in memory is uncompressed or else it cannot be processed. Execution memory is spilled to disk in blocks and as you mentioned can be compressed.

  3. Well, since 1.3.1 you can configure it, then you know the size. As of what's left at any moment in time, you can see that by looking at the executor process with something like jstat -gcutil <pid> <period>. It might give you a clue of how much memory is free there. Knowing how much memory is configured for storage and execution, having as little default.parallelism as possible might give you a clue.

  4. That's true, but it's hard to reason about; there might be skew in the data such as some keys have more values than the others, there are many parallel executions, etc.

Michael Mior
  • 28,107
  • 9
  • 89
  • 113
evgenii
  • 1,190
  • 1
  • 8
  • 21