
Description

I have an application that sends data to AWS Kinesis Firehose, and Firehose writes the data into my S3 bucket. Firehose uses a "yyyy/MM/dd/HH" prefix format when writing the files.

Like in this sample S3 path:

s3://mybucket/2016/07/29/12

Now I have a Spark application written in Scala where I need to read data from a specific time period, given a start and an end date. The data is in JSON format, which is why I use sqlContext.read.json() rather than sc.textFile().

How can I read the data quickly and efficiently?

What have I tried?

  1. Wildcards - I can select the data from all hours of a specific date or all dates of a specific month, for example:

    val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
    val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
    

    But if I have to read data from a period spanning a few days, for example 2016-07-29 to 2016-07-30, I cannot use the wildcard approach in the same way.

    Which brings me to my next point...

  2. Using multiple paths or a CSV of directories as presented by samthebest in this solution. It seems that separating directories with commas only works with sc.textFile() and not sqlContext.read.json().
  3. Union - A second solution from the previous link, by cloud, suggests reading each directory separately and then unioning them. Although he suggests unioning RDDs, there is an option to union DataFrames as well. If I generate the date strings from the given period manually, I may create a path that does not exist, and instead of ignoring it, the whole read fails. Instead I could use the AWS SDK and call listObjects on AmazonS3Client to get all the existing keys, as in iMKanchwala's solution from the previous link (sketched further below, after point 4).

    The only problem is that my data is constantly changing. If the read.json() function gets all the data in a single call, it reads everything it needs and is smart enough to infer the JSON schema from the data. If I read two directories separately and their schemas don't match, then I think unioning those two DataFrames becomes a problem.

  4. Glob(?) syntax - This solution by nhahtdh is a little better than options 1 and 2 because it allows specifying dates and directories in more detail, and as a single "path", so it also works with read.json().

    But again, the familiar problem of missing directories comes up. Let's say I want all the data from 20.07 to 30.07; I can declare it like this:

    val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
    

    But if I am missing data from, let's say, the 25th of July, then the path ..16/07/25/ does not exist and the whole call fails.

And obviously it gets more difficult when the requested period is, for example, 25.11.2015-12.02.2016; then I would need to programmatically (in my Scala script) create a path string something like this:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

And when creating it, I would need to somehow make sure that the 25-30 and 01-12 intervals all have corresponding paths; if one is missing, the read fails again. (The asterisk fortunately deals with missing directories, as it reads everything that exists.)
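
To illustrate what I mean by checking existence programmatically, here is a rough sketch only (the helper name is made up, and it assumes the v1 AWS SDK's AmazonS3Client.listObjects mentioned in point 3): it enumerates the daily prefixes between two dates and keeps only those that actually contain objects, so the resulting path list never points at a missing directory.

    import java.time.LocalDate
    import com.amazonaws.services.s3.AmazonS3Client

    // Made-up helper: return the "yyyy/MM/dd" day paths between start and end (inclusive)
    // that actually contain objects in the bucket, so no missing directory is passed on.
    def existingDayPaths(bucket: String, start: LocalDate, end: LocalDate): Seq[String] = {
      val s3 = new AmazonS3Client() // default credential provider chain
      Iterator.iterate(start)(_.plusDays(1))
        .takeWhile(!_.isAfter(end))
        .map(d => f"${d.getYear}%04d/${d.getMonthValue}%02d/${d.getDayOfMonth}%02d/")
        .filter(prefix => !s3.listObjects(bucket, prefix).getObjectSummaries.isEmpty)
        .map(prefix => s"s3://$bucket/$prefix*")
        .toSeq
    }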

How can I read all the necessary data from a single directory path, all at once, without the risk of failing because of a missing directory somewhere in the date interval?

V. Samma
  • Can you live with your 4th solution, checking whether the start-end folders exist before adding them to the s3:// string? I also use Kinesis Firehose and check that a folder exists before submitting to textFile. – halil Jul 29 '16 at 16:00
  • @halil But what if I am somehow missing the data from in between? As the data grows, it may be less probable, but at the moment I have such cases. And when the time period is large, for example 11 months, then checking the existence and building the glob string becomes quite difficult. – V. Samma Aug 01 '16 at 14:39
  • Yes, I am dealing with that now too. When I have a month to query, checking every hour's folder existence takes a little long, but it is better than the job failing with an exception. A complete solution would be extending FileInputFormat and skipping the exception, but I don't know how to make Spark use my extended class. – halil Aug 03 '16 at 06:32
  • Nice problem statement. – Alexandre Dupriez Aug 08 '17 at 08:53

1 Answer


There is a much simpler solution. If you look at the DataFrameReader API you'll notice that there is a .json(paths: String*) method. Just build a collection of the paths you want, with globs or not, as you prefer, and then call the method, e.g.,

val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
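
For example, a rough sketch of feeding it an existence-checked list of day paths (existingDayPaths is the hypothetical helper sketched in the question above; the dates are the question's example period):

    import java.time.LocalDate
    // Build only the day paths that exist for 25.11.2015 - 12.02.2016 (using the
    // hypothetical existingDayPaths helper from the question), then read them in one call.
    val paths: Seq[String] =
      existingDayPaths("mybucket", LocalDate.of(2015, 11, 25), LocalDate.of(2016, 2, 12))
    val df = sqlContext.read.json(paths: _*)
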
Sim
  • Sim thank you, I probably missed it because I am new to Scala and didn't realize `String*` may refer to a list or a Sequence. This is actually probably the correct and most efficient solution and answer to my question. But would you know how to parallelize this JSON reading process? I am dealing with Spark's small files problem and found an [article](http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219#replies) which suggested a parallelized reading of data, but after getting the RDD, giving it to `read.json` as a param fails with "java.lang.NoClassDefFoundError" – V. Samma Aug 01 '16 at 14:51
  • What parallelization are you referring to? Spark will automatically distribute the reading of the JSON files to executors in the cluster. The command above should only read one file in order to generate a schema from the JSON. If you don't want that, you have to provide a schema yourself using `.read.schema()...`. – Sim Aug 01 '16 at 19:23
  • Only one file? If I need to read months' worth of data, it is almost certain that the oldest samples have a slightly different schema than the newest ones. I cannot provide the schema manually, as I have a JSON struct with more than 200 attributes and they are constantly changing as we develop our system. But I meant the `sc.parallelize` approach suggested in the mentioned article. `read.json` on small files is super slow. With 1+3 `c3.2xlarge` AWS EMR instances it took over 5 minutes to read 4GB of data, but 10s for a single 3GB file. How can I ever deal with actual big data if it's so slow? – V. Samma Aug 02 '16 at 06:35
  • If your schema is variable, use `sqlContext.read.option("mergeschema", "true").json(...)`. This is slow but safe: it will read all the data and auto-generate a merged schema based on all the JSON documents it sees. You can speed things up by manually discovering the schema only when the schema changes, saving it and then providing it explicitly. I would only recommend this approach to advanced Spark users as it adds a lot of complexity. – Sim Aug 02 '16 at 16:09
  • Basically, if your JSON schema is variable, you only want to read JSON once + then persist to Parquet (with or without processing). Subsequent reads from Parquet, even with a variable schema are going to be much, much faster because of smaller file size and explicit schema management. – Sim Aug 02 '16 at 16:14
  • Sorry for the late reply. I didn't find any info about the possible values for `option` function, it seems to be undocumented. But I am quite sure it is merging the schema by default as when I read data in at the moment without the option function, I get all the data and columns I need. As multiple other developers may change the schema, it would be irrational to manually keep it synchronized. Even if it is somehow possible to be done in Spark. I would need to look into the Parquet option, I haven't dealt with it at all at the moment. If you have any pointers, I would appreciate your help. – V. Samma Aug 10 '16 at 07:37
  • Sorry, I don't know of any documentation dealing with this. – Sim Aug 12 '16 at 00:33
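
A rough sketch of the read-once-then-persist-to-Parquet idea from the comments above (the output location and variable names are illustrative, not from the discussion):

    // One slow pass: read the raw JSON (schema inferred from all the data) and persist it as Parquet.
    val raw = sqlContext.read.json(paths: _*)
    raw.write.mode("append").parquet("s3://mybucket/parquet/events") // illustrative location

    // Later jobs read the compact Parquet copy instead; mergeSchema reconciles columns
    // across files written with older and newer versions of the schema.
    val events = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("s3://mybucket/parquet/events")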