13

My dataset is partitioned in this way:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

What is the easiest and most efficient way to create a DataFrame in Spark loaded with data between two dates?

r4ravi2008
  • If you want easy range queries on partitions, the best solution is to use a better partitioning strategy that puts time on a single axis, e.g., `/tbl/ts=yyyymmddhhmm/*.parquet`. There is a section on this topic in https://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ – Sim Nov 11 '17 at 09:33

2 Answers

13

If you absolutely have to stick to this partitioning strategy, the answer depends on whether you are willing to bear partition discovery costs or not.

If you are willing to have Spark discover all partitions, which only needs to happen once (until you add new files), you can load the basepath and then filter using the partition columns.

If you do not want Spark to discover all the partitions, e.g., because you have millions of files, the only efficient general solution is to break the interval you want to query into several sub-intervals you can easily query using @r0bb23's approach and then union them together.

If you want the best of both cases above and you have a stable schema, you can register the partitions in the metastore by defining an external partitioned table. Don't do this if you expect your schema to evolve, as metastore-managed tables handle schema evolution quite poorly at this time.
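
A minimal sketch of that metastore route, assuming Hive support is enabled (the table name `events` and the `payload` column are hypothetical placeholders):

// Register the existing layout as an external partitioned table
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events (payload STRING)
  PARTITIONED BY (Year INT, Month INT, Day INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///basepath'
""")

// Pull the partition directories that already exist into the metastore
spark.sql("MSCK REPAIR TABLE events")

// Filters on Year/Month/Day are now answered from the metastore
// rather than by listing the file system
spark.table("events").where('Year === 2017 && 'Month === 10)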

For example, with the two discovery-based approaches, to query between 2017-10-06 and 2017-11-03 you'd do:

// With full discovery
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6) || ('Month === 11 && 'Day <= 3)
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

Writing generic code for this is certainly possible, but I haven't encountered it. The better approach is to partition in the manner outlined in the comment I made on the question. If your table were partitioned using something like `/basepath/ts=yyyymmddhhmm/*.parquet`, the answer is simply:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

The reason it's worth adding hours and minutes is that you can then write generic code that handles intervals regardless of whether the data is partitioned by week, day, hour, or every 15 minutes. In fact, you can even manage data with different granularities in the same table, e.g., older data aggregated at higher levels to reduce the total number of partitions that need to be discovered.
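
To illustrate, here is a rough sketch of such generic code under that `ts` layout (the `tsBetween` helper is made up for this example):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.col

// Build a filter on the ts partition column from two timestamps, independent of
// whether the partitions are weekly, daily, hourly or 15-minute
def tsBetween(from: LocalDateTime, to: LocalDateTime) = {
  val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")
  col("ts") >= from.format(fmt).toLong && col("ts") <= to.format(fmt).toLong
}

spark.read.parquet("hdfs:///basepath")
  .where(tsBetween(LocalDateTime.of(2017, 10, 6, 0, 0), LocalDateTime.of(2017, 11, 3, 0, 0)))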

Sim
  • Since Spark creates a folder for each partition when saving to parquet format: wouldn't your last generic proposal create a massive number of folders (one per minute), and couldn't this be an issue (IO/resource-wise) for the operating system? (On Unix-based systems, I think some tuning via ulimit is needed at some point.) Thanks for your great answer btw. – Aydin K. Jun 28 '18 at 11:26
  • @AydinK. two thoughts about your question. First, having the ability to resolve down to minute granularity doesn't mean it makes sense to partition at single-minute granularity. The smallest I've heard of in production is 15 minutes, i.e., `00`, `15`, `30` and `45` for the last two digits. Second, it is unusual to use standard file systems for big data. Most production environments use something like HDFS or a cloud-based object store such as AWS S3, which can handle very large numbers of objects. – Sim Jul 01 '18 at 06:19

5

Edited to add multiple load paths to address comment.

You can use a glob-style, regex-like path syntax.

val dataset = spark
  .read
  .format("parquet")
  .option("filterPushdown", "true")
  .option("basePath", "hdfs:///basepath/")
  .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")

See also: How to use regex to include/exclude some input files in sc.textFile?

Note: you don't need the `X=*`; you can just use `*` if you want all days, months, etc.

You should probably also do some reading about predicate pushdown (i.e., `filterPushdown` set to `true` above).
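
As a small aside (a sketch, not part of the original answer): parquet filter pushdown is also controlled by a session config, and you can check whether a filter was pushed down by looking at the physical plan. `someColumn` below is a placeholder for one of your actual data columns.

import org.apache.spark.sql.functions.col

// Parquet filter pushdown is governed by this session config (enabled by default in recent Spark versions)
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

// Filters on regular data columns show up as PushedFilters in the physical plan;
// filters on Year/Month/Day are handled by partition pruning instead
dataset.where(col("someColumn") > 0).explain()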

Finally, you will notice the `basePath` option above; the reason for it can be found here: Prevent DataFrame.partitionBy() from removing partitioned columns from schema
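
To illustrate that last point (a minimal sketch): loading a single leaf directory with `basePath` set keeps the partition columns in the resulting schema, which would otherwise be dropped.

spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day=01/")
  .select("Year", "Month", "Day")  // partition columns are retained thanks to basePath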

Robert Beatty
  • This is not a general solution to the problem. In fact, there is no simple general solution to querying for an interval of dates using this partitioning strategy. For example, how would you use this approach to query between `2017-10-06` and `2017-11-03`? – Sim Nov 11 '17 at 09:37
  • Some good info in your answer below. But you don't need the union shown in your answer (see edit above). So I do have to say I think it is way more generalizable than you give it credit for, though it will require some not-so-pretty helper functions. For a lot, if not a majority, of systems it is worth it because, as you acknowledge, partition discovery is not cheap at scale; partial discovery is just better at scale. Though I do agree a better partitioning strategy would help. I use something a lot more like what you have below, making the helper functions and the above code trivial. – Robert Beatty Nov 13 '17 at 18:54