5

I can't find information about Spark's persistence of temporary data to disk in the official docs, only in some Spark optimization articles like this one:

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible.

Is persistence to disk at each stage boundary always applied, for both HashJoin and SortMergeJoin? Why does Spark (an in-memory engine) persist those temporary files before a shuffle? Is that done for task-level recovery, or for something else?

P.S. The question relates mainly to the Spark SQL API, though I'm also interested in Streaming & Structured Streaming

UPD: found a mention and more details on the Why in the "Stream Processing with Apache Spark" book. Look for the "Task Failure Recovery" and "Stage Failure Recovery" topics on the referenced page. As far as I understood, Why = recovery, When = always, since this is the mechanics of Spark Core and the Shuffle Service, which is responsible for the data transfer. Moreover, all of Spark's APIs (SQL, Streaming & Structured Streaming) are based on the same failover guarantees (of Spark Core/RDD). So I suppose this is common behaviour for Spark in general

thebluephantom
  • 16,458
  • 8
  • 40
  • 83
VB_
  • 45,112
  • 42
  • 145
  • 293

2 Answers

7

It's a good question in that we hear about in-memory Spark vs. Hadoop, so it is a little confusing. The docs are terrible, but I ran a few things and verified my observations by looking around until I found a most excellent source: http://hydronitrogen.com/apache-spark-shuffles-explained-in-depth.html

Assuming an Action has been called - so as to head off the obvious comment if this is not stated - and assuming we are not talking about a ResultStage and a broadcast join, then we are talking about a ShuffleMapStage. We look at an RDD initially.

Then, borrowing from the url:

  • DAG dependency involving a shuffle means creation of a separate Stage.
  • Map operations are followed by Reduce operations, then a Map, and so forth.

CURRENT STAGE

  • All the (fused) Map operations are performed intra-Stage.
  • The next Stage's requirement, a Reduce operation - e.g. a reduceByKey - means the output is hashed or sorted by key (K) at the end of the Map operations of the current Stage.
  • This grouped data is written to disk on the Worker where the Executor is - or to the storage tied to that Cloud version. (I would have thought in-memory was possible if the data is small, but this is an architectural Spark approach, as stated in the docs.)
  • The ShuffleManager is notified that hashed, mapped data is available for consumption by the next Stage. ShuffleManager keeps track of all keys/locations once all of the map side work is done.
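The map-side bucketing described above can be sketched in plain Python. This is a toy model only: the function name, partitioner, and in-memory dict standing in for shuffle files are my assumptions, not Spark's actual ShuffleWriter.

```python
# Toy model of the map-side shuffle write: each map task buckets its
# (key, value) records by hash(key) % numReducePartitions, as if each
# bucket were a shuffle block written to the Worker's local disk.
# Hypothetical names; this is not Spark's actual ShuffleWriter code.

def shuffle_write(records, num_reduce_partitions):
    buckets = {p: [] for p in range(num_reduce_partitions)}
    for key, value in records:
        p = hash(key) % num_reduce_partitions  # hash partitioner
        buckets[p].append((key, value))
    return buckets  # in Spark, these buckets would be persisted to local disk

map_output = shuffle_write([(1, 10), (2, 20), (1, 30)], num_reduce_partitions=2)
# the same key always lands in the same bucket, so one reduce task can
# later fetch everything for its keys from every map task's output
```

The point of the bucketing is that partitioning is decided entirely on the map side, so the reduce side only needs bucket locations, which is exactly what the ShuffleManager tracks.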

NEXT STAGE

  • The next Stage, being a Reduce, then gets the data from those locations by consulting the ShuffleManager and using the BlockManager.
  • The Executor may be re-used, or be a new one on another Worker, or another Executor on the same Worker.
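The reduce side can be sketched the same way. Again a toy model: the dict-of-buckets layout is assumed, and the real fetch goes through the ShuffleManager / BlockManager over the network rather than reading local dicts.

```python
# Toy model of the shuffle read: one reduce task pulls its own bucket
# from every map task's output and merges values per key, roughly what
# a reduceByKey does on the reduce side. Hypothetical layout.

# two map tasks' bucketed outputs, keyed by reduce partition id
map_task_0 = {0: [(2, 20)], 1: [(1, 10)]}
map_task_1 = {0: [(2, 5)], 1: [(1, 30), (3, 7)]}

def shuffle_read(all_map_outputs, partition_id, reduce_fn):
    merged = {}
    for output in all_map_outputs:
        for key, value in output.get(partition_id, []):
            merged[key] = reduce_fn(merged[key], value) if key in merged else value
    return merged

# reduce task for partition 1 sums the values for its keys
partition_1 = shuffle_read([map_task_0, map_task_1], 1, lambda a, b: a + b)
# -> {1: 40, 3: 7}
```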

So, my understanding is that, architecturally, Stages mean writing to disk, even if there is enough memory. Given the finite resources of a Worker, it makes sense that writing to disk occurs for this type of operation. The more important point is, of course, the 'Map Reduce' implementation. I summarized the excellent posting; that is your canonical source.

Of course, fault tolerance is aided by this persistence: less re-computation work.

Similar aspects apply to DFs.

thebluephantom
  • 16,458
  • 8
  • 40
  • 83
  • Is it safe to say that in Hadoop the flow is memory -> disk -> disk -> memory and in Spark the flow is memory -> disk -> memory? I.e., in Hadoop the network transfers from disk to disk, and in Spark the network transfer is from disk to RAM – figs_and_nuts May 31 '23 at 12:02
  • Hadoop is disk memory disk and repeats – thebluephantom May 31 '23 at 12:20
4

Spark is not, and never was, an "in-memory engine". If you check the internals, it is pretty clear that it is neither optimized for in-memory processing, nor is it tuned for in-memory-centric hardware.

On the contrary, almost all design decisions were clearly made with the assumption that the size of the data as a whole, as well as the inputs and outputs of individual tasks, can exceed the amount of memory available to the cluster and to an individual executor / executor thread, respectively. Furthermore, it is clearly designed to be used on commodity hardware.
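That assumption shows up in mechanisms that spill to disk when memory fills up, in the spirit of Spark's ExternalSorter. A rough sketch of the idea in plain Python; the threshold, file handling, and serialization are my assumptions, not Spark's implementation:

```python
import heapq
import pickle
import tempfile

# Toy spill-to-disk sort: buffer records in memory, spill a sorted run
# to a temp file when a (hypothetical) threshold is reached, then do a
# streaming merge of all runs at the end. A sketch of the external-sort
# idea only, not Spark's ExternalSorter code.

def external_sort(records, max_in_memory=4):
    spills, buffer = [], []
    for rec in records:
        buffer.append(rec)
        if len(buffer) >= max_in_memory:   # memory "full": spill a run
            f = tempfile.TemporaryFile()
            pickle.dump(sorted(buffer), f)
            f.seek(0)
            spills.append(f)
            buffer = []
    runs = [pickle.load(f) for f in spills] + [sorted(buffer)]
    return list(heapq.merge(*runs))        # merge sorted runs lazily

print(external_sort([5, 3, 8, 1, 9, 2, 7]))  # -> [1, 2, 3, 5, 7, 8, 9]
```

Because each run is sorted before it is written out, the final merge never needs more than one record per run in memory, which is what lets the total data size exceed available memory.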

Such an implementation can be used for recovery or to avoid recomputation (see for example What does "Stage Skipped" mean in Apache Spark web UI?), but this is repurposing rather than the initial goal.

10465355
  • 4,481
  • 2
  • 20
  • 44
  • Could you pls give more insight into why Spark is an order of magnitude faster than Map-Reduce? Every tutorial says it's because Spark doesn't persist intermediate results to disk. I see why it's faster for narrow transformations; not all of them can be expressed with a single Map-Reduce map operation. But the main performance impact happens because of wide transformations. So why are Spark's wide transformations faster than Map-Reduce? – VB_ Nov 16 '19 at 18:08
  • 3
    @VB_ _Every tutorial says because Spark doesn't persist intermediate results to disk_ - they're not talking about the same thing. Single Hadoop Job (map-only or map-reduce) by default persists data to distributed storage (i.e. HDFS) - that's the only medium of communication between jobs, and in general requires both disk and network IO to both write (here also comes replication) and read in new job. In contrast Spark stages write by default to local storage (which could be in-memory FS as far as Spark cares) and doesn't replicate data (there is some discussion about this behavior though). – 10465355 Nov 16 '19 at 20:06
  • @user12357420 I missed the point about DFS vs local FS, you're right. Thank you. Could you provide any link to the discussion? – VB_ Nov 16 '19 at 20:14
  • @VB_ Spark can run without HDFS and in the cloud with no data locality. Iterative processing and Streaming, see: https://www.lightbend.com/blog/how-spark-beats-mapreduce-event-streaming-iterative-algorithms-and-elasticity – thebluephantom Nov 16 '19 at 21:20
  • @VB_ _Could you provide any link to the discussion?_ - I don't have specific links at hand, but you can search JIRA and dev list for discussions about decoupling block manager, especially in context of dynamic allocations. _I missed the point about DFS vs local FS_ - there's even more than that here. For example Multiple piped Hadoop jobs will have to acquire resources separately, adding even more latency. At high level RDD with lineage (Spark) replaces DFS (Hadoop) as a resilience mechanism and that's the core advantage over Hadoop. – 10465355 Nov 17 '19 at 15:20