
I want to optimize the runtime of a Spark application by subdividing a huge CSV file into different partitions, depending on the characteristics of the data.

E.g. I have a column with customer ids (integer, a), a column with dates (month + year, e.g. 01.2015, b), and a column with product ids (integer, c), plus more columns with product-specific data that are not needed for the partitioning.

I want to build a folder structure like /customer/a/date/b/product/c. When a user wants to know information about products from customer X sold in January 2016, he could load and analyse the files saved under /customer/X/date/01.2016/*.

Is there a possibility to load such folder structures via wildcards? It should also be possible to load all customers or products for a specific time range, e.g. 01.2015 to 09.2015. Is it possible to use wildcards like /customer/*/date/*.2015/product/c? Or how else could a problem like this be solved?

I want to partition the data once and later load only the specific files in the analysis, to reduce the runtime of those jobs (disregarding the additional work for the partitioning).

SOLUTION: Working with Parquet files

I changed my Spark application to save my data to Parquet files; now everything works fine and I can pre-select the data by specifying the folder structure. Here is my code snippet:

JavaRDD<Article> goodRdd = ...

SQLContext sqlContext = new SQLContext(sc);

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));

StructType schema = DataTypes.createStructType(fields);

JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
    public Row call(Article article) throws Exception {
        return RowFactory.create(article.getKeyStore(), article.getTextArticle());
    }
});

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// WRITE PARQUET FILES
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// READ PARQUET FILES
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");

System.out.println("READ : " + read.count());

IMPORTANT

Don't try this out with a table that has only one column! You will get exceptions when you call the partitionBy method!
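
For reference, here is how the same pattern could look for the customer/date/product layout from the question above. This is only a rough sketch: the DataFrame salesDataFrame and its columns customer, date and product are assumptions for illustration, not part of the code above.

// Sketch only: "salesDataFrame" with the columns "customer", "date" and
// "product" is assumed here; it does not come from the snippet above.
salesDataFrame.write()
    .partitionBy("customer", "date", "product")
    .parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// Wildcards can then be used on every partition level, e.g. all customers
// and all products for the months of 2015:
DataFrame sales2015 = sqlContext.read()
    .option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/")
    .parquet("hdfs://hdfs-master:8020/user/test/parquet/customer=*/date=*.2015/product=*/");

System.out.println("2015 rows: " + sales2015.count());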

D. Müller

1 Answer


So, in Spark you can save and read partitioned data much in the way you are looking for. However, rather than creating the path the way you have it, /customer/a/date/b/product/c, Spark will use the convention /customer=a/date=b/product=c when you save data using:

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/")

When you need to read the data back in, you need to specify the basePath option like this:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/")

and everything following /my/base/path/ will be interpreted as columns by Spark. In the example given here, Spark would add the three columns customer, date and product to the dataframe. Note that you can use wildcards for any of the columns as you like.

As for reading in data for a specific time range, you should be aware that Spark uses predicate push-down, so it will only actually load into memory the data that fits the criteria (as specified by some filter transformation). But if you really want to specify the range explicitly, you can generate a list of path names and pass that to the read function, like this:

val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*", 
                          "/my/path/customer=*/date=02.2015/product=*", 
                          "/my/path/customer=*/date=03.2015/product=*"...,
                          "/my/path/customer=*/date=09.2015/product=*")

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*)
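
If you would rather not build the path list by hand, the predicate push-down mentioned above can do the pruning for you: read from the base path and filter on the partition column. A minimal sketch in the Java API used in the question (the month values are written out explicitly; path and column names are the illustrative ones from above):

// Sketch of the filter variant described above (Spark 1.x Java API):
// read from the base path and filter on the partition column "date".
// Spark prunes the directories that do not match, so only the listed
// months should actually be read from disk.
DataFrame all = sqlContext.read()
    .option("basePath", "/my/base/path/")
    .parquet("/my/base/path/");

DataFrame range = all.filter(all.col("date").isin(
    "01.2015", "02.2015", "03.2015", "04.2015", "05.2015",
    "06.2015", "07.2015", "08.2015", "09.2015"));

System.out.println("Rows in range: " + range.count());

Note that with the MM.yyyy string format a range comparison (geq/leq) would not sort correctly across years, which is why the months are listed explicitly here.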

Anyway, I hope this helps :)

Glennie Helles Sindholt
  • thank you! Looks great, just tried it out - it works without the partitioning... when I use "df.write.partitionBy" I get an Exception, see the edited code above. – D. Müller Jul 05 '16 at 08:10
  • It's working now! Thank you for your answer, @glennie-helles-sindholt! The Exception occurred because I tried to partition a table with only one column (unrealistic test case), so you need at least two columns to get it to work! – D. Müller Jul 05 '16 at 09:07
  • Great solution! – vikrant rana Dec 01 '18 at 06:02
  • I have a job to make some compaction of small parquet files and the _spark_metadata folder was causing problems. With the basePath and some wildcards I was able to solve it! – Playing With BI Sep 06 '19 at 14:55