
I'm writing a lot of data into a Databricks Delta Lake table using the open source version, running on AWS EMR with S3 as the storage layer. I'm using EMRFS.

For performance improvements, I'm compacting and vacuuming the table every so often like so:

    from delta.tables import DeltaTable

    # Compact: rewrite the table into num_files files without changing the data
    (spark.read.format("delta").load(s3path)
        .repartition(num_files)
        .write.option("dataChange", "false")
        .format("delta").mode("overwrite").save(s3path))

    # Delete files no longer referenced by the table and older than 24 hours
    t = DeltaTable.forPath(spark, s3path)
    t.vacuum(24)

It's then deleting hundreds of thousands of files from S3. However, the vacuum step takes an extremely long time. During this time the job appears to be idle, but every ~5-10 minutes a small task runs, indicating the job is alive and doing something; in the Spark UI these small tasks start from task 16.

I've read through this post, Spark: long delay between jobs, which seems to suggest it may be related to Parquet, but I don't see any options on the Delta side to tune any parameters.

asked by Arne Huang

2 Answers


I've also observed that the Delta vacuum command is quite slow. The open source developers are probably limited in making AWS-specific optimizations in the repo because the library is cross-platform (it needs to work on all clouds).

I've noticed that vacuum is even slow locally. You can clone the Delta repo, run the test suite on your local machine, and see for yourself.

Deleting hundreds of thousands of files stored in S3 is slow, even if you're using the AWS CLI. You should see if you can refactor your compaction operation to create fewer files that need to be vacuumed.

Suppose your goal is to create 1GB files. Perhaps you have 15,000 one-gig files and 20,000 small files. Right now, your compaction operation is rewriting all of the data (so all 35,000 original files need to be vacuumed post-compaction). Try to refactor your code to only compact the 20,000 small files (so the vacuum operation only needs to delete 20,000 files).
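For example, if the table is partitioned, you can restrict the rewrite to just the partitions that hold the small files and leave the already-compacted data alone. A minimal sketch, assuming a hypothetical `date` partition column and a single affected partition:

    # Rewrite only the partition that contains the small files; replaceWhere
    # limits the overwrite to rows matching the predicate, so the other
    # partitions keep their existing (already compacted) files.
    (spark.read.format("delta").load(s3path)
        .where("date = '2020-07-01'")
        .repartition(num_files)
        .write
        .option("dataChange", "false")
        .option("replaceWhere", "date = '2020-07-01'")
        .format("delta")
        .mode("overwrite")
        .save(s3path))

That way the subsequent vacuum only has to clean up the files that were actually rewritten.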

The real solution is to build a vacuum command that's optimized for AWS. Delta Lake needs to work with all the popular clouds and the local filesystem. It should be pretty easy to make an open source library that reads the transaction log, figures out what files need to be deleted, makes a performant file deletion API call, and then writes out an entry to the transaction log that's Delta compliant. Maybe I'll make that repo ;)
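For illustration, the deletion half of that idea could look something like the sketch below, which uses boto3's batch delete (up to 1,000 keys per request). `bucket` and `keys_to_delete` are placeholders, and the hard part, working out from the transaction log which keys are safe to delete, is deliberately left out:

    import boto3

    s3 = boto3.client("s3")

    def delete_keys(bucket, keys_to_delete):
        """Delete S3 objects in batches instead of one request per object."""
        # The DeleteObjects API accepts at most 1,000 keys per call.
        for i in range(0, len(keys_to_delete), 1000):
            batch = keys_to_delete[i:i + 1000]
            s3.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": k} for k in batch], "Quiet": True},
            )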

Here's more info on the vacuum command. As a side note, you may want to use coalesce instead of repartition when compacting, as described here.
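If you try the coalesce route, the compaction step from the question only changes in one place. A sketch; coalesce avoids the full shuffle that repartition triggers, at the cost of less evenly sized output files:

    # Same compaction as in the question, but coalesce merges existing
    # partitions without a full shuffle.
    (spark.read.format("delta").load(s3path)
        .coalesce(num_files)
        .write.option("dataChange", "false")
        .format("delta").mode("overwrite").save(s3path))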

EDIT: Delta issue: https://github.com/delta-io/delta/issues/395 and PR: https://github.com/delta-io/delta/pull/416

answered by Powers
  • Thanks for the reply. I'll give coalesce a shot, although I'll have to rebalance the partitions at some point later. My thought was that the delete calls should come from the executors, but it seems like they are coming from the driver (and are synchronous/blocking as well?), which is causing the long delays. – Arne Huang Jul 12 '20 at 23:18
  • If I load the `_delta_log/_last_checkpoint` file (say `000[...]991.checkpoint.parquet`), filter it for all files with a `deletionTimestamp` older than 7 days to get the `path`s to the underlying parquet files I want to delete, and then delete them with the `aws cli`, would this cause problems with the delta table? Would I also have to make a new checkpoint file (say `000[...]992.checkpoint.parquet`) and overwrite the `_delta_log/_last_checkpoint` file so it points to that instead? – Clay Aug 21 '20 at 23:33
  • 1
    @Clay - that sounds like a good approach at a high level. Deleting all the files from the CLI is pretty slow if there is a massive number of files too. Might be quicker to apply a 1 day lifecycle policy to all the files. Definitely need a better solution cause what's currently offered is unusably slow. – Powers Aug 22 '20 at 14:55
  • @Powers Yes, the CLI is also very slow, but I can release cluster resources and simply use a bash script to delete all the files from S3. I am mainly concerned about not corrupting the `_delta_log` files. Do I even need to worry about creating a new `_last_checkpoint` or a new `_delta_log/` json file? I could make this a separate question. – Clay Aug 22 '20 at 16:48
  • asked that question here: https://stackoverflow.com/q/63541030/5060792 – Clay Aug 22 '20 at 20:56

There was an issue filed for this in Delta Lake.

Problem statement: Delta Lake vacuum jobs take too long to finish because the underlying file-deletion logic was sequential. This is a known issue in Delta Lake (v0.6.1). Ref: https://github.com/delta-io/delta/issues/395

Solution: the Delta Lake team has resolved this issue, but a stable release containing the fix has not been published yet. Pull request: https://github.com/delta-io/delta/pull/522

For v0.6.x

A lot of organizations are using 0.6.x in production and want this to be part of 0.6.x. The following post gives quick steps to build a Delta 0.6.1 jar with this patch:

https://swapnil-chougule.medium.com/delta-with-improved-vacuum-patch-381378e79d1d

With this change, files are deleted in parallel during the vacuum job, which speeds up the process and reduces execution time.
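Once you're on a build that contains the patch, the parallel delete is controlled by a Spark conf. A minimal sketch; the conf name below comes from the linked pull request, so verify it against the Delta version you actually deploy:

    # Enable parallel file deletion during vacuum (it is off by default)
    spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

    from delta.tables import DeltaTable
    DeltaTable.forPath(spark, s3path).vacuum(24)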

answered by Swapnil Chougule