1

Is there a way to save or serialize the logical plan of a Spark DataFrame and replay it. For example, looking at the below plan:

val df = spark.read.option("multiLine", true).json("/home/rtf.json").withColumn("double", col("ROW_ID") * 2)
df.explain
== Physical Plan ==
*Project [ROW_ID#0L, TEXT#1, (ROW_ID#0L * 2) AS double#5L]
+- *FileScan json [ROW_ID#0L,TEXT#1] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/rtf.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ROW_ID:bigint,TEXT:string>
df.count
res1: Long = 10

What I would like to do is snapshot this plan so that if I went and added a row to /home/rtf.json, I would be able to replay it like:

val newDF = spark.plan.apply("path_to_saved_plan")
newDF.explain
    == Physical Plan ==
    *Project [ROW_ID#0L, TEXT#1, (ROW_ID#0L * 2) AS double#5L]
    +- *FileScan json [ROW_ID#0L,TEXT#1] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/rtf.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ROW_ID:bigint,TEXT:string>
newDF.count
res2: Long = 11 // Increased!

...yielding a dataframe with the same logical plan, but including the new row.

mongolol
  • 941
  • 1
  • 13
  • 31
  • Right - I suppose I'm looking for a way to save the pipeline down to a file for replaying at a later time. – mongolol Feb 19 '19 at 18:47
  • 2
    Well, it is in theory possible - there are `Dataset` constructors which accept logical plan, but this part of API is intentionally private. And I don't think there is any guarantee for plan to be forward compatible, which brings serious problems alone, if you want to treat this as a persistence method. Nor that there is much practical value - the expensive part happens when plan is resolved and optimized, not at the logical level. And resolution and optimization can happen only in a specific context, hence is not portable between `Datasets`. – 10465355 Feb 19 '19 at 19:27
  • Not quite as advanced as in ORACLE – thebluephantom Feb 19 '19 at 20:25
  • what is the benefit of saving the logical plan over just saving the spark code which generates this logical plan? – Raphael Roth Feb 19 '19 at 20:47
  • Well, an example would be if a DataFrame was created from an RDBMS, some data engineering/transformation was applied (outside of an ML pipeline), and then a Spark ML model was trained against it. Being able to then fetch live data from the RDBMS source and replay the data engineering/transformation steps (without having to copy the code) in order to pass the live data into the model for predictions would be valuable. – mongolol Feb 20 '19 at 08:26
  • @RaphaelRoth Some time the creation of the execution is very expensive and if I can replay it will be much faster. – igreenfield Oct 11 '20 at 16:42
  • You could take a look at checkpoint. It can be written to external file system and can be shared across different applications. https://stackoverflow.com/questions/57840083/spark-checkpointing-non-streaming-checkpoint-files-can-be-used-in-subsequent-j – Gladiator Jan 05 '21 at 13:24

0 Answers0