16

I have many gzipped files stored on S3, organized by project and by hour of the day. The file paths follow this pattern:

s3://<bucket>/project1/20141201/logtype1/logtype1.0000.gz
s3://<bucket>/project1/20141201/logtype1/logtype1.0100.gz
....
s3://<bucket>/project1/20141201/logtype1/logtype1.2300.gz

Since the data should be analyzed on a daily basis, I have to download and decompress the files belonging to a specific day, then assemble their content as a single RDD.

There are probably several ways to do this, but I would like to know the best practice for Spark.

Thanks in advance.

Jacek Laskowski
shihpeng

3 Answers

27

The underlying Hadoop API that Spark uses to access S3 allows you to specify input files using a glob expression.

From the Spark docs:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

So in your case you should be able to open all those files as a single RDD using something like this:

rdd = sc.textFile("s3://bucket/project1/20141201/logtype1/logtype1.*.gz")

Just for the record, you can also specify files using a comma-delimited list, and you can even mix that with the * and ? wildcards.

For example:

rdd = sc.textFile("s3://bucket/201412??/*/*.gz,s3://bucket/random-file.txt")

Briefly, what this does is:

  • The * matches all strings, so in this case all gz files in all folders under 201412?? will be loaded.
  • The ? matches a single character, so 201412?? covers all days in December 2014, like 20141201, 20141202, and so forth.
  • The , lets you load separate files into the same RDD at the same time, like the random-file.txt in this case (see the sketch below for building such a list programmatically).
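For example, here's a rough Scala sketch of building such a comma-delimited path list for several days and loading it in one call. It's untested and the bucket name and dates are just placeholders, not anything from your setup:

// Assumes a spark-shell with `sc` available, like the snippets above.
val bucket = "my-bucket"                      // placeholder bucket name
val days   = Seq("20141201", "20141202")      // placeholder dates
val paths  = days
  .map(day => s"s3://$bucket/project1/$day/logtype1/logtype1.*.gz")
  .mkString(",")                              // textFile accepts a comma-delimited list

val rdd = sc.textFile(paths)                  // one RDD spanning all matched files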

Some notes about the appropriate URL scheme for S3 paths:

  • If you're running Spark on EMR, the correct URL scheme is s3://.
  • If you're running open-source Spark (i.e. no proprietary Amazon libraries) built on Hadoop 2.7 or newer, s3a:// is the way to go.
  • s3n:// has been deprecated on the open source side in favor of s3a://. You should only use s3n:// if you're running Spark on Hadoop 2.6 or older.
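If you go the s3a:// route outside EMR, the hadoop-aws connector reads its credentials from Hadoop configuration keys such as fs.s3a.access.key and fs.s3a.secret.key. A minimal sketch, assuming the hadoop-aws jar is on the classpath (the values are placeholders):

// Credentials can also be supplied via spark-submit, e.g.
// --conf spark.hadoop.fs.s3a.access.key=... --conf spark.hadoop.fs.s3a.secret.key=...
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")   // placeholder
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")   // placeholder

val rdd = sc.textFile("s3a://bucket/project1/20141201/logtype1/logtype1.*.gz")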
Nick Chammas
  • @NickChammas Do I have to worry about the size of the *.gz files? Is there any upper limit for the combined size of all the *.gz files? – santhosh Nov 12 '15 at 20:55
  • @santhosh - The upper limit just depends on the power of your cluster. The larger your gzip files are, the longer it will take Spark to decompress them before it can begin work. If you have any other questions please ask a new question. This answer is about glob expressions, not gzip. – Nick Chammas Nov 12 '15 at 21:41
  • @NickChammas Thanks for the reply Nick. I have created a new question, http://stackoverflow.com/questions/33682709/spark-to-process-many-tar-gz-files-from-s3 – santhosh Nov 12 '15 at 22:34
7

Note: Under Spark 1.2, the proper format would be as follows:

val rdd = sc.textFile("s3n://<bucket>/<foo>/bar.*.gz")

That's s3n://, not s3://

You'll also want to put your credentials in conf/spark-env.sh as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
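As an alternative to the environment-variable approach, the s3n credentials can also be set directly on the Hadoop configuration. A sketch, assuming the legacy s3n connector is available (the key names are the standard fs.s3n.* ones; the values are placeholders):

// Applies to the legacy s3n connector (Hadoop 2.6 and older); values are placeholders.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

val rdd = sc.textFile("s3n://<bucket>/<foo>/bar.*.gz")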

Joseph Lust
  • What will be the difference of using s3n vs s3? I've used s3 in 1.2.0 and it's still working. Any performance improvements? – Stephane Maarek Jan 27 '15 at 10:33
  • @Stephane Perhaps my driver setup is different, but with out of the box Spark, `s3://` didn't work, but `s3n://` (through HDFS) did. – Joseph Lust Jan 27 '15 at 17:29
  • 1
    If you're running Spark on EMR, `s3://` is the correct scheme to use. Otherwise, `s3n://` has been deprecated in favor of `s3a://`. I updated [my answer](https://stackoverflow.com/a/27494547/877069) accordingly. – Nick Chammas Jun 12 '18 at 21:32
1

Using AWS EMR with Spark 2.0.0 and SparkR in RStudio, I've managed to read the gz-compressed Wikipedia stat files stored in S3 using the command below:

df <- read.text("s3://<bucket>/pagecounts-20110101-000000.gz")

Similarly, to read all the files for January 2011, you can use wildcards in the same command:

df <- read.text("s3://<bucket>/pagecounts-201101??-*.gz")

See the SparkR API docs for more ways of doing it: https://spark.apache.org/docs/latest/api/R/read.text.html

Joarder Kamal