I want to optimize the run-time of a Spark application by subdividing a huge CSV file into different partitions, depending on the characteristics of the data.
For example, I have a column with customer ids (integer, a), a column with dates (month+year, e.g. 01.2015, b), and a column with product ids (integer, c) (plus more columns with product-specific data, which are not needed for the partitioning).
I want to build a folder structure like /customer/a/date/b/product/c. When a user wants to know information about products from customer X, sold in January 2016, he could load and analyse the files saved in /customer/X/date/01.2016/*.
Is there a possibility to load such folder structures via wildcards? It should also be possible to load all customers or products of a specific time range, e.g. 01.2015 to 09.2015. Is it possible to use wildcards like /customer/*/date/*.2015/product/c? Or how else could a problem like this be solved?
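For illustration, this is roughly the kind of wildcard load I have in mind (just a sketch against the folder structure above, with sc being the JavaSparkContext and the paths only examples):

// Sketch only: load the raw CSV lines of product c for all customers and all months of 2015
JavaRDD<String> linesOf2015 = sc.textFile("hdfs://hdfs-master:8020/customer/*/date/*.2015/product/c");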
I want to partition the data once, and later load only the specific files needed for an analysis, to reduce the run-time of these jobs (disregarding the additional work for the partitioning).
SOLUTION: Working with Parquet files
I changed my Spark application to save the data to Parquet files; now everything works fine and I can pre-select the data simply by specifying the folder structure. Here is my code snippet:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Article> goodRdd = ...
SQLContext sqlContext = new SQLContext(sc);

// Schema: the partition column ("keyStore") plus the payload column
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));
StructType schema = DataTypes.createStructType(fields);

// Convert the RDD of my Article objects into Rows that match the schema
JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
    public Row call(Article article) throws Exception {
        return RowFactory.create(article.getKeyStore(), article.getTextArticle());
    }
});
DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// WRITE PARQUET FILES, partitioned by "keyStore" -> one sub-folder keyStore=<value> per key
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// READ PARQUET FILES of a single partition; the "basePath" option keeps the partition column in the result
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");

System.out.println("READ : " + read.count());
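For completeness, here is a sketch of how the same approach would map to the folder structure from the original question (untested; "customer", "date", "product" are hypothetical column names and fullDataFrame a hypothetical DataFrame holding the complete CSV data):

// Hypothetical sketch: partition by all three columns, giving a layout like
// .../customer=X/date=01.2016/product=Y/
fullDataFrame.write()
        .partitionBy("customer", "date", "product")
        .parquet("hdfs://hdfs-master:8020/user/test/articles/");

// Load only what customer X sold in January 2016 - Spark reads just that sub-tree
String basePath = "hdfs://hdfs-master:8020/user/test/articles/";
DataFrame january = sqlContext.read()
        .option("basePath", basePath)
        .parquet(basePath + "customer=X/date=01.2016/");

// The paths are resolved as Hadoop glob patterns, so a range like 01.2015 - 09.2015
// should also be loadable via a wildcard path (I have not verified this case):
DataFrame range = sqlContext.read()
        .option("basePath", basePath)
        .parquet(basePath + "customer=*/date=0[1-9].2015/");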
IMPORTANT
Don't try this out with a table that has only one column! You will get exceptions when you call the partitionBy method!
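In other words, something like this (hypothetical one-column example) will not work, because after partitioning there would be no data columns left to write into the Parquet files:

// Hypothetical failing example: "keyStore" is the only column of the DataFrame,
// so partitionBy() consumes every column and Spark throws an exception on write
singleColumnFrame.write()
        .partitionBy("keyStore")
        .parquet("hdfs://hdfs-master:8020/user/test/parquet/");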