13

I need an elegant way to roll back a Delta Lake table to a previous version.

My current approach is listed below:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

This is ugly, though, as the whole data set needs to be rewritten. It seems that a metadata update alone should be sufficient and no data I/O should be necessary. Does anyone know a better approach for this?

Alon
Fang Zhang
  • I agree this is not an ideal solution, but given that overwriting a large data set with partitions could be expensive, this easy solution could be helpful. – Fang Zhang Aug 29 '19 at 18:04

5 Answers

10

As of Delta Lake 0.7.0, you can roll back to an earlier version of your Delta Lake table using the RESTORE command. This is a much simpler way to use time travel to roll back your tables.

Scala:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

Python:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL:

RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0

You can also use the restoreToTimestamp command if you'd prefer to do things that way instead. Read the documentation for more details.
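For example, a minimal sketch of the timestamp variant (the path and timestamp here are placeholders):

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

// Restore the table to the latest commit at or before this timestamp.
deltaTable.restoreToTimestamp("2019-08-27 00:00:00")

The SQL equivalent is RESTORE TABLE delta.`/path/to/delta-table` TO TIMESTAMP AS OF '2019-08-27 00:00:00'.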

Crash Override
4

Here is a brutal solution. It is not ideal, but given that overwriting a large data set with partitions could be expensive, this easy solution could be helpful.

If you are not very sensitive to updates after the desired rollback time, simply remove all version files in _delta_log that are later than the rollback time. Unreferenced files could be released later using vacuum.
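A rough Scala sketch of that log truncation, for illustration only (the path and target version are placeholders; it assumes no writers are active and no checkpoint was written past the target version):

import org.apache.hadoop.fs.Path

val tablePath = "/path/to/delta-table"   // placeholder
val targetVersion = 3L                   // keep commits 0..3

val logDir = new Path(s"$tablePath/_delta_log")
val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Delete every commit file newer than the target version. Any checkpoint
// parquet files and the _last_checkpoint pointer written after the target
// version would need the same treatment.
fs.listStatus(logDir)
  .map(_.getPath)
  .filter(_.getName.endsWith(".json"))
  .filter(p => p.getName.stripSuffix(".json").toLong > targetVersion)
  .foreach(p => fs.delete(p, false))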

Another solution that preserves the full history is to 1) call deltaTable.delete, then 2) copy all commit logs up to the rollback point sequentially (with increasing version numbers) so that they follow the delete's log entry. This mimics recreating the Delta table up to the rollback date, but it is surely not pretty.

Fang Zhang
  • Talked with a Databricks engineer today. He admitted that there is no elegant solution to this problem at the moment; however, the issue is high priority on their list. Right now there are workarounds, but yes, they are ugly. – Fang Zhang Sep 06 '19 at 05:38
  • Do you have an example of "simply remove all version files in _delta_log that are later than the rollback time"? – Eric Bellet Sep 23 '19 at 16:19
  • Go to the _delta_log folder of your Delta table; you should be able to see version files such as 000...0001.json, 000...0002.json, etc. Each of them corresponds to a commit. Do whatever you want (save, append, overwrite, etc.) and you should see the version number keep increasing. To roll back to an earlier version, say 3, you could remove all versions later than 3, say 000...0004.json, 000...0005.json, etc. Now read the Delta table and you should only get the data of version 3. – Fang Zhang Sep 24 '19 at 18:43
  • Okay, thanks. Why don't you use timestampAsOf in the time travel option? In my case, I need to roll back by the hour, day, month, or year. How do you know which version corresponds to a given timestamp? – Eric Bellet Sep 25 '19 at 14:13
  • So I understand: you check in each .json file whether the {"commitInfo":{"timestamp":1569487253571 ... } timestamp is greater than your rollback timestamp, and if so you delete that log file. – Eric Bellet Sep 26 '19 at 11:03
  • You don't have to check the contents of the files; the timestamps of the .json files are sufficient. You could also use the "history" function of Delta Lake to get a better visual display (see the sketch after these comments). – Fang Zhang Sep 26 '19 at 15:12
  • When I have two versions ...000 and ...001 and I delete 001 from the metadata to do the rollback, it works fine. But the problem is that when I try to write to that table again I get java.io.FileNotFoundException: No such file or directory: s3://.../_delta_log/...001.json. How is that possible? – guderkar Feb 26 '20 at 15:20
  • @guderkar do you have any cached data in memory? Did you try to restart your application or notebook? – Fang Zhang Feb 26 '20 at 20:40
  • Yeah, it worked after a cluster restart, but it's kinda annoying. Is there a way to clear the "local" Delta cache? BTW the environment is Databricks. – guderkar Feb 27 '20 at 09:10
  • @guderkar I don't use Databricks, but I assume they have something similar to "kernel restart", as in Jupyter or Zeppelin; in this way, you restart Spark instead of the underlying physical cluster; the cached data will be released. – Fang Zhang Feb 27 '20 at 18:12
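A minimal sketch of the history() lookup mentioned in the comments above, for finding the latest version at or before a given rollback timestamp (the path and timestamp are placeholders):

import io.delta.tables._
import org.apache.spark.sql.functions.{col, max}

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

// history() returns one row per commit with its version and timestamp,
// so the latest version at or before the rollback time can be looked up.
val rollbackTs = java.sql.Timestamp.valueOf("2019-09-25 00:00:00")

val targetVersion = deltaTable.history()
  .filter(col("timestamp") <= rollbackTs)
  .agg(max("version"))
  .first()
  .getLong(0)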
2

If your goal is to fix wrong data and you are not very sensitive to updates, you can replace a time interval using the replaceWhere option:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")
Eric Bellet
2

I faced a similar kind of problem with Delta, where I was calling multiple DML operations in one transaction, e.g. I had a requirement to call merge and then delete in one single transaction. So in this case, either both of them have to succeed together, or the state must be rolled back if any of them fails.

To solve the problem, I took a backup of the _delta_log directory (let's call it the stable state) before the transaction starts. If both DML operations in the transaction succeed, discard the previous stable state and use the new state committed in _delta_log; if either DML operation fails, just replace the _delta_log directory with the stable state you backed up before starting the transaction. Once it is replaced with the stable state, run vacuum to remove the stale files that may have been created during the transaction.
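A rough sketch of that backup/restore idea (paths are placeholders; it assumes no concurrent writers while the pseudo-transaction runs):

import org.apache.hadoop.fs.{FileUtil, Path}

val conf = spark.sparkContext.hadoopConfiguration
val logDir = new Path("/path/to/delta-table/_delta_log")      // placeholder
val backupDir = new Path("/path/to/backup/_delta_log_stable") // placeholder
val fs = logDir.getFileSystem(conf)

// 1) Snapshot the current log (the stable state) before the transaction starts.
FileUtil.copy(fs, logDir, fs, backupDir, false, true, conf)

try {
  // 2) Run the DML operations that must succeed or fail together,
  //    e.g. deltaTable.merge(...) followed by deltaTable.delete(...).
} catch {
  case e: Exception =>
    // 3) On failure, put the stable log back; vacuum later to drop the orphaned data files.
    fs.delete(logDir, true)
    FileUtil.copy(fs, backupDir, fs, logDir, false, true, conf)
    throw e
}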

saurav
1

You should use the time travel feature: https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

You read the data as of a timestamp:

val inputPath = "/path/to/my/table@20190101000000000"

And then overwrite the existing data with the "rolled back" version.
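A minimal sketch of that read-then-overwrite flow using the timestampAsOf reader option instead of the @ path syntax (the path and timestamp are placeholders):

// Read the snapshot as of the rollback timestamp, then overwrite the current table with it.
val rolledBack = spark.read.format("delta")
  .option("timestampAsOf", "2019-01-01")
  .load("/path/to/delta-table")

rolledBack.write
  .format("delta")
  .mode("overwrite")
  .save("/path/to/delta-table")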

With regard to it being ugly, I'm not sure I can help. You could limit the rewrite using partitioning, or you could work out which records have changed and only overwrite those.

simon_dmorias
  • Version is for time travel too; they are essentially the same. – Fang Zhang Aug 27 '19 at 15:36
  • Using an exact timestamp is not always convenient; I used the corresponding version number instead. – Fang Zhang Aug 27 '19 at 15:41
  • I agree with both comments. Just saying that I think this is the right way to do this (rather than messing with the log). – simon_dmorias Aug 27 '19 at 15:50
  • I would say time travel + overwrite is a reasonable way with the available APIs, but it is not an optimized way. With all the data already available, the additional I/O seems a waste to me. A better API should exist, or probably be added. Adding a new version file pointing to the right data should be straightforward for people who know the protocol well. – Fang Zhang Aug 27 '19 at 16:10