46

How can I read partitioned Parquet with a condition as a DataFrame?

This works fine:

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")

Partitions exist for day=1 to day=30. Is it possible to read something like (day = 5 to 6) or day=5, day=6?

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")

If I put *, it gives me all 30 days of data, and that is too big.

WoodChopper

6 Answers

117

sqlContext.read.parquet can take multiple paths as input. If you want just day=5 and day=6, you can simply add two paths like:

val dataframe = sqlContext
      .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
                    "file:///your/path/data=jDD/year=2015/month=10/day=6/")

If you have folders under day=X, like say country=XX, country will automatically be added as a column in the dataframe.

EDIT: As of Spark 1.6, one needs to provide a "basePath" option in order for Spark to generate the partition columns automatically. In Spark 1.6.x the above would have to be re-written like this to create a dataframe with the columns "data", "year", "month" and "day":

val dataframe = sqlContext
     .read
     .option("basePath", "file:///your/path/")
     .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
              "file:///your/path/data=jDD/year=2015/month=10/day=6/")
Glennie Helles Sindholt
  • First, thanks for the response. I was looking for a simpler way. With a subset of, say, 20 days, this approach becomes unwieldy. I would be filtering often to check the data accuracy. – WoodChopper Nov 11 '15 at 17:50
  • 3
    Then why not simply do `val dataframe = sqlContext.read.parquet("file:///your/path/data=jDD/year=2015/month=10/")`? `day` is added as a column in the dataframe, which you can then filter on. – Glennie Helles Sindholt Nov 11 '15 at 18:18
  • Actually, it is a very large data set. The data runs from 2007 to 2015, and on average 5 billion rows of raw logs are processed and stored. I would be asked for particular data reports on demand. – WoodChopper Nov 11 '15 at 18:21
  • 15
    Right, so the first thing you do is a `filter` operation. Since Spark does lazy evaluation, you should have no problems with the size of the data set. The filter will be applied before any actions, and only the data you are interested in will be kept in memory (see the sketch after these comments). – Glennie Helles Sindholt Nov 11 '15 at 19:05
  • 1
    Well, it seems this is the only answer! – WoodChopper Nov 26 '15 at 08:04
  • 1
    Have an upvote, Glennie! Your remark about using filter() on the read.parquet() was exactly what I needed. – Eric M May 13 '16 at 19:30
  • 1
    Glennie, where did you read about the option on sqlContext.read? I could not find anything in the Spark docs. – Vijay Krishna Jul 25 '17 at 21:50
  • 1
    It is written in the documentation. Look [here](https://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery). When they changed it from the old way to the new way, I remember that I stumbled across it in the release notes. – Glennie Helles Sindholt Jul 26 '17 at 07:39
  • @GlennieHellesSindholt: I'm trying to read AVRO files using the Spark Avro reader. I gave the basePath option, but the partition columns are still not available in the DataFrame. Any idea? – Shankar Feb 14 '18 at 10:30
  • Is there a way to have wildcards in the `basePath`? I seem to be getting the error: `java.lang.IllegalArgumentException: Option 'basePath' must be a directory` – samthebest Dec 05 '19 at 17:01
  • @samthebest did you get any solution for adding wildcards in basePath? – ben Jul 30 '20 at 19:36
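
A minimal sketch (not from the original answers) of the filter-based approach suggested in the comments above, assuming the same placeholder paths and the Spark 1.x API, with `day` available as a column through partition discovery:

// Read the whole month; day is discovered as a partition column.
val monthDf = sqlContext.read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/")

// Keep only the days of interest. With lazy evaluation, the filter is applied
// before any action runs, and Spark can prune the other day partitions.
val daysDf = monthDf.filter("day >= 5 AND day <= 6")
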
50

If you want to read data for multiple days, for example day = 5 and day = 6, and want to mention the range in the path itself, wildcards can be used:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day={5,6}/*")

Wildcards can also be used to specify a range of days:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=[5-10]/*")

This matches all days from 5 to 10.

  • 4
    Is this exclusively for Scala? I'm trying it with pyspark; it works with `{}` notation but not `[]`. I'm trying to read in a range. – Auren Ferguson Jul 29 '19 at 13:10
  • Does this work for specifying a range of years and months in the same fashion, like "file:///your/path/data=mydata/year=[2015-2018]/month=[1-6]/day=[5-10]/*"? (A path-list sketch for arbitrary ranges follows these comments.) – Vivek Sharma Dec 18 '19 at 11:36
  • It's so strange that the second method is not implemented in pyspark. It would be really handy to have it. – hui chen Dec 14 '21 at 09:46
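
Where the bracketed range syntax is not supported (as the comments above suggest for some APIs), a workaround is to build the partition paths programmatically and pass them all to parquet, which accepts multiple paths as shown in the first answer. A rough sketch, assuming the placeholder paths from the question:

val base = "file:///your/path/data=jDD/year=2015/month=10"
// Generate one path per day in the desired range (here 5 to 10).
val dayPaths = (5 to 10).map(d => s"$base/day=$d/")
// parquet() takes a varargs list of paths, so expand the sequence with : _*
val dataframe = sqlContext.read.parquet(dayPaths: _*)
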
7

You need to provide the mergeSchema = true option, as mentioned below (this is from 1.6.0):

val dataframe = sqlContext.read.option("mergeSchema", "true").parquet("file:///your/path/data=jDD")

This will read all the Parquet files into a dataframe and also create the columns year, month and day in the dataframe.

Ref: https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging
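
As a quick sanity check (not part of the original answer), the partition columns can be verified on the resulting dataframe with standard DataFrame calls:

dataframe.printSchema()  // year, month and day should appear among the columns
dataframe.select("year", "month", "day").distinct().show()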

Kiran N
1

In my case with pyspark:

sdf_table = spark.read.parquet("s3://bucket/table/**/*.parquet")

The ** matches all the partition directories of the Parquet data set (it is a glob expression).

Note that this reads all the Parquet files under the "table/" prefix in the bucket, so be careful if other files are stored there.

0

I was unable to use wildcards as Cristian suggested. After some digging, I found the following, which seems to work:

spark.read.option("recursiveFileLookup", "true").parquet("s3a://bucket/table/")

https://kontext.tech/article/887/read-parquet-files-from-nested-directories

J Weezy
0

The answers in this thread did not assist my similar need, and so I created [and subsequently answered] this question.

Include partition steps as columns when reading Synapse spark dataframe

Paul Wilson