
I frequently have to write DataFrames as Hive tables.

df.write.mode('overwrite').format('hive').saveAsTable(f'db.{file_nm}_PT')

Or use Spark SQL or Hive SQL to copy one table to another as backup.

INSERT OVERWRITE TABLE db.tbl_bkp PARTITION (op_cd, rpt_dt)
SELECT * FROM db.tbl;

The problem is that writing to the Hive staging directory takes about 25% of the total time, while 75% or more goes into moving the ORC files from the staging directory to the final partitioned directory structure.

21/11/13 00:51:25 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-10-24 with partSpec {rpt_dt=2019-10-24}
21/11/13 00:51:56 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-02-18/part-00058-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2018-02-18/part-00058-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:51:56 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-02-18 with partSpec {rpt_dt=2018-02-18}
21/11/13 00:52:31 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-01-29/part-00046-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2019-01-29/part-00046-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:52:31 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-01-29 with partSpec {rpt_dt=2019-01-29}
21/11/13 00:53:09 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2020-08-01/part-00020-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2020-08-01/part-00020-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:53:09 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2020-08-01 with partSpec {rpt_dt=2020-08-01}
21/11/13 00:53:46 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2021-07-12/part-00026-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2021-07-12/part-00026-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:53:46 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2021-07-12 with partSpec {rpt_dt=2021-07-12}
21/11/13 00:54:17 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2022-01-21/part-00062-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2022-01-21/part-00062-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:54:17 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2022-01-21 with partSpec {rpt_dt=2022-01-21}
21/11/13 00:54:49 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-01-20/part-00063-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2018-01-20/part-00063-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:54:49 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-01-20 with partSpec {rpt_dt=2018-01-20}
21/11/13 00:55:22 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-09-01/part-00037-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2019-09-01/part-00037-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true

This operation is quite fast on actual HDFS, but on Google Cloud Storage the rename is really a blob copy followed by a delete, and it is very slow.

I have heard about direct path writes; can you please suggest how to do that?

1 Answer


(not so) Short answer

It's ... complicated. Very complicated. I wanted to write a short answer but I'd risk being misleading on several points. Instead I'll try to give a very short summary of the very long answer.


  • Alternatively, you can try switching to cloud-first table formats such as Apache Iceberg, Apache Hudi or Delta Lake.
  • I am not very familiar with these yet, but a quick look at Delta Lake's documentation convinced me that they had to deal with the same kind of issues (cloud storage not being a real file system), and depending on which cloud you're on, it may require extra configuration, especially on GCP, where the feature is flagged as experimental.
  • EDIT: Apache Iceberg does not have this issue, as it uses metadata files to point to the location of the real data files. Thanks to this, changes to a table are committed via an atomic change on a single metadata file (a hedged PySpark sketch follows this list).
  • I am not very familiar with Apache Hudi, and I couldn't find any mention of it dealing with this kind of issue. I'd have to dig further into its design architecture to know for sure.
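
For illustration, a minimal PySpark sketch of the Iceberg route, using Spark 3's DataFrameWriterV2 API. It assumes the Iceberg runtime jar is on the classpath and that a catalog named my_catalog (a placeholder) has been configured; df, file_nm and the partition columns are reused from the question, so treat it as a sketch rather than a drop-in replacement for the saveAsTable call above.

    # Hypothetical sketch: write the DataFrame as an Iceberg table instead of a Hive ORC table.
    # 'my_catalog' is a placeholder for an Iceberg catalog configured via spark.sql.catalog.*
    from pyspark.sql.functions import col

    (df.writeTo(f'my_catalog.db.{file_nm}_PT')
       .using('iceberg')
       .partitionedBy(col('op_cd'), col('rpt_dt'))
       .createOrReplace())   # the commit swaps a single metadata file, so no per-partition renames

    # Later runs can overwrite only the partitions present in df:
    # df.writeTo(f'my_catalog.db.{file_nm}_PT').overwritePartitions()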

Now, for the long answer, maybe I should write a blog article... I'll post it here whenever it's done.

FurryMachine
  • good answer. yes, iceberg addresses this as it only has to write a single file to commit a job; no renames. That MAPREDUCE-7341 committer is ready for beta testing; if people can test it, that'd be great. This week's work was some obscure load failure modes of abfs that only surface under heavy load; not even TPC-DS benchmarks trigger them. PS: do I know you? – stevel Nov 13 '21 at 13:30
  • 1
  • related feature in Hive on EMR and S3 https://stackoverflow.com/a/68744194/2700344 - direct writes to S3 – leftjoin Nov 13 '21 at 17:36
  • @stevel Thanks for the info about Iceberg, I updated my answer in that regard. I don't think we've met, no. Nice to meet you! – FurryMachine Nov 14 '21 at 09:06
  • nice. one more thing: GCS dir renames and deletes are nonatomic and O(files), so using v2 is no worse than v1 there, and the option "mapreduce.fileoutputcommitter.task.cleanup.enabled" should be set to offload some of that temp cleanup to the tasks. Azure has its own issues with deep/wide tree deletion – stevel Nov 15 '21 at 10:12
  • @stevel Thanks for the info. in MAPREDUCE-7341 it is written "Google GCS lacks the atomic directory rename required for v1 correctness; v2 can be used (which doesn't have the job commit performance limitations), but it's not safe." The last words make it rather unclear whether users should use v2 or not. I was assuming they shouldn't. As for GCS, according to Spark's documentation (https://spark.apache.org/docs/3.1.1/cloud-integration.html#configuring), GCS dir renames are both safe and O(1), so I'm a little confused... – FurryMachine Nov 16 '21 at 17:53
  • looks like a mistake by whoever wrote that doc. which, according to git, was me. it's O(files) and nonatomic. From an email from Google devs in Nov 2019: "directory rename is not atomic on GCS". given that, you may as well use v2 – stevel Nov 18 '21 at 14:00
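
For completeness, a hedged sketch of how the two committer settings stevel mentions above might be passed from PySpark. Only those options are shown (the v2 commit algorithm and task-side temp cleanup); whether they actually help with Hive's own staging-to-final move seen in the question's logs would need to be verified on your workload.

    # Sketch only: apply the committer options discussed in the comments via the Spark session.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName('tbl-backup')  # illustrative name
             # FileOutputCommitter v2: task output is moved to the destination at task commit,
             # skipping the extra rename pass at job commit (non-atomic on GCS either way).
             .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
             # Offload deletion of the temporary attempt directories to the tasks themselves.
             .config('spark.hadoop.mapreduce.fileoutputcommitter.task.cleanup.enabled', 'true')
             .enableHiveSupport()
             .getOrCreate())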