
There are claims that a Spark RDD must be a deterministic function of its inputs, because of recomputation and fault tolerance, but there are also sanctioned non-deterministic RDDs, for example in SparkSQL or SparkML. Is there any formal guidance on how to use nondeterminism safely?

Consider this Spark job, with a diamond-shaped DAG.

val bar = (rdd map f) zip (rdd map g)
bar.saveAsTextFile("outfile")

If rdd is nondeterministic (e.g., it contains random values or timestamps), will outfile contain consistent data? Is it possible that one component of the zip is recomputed while the other is not? Is safety guaranteed if we checkpoint or persist rdd? Would a local checkpoint suffice?
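For concreteness, the persist / checkpoint variant being asked about might look like this (a sketch only; rdd, f and g are assumed to exist as above):

import org.apache.spark.storage.StorageLevel

// Pin rdd before both branches of the zip read it.
rdd.persist(StorageLevel.DISK_ONLY)   // or rdd.checkpoint() / rdd.localCheckpoint()
val bar = (rdd map f) zip (rdd map g)
bar.saveAsTextFile("outfile")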

Steve Mitchell

2 Answers


General

Here are some of my takes and experience at a practical level:

  • If you read from tables / files in Hive, Spark makes a list of all files used and which node processed which part of that list, so a re-computation is consistent if it goes all the way back to the start, i.e. re-reads that subset of data from HDFS / Hive.

  • If you use random functions, I .cache or .persist to avoid re-computation along a different path through the logic. Of course, combined with the point above, you would get different results if the random function runs after the read and the data has to be fetched from the source again. See the sketch below this list.

  • Reading from a JDBC source, there is no guarantee of a consistent / deterministic result if that JDBC source can be updated while the processing runs and the DAG recomputes from it.
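To make the random-function point concrete, here is a minimal sketch (the SparkContext sc is assumed to exist and the variable names are made up):

import org.apache.spark.storage.StorageLevel
import scala.util.Random

// Without persist, every branch of the DAG that reads noisy may re-run the
// map and draw different random numbers.
val noisy = sc.parallelize(1 to 1000).map(_ => Random.nextDouble())
noisy.persist(StorageLevel.DISK_ONLY)   // materialize once, on disk
noisy.count()                           // force evaluation before re-use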

Effect of checkpointing

In case of failure for whatever reason, recomputing all the way back to the source through the DAG is expensive. A checkpoint taken at a given Stage stores the data to disk - local or HDFS - and if there is a subsequent failure, re-computation starts from that point onwards, saving time. The DAG lineage is broken at the checkpoint.
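A minimal sketch of what that looks like (the checkpoint directory and input path are placeholders; sc is an existing SparkContext):

// Checkpointing truncates the lineage: after the action below, a later
// failure recomputes from the saved copy rather than from the source.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")     // placeholder path

val upstream = sc.textFile("hdfs:///data/input")   // placeholder path
val shaped = upstream.map(_.toUpperCase)

shaped.checkpoint()   // marks the RDD for checkpointing
shaped.count()        // an action triggers the actual write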

Final notes

What if the re-computation starts from a JDBC source, or uses random functions, so that re-processing a Stage could affect partitions that were already processed? I cannot prove it easily, but my take is that results which do not belong to the "current node" re-processing are discarded; it would not be practical otherwise.

Relating to the author's own answer and to What is the difference between spark checkpoint and persist to a disk, the following should be noted: "... There are few important difference but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. ...". The statement in the other answer is not correct.

thebluephantom
  • I hope somebody will answer with a formal reliable contract, maybe around checkpointing. Caching is best effort, and can fail. – Steve Mitchell Sep 07 '19 at 16:00
  • Thanks for your answer! What I'm looking for is a strategy for programming with nondeterministic data, and a reason to believe the strategy is and will stay safe (no mysterious bad outputs under stress). After persist to disk, it seems the lineage is not deleted but also not used within the same job. – Steve Mitchell Sep 08 '19 at 17:03
  • What you want is not possible. My answer gives examples of why. I too long for things not always possible. – thebluephantom Sep 08 '19 at 18:10

Regarding the use of cache, persist and checkpoint on an RDD: according to this post, persist(StorageLevel.DISK_ONLY) effectively breaks the lineage within the current job, while checkpoint breaks the lineage across jobs (but does not clean up the files). I tentatively conclude that task retries after the persist or checkpoint will not break data consistency. The cache operation does not guarantee consistency.
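A sketch of the two variants (sc is an existing SparkContext, rdd an existing RDD, and the checkpoint directory a placeholder):

import org.apache.spark.storage.StorageLevel

// Re-use within the same job: a disk-backed persist is enough.
rdd.persist(StorageLevel.DISK_ONLY)

// Re-use across jobs: checkpoint to reliable storage (the files are not
// cleaned up automatically, as noted above).
sc.setCheckpointDir("hdfs:///tmp/checkpoints")     // placeholder path
rdd.checkpoint()

rdd.count()   // an action triggers the persist; the checkpoint is written right after it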

Can there be problems before the persist? If the partitions of rdd contain independent random data, task retries on separate partitions are not a problem. If rdd contains timestamps, it should consist of a single partition.

I tentatively conclude that a safe strategy for computing with a nondeterministic RDD is to build it from "independent" partitions, which are safe to recompute separately, and to immediately persist to disk or checkpoint the RDD. Checkpoint is required if the RDD is re-used across jobs.
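A sketch of that strategy (sc is an existing SparkContext; the names are made up): each partition draws its random values independently, and the result is pinned to disk right away.

import org.apache.spark.storage.StorageLevel
import scala.util.Random

// Each partition uses its own generator, so a retried partition does not
// need to be consistent with data already produced by another partition.
val noisy = sc.parallelize(0 until 8, 8).mapPartitions { iter =>
  val rng = new Random()
  iter.map(i => (i, rng.nextDouble()))
}

noisy.persist(StorageLevel.DISK_ONLY)   // pin immediately, before any re-use
noisy.count()                           // materialize within this job
// use noisy.checkpoint() instead if the RDD is re-used across jobs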

Some transformations introduce nondeterminism in the ordering of RDDs. If you need order consistency between re-used copies of an RDD (e.g., because of zipWithIndex), the lineage back to the most recent persist or checkpoint should not contain any order-modifying transformations.
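For example (a sketch, assuming an existing RDD named records): pinning the RDD before zipWithIndex keeps the indices stable across re-uses.

import org.apache.spark.storage.StorageLevel

// zipWithIndex assigns indices from the partition order and the element
// order within each partition, so the data is pinned first.
val pinned = records.persist(StorageLevel.DISK_ONLY)
pinned.count()                       // materialize before attaching indices
val indexed = pinned.zipWithIndex()  // (element, index) pairs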

Steve Mitchell