
Given the following snippet (Spark version: 1.5.2):

rdd.toDF().write.mode(SaveMode.Append).parquet(pathToStorage)

which saves RDD data to flat (unpartitioned) Parquet files, I would like my storage to have a structure like:

country/
    year/
        yearmonth/
            yearmonthday/

The data itself contains a country column and a timestamp column, so I started with this method. However, since I only have a timestamp in my data, I can't partition the whole thing by year/yearmonth/yearmonthday, as those are not columns per se.
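For example, a record from France with a timestamp on 2016-02-14 should end up under a path like (FR is just a sample country value):

FR/2016/201602/20160214/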

This solution also seemed pretty nice, but I can't manage to adapt it to Parquet files.

Any idea?


1 Answer


I figured it out. In order for the path to be dynamically derived from the data, one first has to key each record of the RDD by its country (and assign the result, so the later steps can reuse it):

val keyed = rdd.map(model => (model.country, model))
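For context, this assumes the RDD holds model objects roughly of this shape (a hypothetical sketch; only the country and timestamp fields matter here):

case class Model(country: String, timestamp: java.util.Date) // plus the other payload fields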

Then, all the records have to be scanned to retrieve the distinct countries:

val countries = keyed
    .map { case (country, model) => country }
    .distinct()
    .collect()
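Note that collect() pulls the distinct keys back to the driver; that is only reasonable because the set of distinct countries is small.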

Now that the countries are known, the records can be written out country by country:

countries.foreach { country =>
    // Keep only this country's records, then drop the key
    val countryRDD = keyed
        .filter { case (c, model) => c == country }
        .map(_._2)
    countryRDD.toDF().write.parquet(pathToStorage + "/" + country)
}

Of course, the whole collection has to be traversed twice (once to collect the distinct keys, and once more for each country), but it is the only solution I have found so far.
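Since each pass re-evaluates the keyed RDD, caching it when it is created avoids recomputing the upstream map step every time (a minimal sketch):

val keyed = rdd.map(model => (model.country, model)).cache()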

Regarding the timestamp, you just have to do the same process with a 3-tuple (the third element being a day key such as 20160214); in the end, I went with the current timestamp.
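A sketch of that extension, assuming model.timestamp is a java.util.Date (the field name and type are assumptions):

import java.text.SimpleDateFormat

// Key each record by (country, yyyyMMdd) derived from its timestamp.
val dayFormat = new SimpleDateFormat("yyyyMMdd")
val keyedByDay = rdd.map(model => (model.country, dayFormat.format(model.timestamp), model))
    .cache()

val keys = keyedByDay.map { case (country, day, model) => (country, day) }
    .distinct()
    .collect()

keys.foreach { case (country, day) =>
    val subset = keyedByDay
        .filter { case (c, d, model) => c == country && d == day }
        .map(_._3)
    // day = "20160214" -> year = "2016", yearmonth = "201602"
    subset.toDF().write.parquet(
        pathToStorage + "/" + country + "/" + day.take(4) + "/" + day.take(6) + "/" + day)
}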
