
I am working with Dataproc and Parquet on Google Cloud Platform, with data on GCS, and writing lots of small to moderately sized files is a major hassle: it is a couple of times slower than writing fewer, bigger files, or than writing to HDFS.

The Hadoop community has been working on S3Guard, which uses DynamoDB to give S3A consistent listings. Similarly, s3committer uses S3's multi-part upload API to provide a simple alternative committer that is much more efficient.

I am looking for similar solutions on GCS. The S3 multi-part upload API is one of the few things not offered by GCS's XML API, so it cannot be used as is. Instead, GCS has a "compose" API where you upload objects separately and then issue a compose request to combine them. This seems like it could be used to adapt the multi-part upload approach from s3committer, but I am not quite sure.
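To make that concrete, here is a rough sketch of a compose call using the Java client library from Scala (bucket and object names are made up, and I am not claiming this is how s3committer's approach would actually map onto GCS):

```scala
import com.google.cloud.storage.{BlobId, BlobInfo, Storage, StorageOptions}

// Hypothetical names throughout; parts are uploaded as independent objects first.
val storage: Storage = StorageOptions.getDefaultInstance.getService

val bucket = "my-bucket"
val target = BlobInfo.newBuilder(BlobId.of(bucket, "output/part-00000.parquet")).build()

// Compose combines up to 32 source objects server-side; no object data is re-sent.
val request = Storage.ComposeRequest.newBuilder()
  .addSource("tmp/part-00000.0", "tmp/part-00000.1")
  .setTarget(target)
  .build()

storage.compose(request)
```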

I could not find any information about using S3Guard on GCS with an alternate key-value store (nor about the S3A connector in general -- I am not even sure it can be used with the GCS XML API).

Zero-rename commits seem to be a common concern with Hadoop and Apache Spark. What are the usual solutions for that on GCS, besides "writing fewer, bigger files"?

pay
  • What's the rough magnitude of number of files you're writing, and approximately how much total data is it spread across those files? Are you using Spark or Hive? Are you writing into partitions? – Dennis Huo Aug 09 '17 at 01:01

2 Answers


There are a few different things in play here. For the problem of enforcing list consistency, Dataproc traditionally relied on a per-cluster NFS mount to provide client-enforced list-after-write consistency; more recently, Google Cloud Storage has improved its list-after-write semantics, and list operations are now strongly consistent immediately after writes. Dataproc is phasing out the client-enforced consistency layer, and something like S3Guard on DynamoDB is not needed for GCS.

As for multipart upload, in theory GCS Compose could be used as you mention, but parallel multipart upload of a single large file mostly helps in single-stream situations. Most Hadoop/Spark workloads already parallelize different tasks across each machine, so there is little benefit to multithreading each individual upload stream; aggregate throughput will be about the same with or without parallel multipart uploads.

So that leaves the question of using the multi-part API to perform conditional/atomic commits. The GCS connector for Hadoop currently uses "resumable uploads", where it is theoretically possible for one node to be responsible for "committing" an object that was uploaded by a completely different node; the client libraries just aren't currently structured to make this very straightforward. At the same time, the "copy-and-delete" phase of a GCS "rename" also differs from S3 in that it is done as metadata operations instead of a true data copy. This makes GCS amenable to using the vanilla Hadoop FileCommitters, instead of needing to commit "directly" into the final location and skip the "_temporary" machinery. Having to copy/delete metadata for each file instead of doing a true directory rename may not be ideal, but the cost is proportional only to the number of files, not to the underlying data size.

Of course, all this still doesn't solve the fact that committing lots of small files is inefficient. It does, however, make it likely that the "direct commit" aspect isn't as much of a factor as you might think; more often the bigger issue is something like Hive not parallelizing file commits at completion time, especially when committing to lots of partition directories. Spark is much better at this, and Hive should be improving over time.

There is also a recent performance improvement from a native SSL library in Dataproc 1.2, which you get out of the box without having to "write fewer, bigger files".

Otherwise, real solutions really do involve writing fewer, bigger files, since even if you fix the write side, you'll suffer on the read side if you have too many small files. GCS is heavily optimized for throughput, so anything less than around 64MB or 128MB may be spending more time just on overhead of spinning up a task and opening the stream vs actual computation (should be able to read that much data in maybe 200ms-500ms or so).

In that vein, you'd want to make sure you set things like hive.merge.mapfiles, hive.merge.mapredfiles, or hive.merge.tezfiles if you're using those engines, or repartition your Spark dataframes before saving to GCS; merging into fewer, larger files is usually well worth it for keeping your outputs manageable and benefiting from faster reads down the road.

Edit: One thing I forgot to mention is that I've been loosely using the term repartition; in this case, since we're strictly trying to bunch the output up into larger files, you may do better with coalesce instead; there's more discussion in another StackOverflow question about repartition vs coalesce.
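For concreteness, a rough Spark (Scala) sketch of coalescing before the write; the paths and target file count are placeholders you'd tune to your data volume:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-before-write").getOrCreate()

// Hypothetical input/output paths; pick the target count so each output file
// lands in roughly the 100MB+ range for your data volume.
val df = spark.read.parquet("gs://my-bucket/input/")

df.coalesce(32)   // fewer, larger files; avoids the full shuffle that repartition() does
  .write
  .mode("overwrite")
  .parquet("gs://my-bucket/output/")
```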

Dennis Huo
  • Does GCS pool HTTP/1.1 threads? This works better for small files, though latency does hurt. Also, what are the timings of the various HEAD/GET/LIST calls? These are the things we need to know to estimate cost of various Hadoop API calls – stevel Aug 10 '17 at 13:13
  • Great answer, thanks, I did not know about the consistency improvements or the fact rename is a metadata operation. – pay Aug 10 '17 at 22:15
  • I am using Spark on Dataproc 1.2, and it is not uncommon that I write lots of 500KB files on some datasets, but it seems I already benefit from most of the recent improvements you mention. Something I have been thinking about is: writing to HDFS or local filesystem (fast), and then uploading all these small files in a couple requests to GCS using [batching](https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch). I assume the connector does not use any batching currently. Do you expect this could have a significant impact? – pay Aug 10 '17 at 22:26
  • @steve-loughran Kind of; there are threadpools per GoogleCloudStorageImpl, where that class is instantiated once per "gs" FileSystem instance, so usually it's a singleton. But the bigger difference here is made in the choice to use "ResumableUpload" instead of "DirectUpload"; since the connector doesn't get upfront info on file size, it assumes it'll be large and thus opens a "resumable" session which involves several heavyweight round trips on the backend to create a landing zone. The difference between resumable and direct will likely make a bigger difference than thread pooling at this point – Dennis Huo Aug 11 '17 at 00:32
  • @SteveLoughran There's some rudimentary plumbing for [setting DirectUpload](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java#L61) because we use the underlying library elsewhere where this is important, but it's not currently plumbed through for easy usage in the Hadoop layer. As for cost of round-trips, HEAD/GET should be median of around 50ms, but this varies depending on location proximity, regional buckets, etc. For estimation purposes better to go with a conservative 100ms. – Dennis Huo Aug 11 '17 at 00:35
  • On top of that, there's a lot of extra round trips when actually opening a file for read or when creating a file to enforce byte ranges, directory semantics, etc. So opening for reading is more like 300-500ms. There are a couple options to turn off certain semantics which break HCFS contracts, but improve performance (for example, [disabling the need to fast-fail on a not-found file](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/CHANGES.txt#L153), removing API calls before any bytes are read). – Dennis Huo Aug 11 '17 at 00:38
  • @pay There's batching of lots of metadata-only operations under the hood already, if you look for `batchHelper.queue` in [GoogleCloudStorageImpl](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/cecf6c60c45f26d40cc9b87279ee16cf0cc40c15/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java) - however, you're right that there's no batching of uploads. Resumable uploads definitely can't be batched. I'm not sure whether direct uploads can be batched if you do reach into the underlying libraries to enable direct uploads. – Dennis Huo Aug 11 '17 at 00:44
  • I believe the note on the [page you linked](https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch) indicates that batching will not work: "Note: Currently, Google Cloud Storage does not support batch operations for media, either for upload or download." – Dennis Huo Aug 11 '17 at 00:46
  • @pay - how many files are you talking about when you say "lots"? Is it 10,000, 100,000, or 1,000,000 or more? How many cores/workers does your cluster have? If it's some dataframe-specific slowness, you might still get better performance by first writing to HDFS on the cluster and then running `hadoop distcp` into GCS, which helps ensure good balance of workers contributing to the copying from HDFS into GCS even if your Spark partitioning was bad. Otherwise, I strongly recommend a `repartition` call in Spark to have larger files. – Dennis Huo Aug 11 '17 at 00:49
  • @SteveLoughran Forgot to link to the code where threadpools are instantiated, [here it is](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/cecf6c60c45f26d40cc9b87279ee16cf0cc40c15/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java#L149) – Dennis Huo Aug 11 '17 at 00:51
  • @pay FYI added a minor update to my answer; in your case you probably want `coalesce` if possible rather than actual `repartition`. – Dennis Huo Aug 11 '17 at 02:04
  • @DennisHuo I am already controlling the partition number, but I am using `write.partitionBy(someTimeIntervalColumn).save(...)` to produce one file per 5 or 10min time interval in my data. Usually, each interval will have a fair amount of data, but for some time periods or datasets that is not the case, and the resulting jobs are pretty slow to process what is, in the end, a tiny amount of data. – pay Aug 11 '17 at 12:44
  • Cluster is pretty small (6x `n1-standard-8`, regional GCS bucket in same region as the cluster), but I am happy with the performance writing to local HDFS (I didn't do precise benchmarks, but overall between x2 to x5 faster) and I was hoping to get as close as possible to that with GCS. – pay Aug 11 '17 at 12:46
  • Good point on upload batching -- I use it regularly for bucket to bucket copies and I had not realized it did not apply to uploads. – pay Aug 11 '17 at 12:47
  • yes, there's a lot of needless FS checks in the hadoop APIs (parent path validity, etc), which aren't needed when the sequence of FS operations for a task/job committer are known. The S3Guard committer does them at setup and then doesn't repeat them for the individual file operations. IBM's stocator tool mocks them at the FS level when it recognises paths under _temporary – stevel Aug 15 '17 at 09:30

S3Guard, HADOOP-13345, retrofits consistency onto S3 by having DynamoDB store the listings. This makes it possible, for the first time, to reliably use S3A as a direct destination of work. Without it, execution time may seem to be the problem, but the real one is that the rename-based committer may get an inconsistent listing and not even see which files it has to rename.

The S3Guard committer work, HADOOP-13786, will, when finished (as of Aug 2017, still a work in progress), provide two committers.

Staging committer

  1. Workers write to the local filesystem.
  2. The task committer uploads to S3 but does not complete the operation. Instead it saves the commit metainfo to HDFS.
  3. This commit metainfo is committed as normal task/job data in HDFS.
  4. In job commit, the committer reads the pending-commit data from HDFS and completes the uploads, then cleans up any outstanding commits.

Task commit is an upload of all data, time is O(data/bandwidth).

This is based on Ryan's s3committer at Netflix and is the one which is going to be safest to play with at first.
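To illustrate the underlying trick (this is not the committer's actual code), here is a hedged sketch with the AWS SDK showing how a multipart upload can be started in one place and completed later from nothing but the saved upload ID and part ETags; the bucket, key and local path are hypothetical:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model._
import java.io.File
import java.util.Arrays

val s3 = AmazonS3ClientBuilder.defaultClient()
val bucket = "my-bucket"
val key = "table/part-00000.parquet"

// Task side: start a multipart upload and push the data, but do NOT complete it.
val uploadId = s3.initiateMultipartUpload(
  new InitiateMultipartUploadRequest(bucket, key)).getUploadId

val etag = s3.uploadPart(new UploadPartRequest()
  .withBucketName(bucket).withKey(key)
  .withUploadId(uploadId).withPartNumber(1)
  .withFile(new File("/tmp/part-00000.parquet"))).getPartETag

// (uploadId, etags) is the kind of commit metainfo that gets persisted to HDFS.

// Job commit side, possibly in a different process: finish the upload.
// Until this call runs, the object is not visible at its final key.
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
  bucket, key, uploadId, Arrays.asList(etag)))
```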

Magic committer

So called because it does "magic" inside the filesystem.

  1. The filesystem itself recognises paths like s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
  2. It redirects the write to the final destination, s3a://dest/part-000.orc.snappy, but doesn't complete the write in the stream close() call.
  3. It saves the commit metainfo to s3a, here s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy.pending
  4. Task commit: loads all .pending files from that dir, aggregates, saves elsewhere. Time is O(files); data size unimportant.

  5. Task abort: load all .pending files, abort the commits

  6. Job commit: load all pending files from committed tasks, completes.

Because it is listing files in S3, it will need S3Guard to deliver consistency on AWS S3 (other S3 implementations are consistent out of the box, so don't need it).

Both committers share the same codebase, job commit for both will be O(files/threads), as they are all short POST requests which don't take up bandwidth or much time.
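For reference, a hedged sketch of how selecting one of these committers from Spark might look; the configuration keys below follow the HADOOP-13786 work and could change before it ships:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-committer-selection")
  // Route committer creation for s3a:// output paths through the S3A committer factory.
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  // "directory" selects the staging committer; "magic" selects the magic committer
  // (the latter also needs spark.hadoop.fs.s3a.committer.magic.enabled=true).
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .getOrCreate()
```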

In tests, the staging committer is faster than the magic one for small, test-scale files, because the magic committer talks more to S3, which is slow, though S3Guard speeds up the listing/getFileStatus calls. The more data you write, the longer task commit takes with the staging committer, whereas task commit with the magic one stays constant for the same number of files. Both are faster than using rename(), due to how it is mimicked by list, copy, and delete operations.

GCS and Hadoop/Spark Commit algorithms

(I haven't looked at the GCS code here, so I reserve the right to be wrong. Treat Dennis Huo's statements as authoritative.)

If GCS does rename() more efficiently than the S3A copy-then-delete, it should be faster, more O(file) than O(data), depending on parallelisation in the code.

I don't know if they can go for a 0-rename committer. The changes in the mapreduce code under FileOutputFormat are designed to support different/pluggable committers for different filesystems, so they have the opportunity to do something here.

For now, make sure you are using the v2 MR commit algorithm, which, while less resilient to failures, does at least push the renames into task commit, rather than job commit.
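A minimal Spark (Scala) sketch of selecting the v2 algorithm, using the standard MapReduce property passed through Spark's hadoop conf prefix:

```scala
import org.apache.spark.sql.SparkSession

// v2 moves the renames from job commit into task commit.
val spark = SparkSession.builder()
  .appName("v2-commit-algorithm")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```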

See also Spark and Object Stores.

stevel