
I am struggling with the step where I want to write each RDD partition to a separate Parquet file in its own directory. An example would be:

    <root>
        <entity=entity1>
            <year=2015>
                <week=45>
                    data_file.parquet

The advantage of this format is that I can use the partition keys directly in Spark SQL as columns, and I will not have to repeat this data in the actual files. It would be a good way to get to a specific partition without storing separate partitioning metadata someplace else.
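
For instance, on the read side I expect partition discovery to work something like this (a sketch using the Spark 1.x SQLContext API; `sc` is an existing SparkContext and the output path is just an example):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Partition discovery turns the entity=/year=/week= directory names into
    // columns, so a filter on them prunes straight to a single directory.
    val events = sqlContext.read.parquet("hdfs://localhost:9000/parquet_data")
    events.filter($"entity" === "entity1" && $"year" === 2015 && $"week" === 45).show()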

As a preceding step I have all the data loaded from a large number of gzip files and partitioned based on the above key.
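
Concretely, the keying step looks something like the sketch below, where `PartKey` and `keyed` are placeholders for my actual types; giving every distinct key its own partition means each partition later maps to exactly one output directory:

    import org.apache.spark.Partitioner

    // Placeholder key type matching the directory layout.
    case class PartKey(entity: String, year: Int, week: Int)

    // Assigns every distinct key its own partition.
    class KeyPartitioner(keys: Array[PartKey]) extends Partitioner {
      private val index = keys.zipWithIndex.toMap
      override def numPartitions: Int = keys.length
      override def getPartition(key: Any): Int = index(key.asInstanceOf[PartKey])
    }

    // keyed: RDD[(PartKey, String)], parsed from the gzip input (parsing omitted).
    val partitioned = keyed.partitionBy(new KeyPartitioner(keyed.keys.distinct().collect()))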

A possible way would be to get each partition as a separate RDD and then write it, though I couldn't find any good way of doing that.

Any help will be appreciated. By the way, I am new to this stack.

Rajeev Prasad

2 Answers


I don't think the accepted answer appropriately answers the question.

Try something like this:

    df.write.partitionBy("entity", "year", "week").parquet("/path/to/output")

And you will get the partitioned directory structure.
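
For instance, assuming the parsed records carry the partition keys as fields (the `Event` case class, the paths, and the tab-separated format here are assumptions for illustration), an end-to-end sketch might look like:

    import org.apache.spark.sql.SQLContext

    case class Event(entity: String, year: Int, week: Int, payload: String)

    // Hypothetical parser: one tab-separated text line => one Event.
    def parseLine(line: String): Event = {
      val Array(entity, year, week, payload) = line.split('\t')
      Event(entity, year.toInt, week.toInt, payload)
    }

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.textFile("hdfs://localhost:9000/raw/*.gz")  // gzip is decompressed transparently
      .map(parseLine)
      .toDF()

    // Produces <root>/entity=.../year=.../week=.../part-*.parquet; the partition
    // columns live in the directory names, not inside the files.
    df.write.partitionBy("entity", "year", "week").parquet("hdfs://localhost:9000/parquet_data")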

BAR
  • Yes, this answer is simpler and future-proof. – Sim Dec 20 '15 at 08:47
  • Even *this* approach ends up with single-threaded/serial writing to the parquet output instead of writing in parallel per partition. At least that's the result in `standalone` mode; I am still testing to see if that happens in a YARN cluster. – WestCoastProjects Jun 27 '18 at 06:16

I think it's possible by calling `foreachPartition(f: Iterator[T] => Unit)` on the RDD you want to save.

In the function you provide to `foreachPartition`:

  1. Prepare the path, e.g. hdfs://localhost:9000/parquet_data/year=x/week=y
  2. Construct a ParquetWriter for that path
  3. Exhaust the Iterator by writing each record with the writer
  4. Clean up by closing the writer (see the sketch below)
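
A minimal sketch of that recipe using the Avro bindings for Parquet (AvroParquetWriter); the `Event` case class, the schema, and `rdd` are placeholder assumptions, and it relies on the RDD having been partitioned so that every partition holds exactly one (entity, year, week) key:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter

    // Placeholder record type; the partition keys live in the path, not the file.
    case class Event(entity: String, year: Int, week: Int, payload: String)

    val schemaJson =
      """{"type":"record","name":"Event","fields":[{"name":"payload","type":"string"}]}"""

    rdd.foreachPartition { (it: Iterator[Event]) =>
      if (it.hasNext) {
        val first = it.next()
        // 1. Prepare the path from the partition's key.
        val path = new Path("hdfs://localhost:9000/parquet_data/" +
          s"entity=${first.entity}/year=${first.year}/week=${first.week}/data_file.parquet")
        // 2. Construct a ParquetWriter (Schema is not serializable, so parse it here).
        val schema = new Schema.Parser().parse(schemaJson)
        val writer = AvroParquetWriter.builder[GenericRecord](path).withSchema(schema).build()
        def toRecord(e: Event): GenericRecord = {
          val rec = new GenericData.Record(schema)
          rec.put("payload", e.payload)
          rec
        }
        try {
          // 3. Exhaust the iterator, writing every record.
          writer.write(toRecord(first))
          it.foreach(e => writer.write(toRecord(e)))
        } finally {
          writer.close() // 4. Clean up.
        }
      }
    }

Since the partition columns are encoded in the directory names, the Avro schema only needs to cover the payload fields.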
yjshen
  • Thanks for the answer. I conceptually understand it and tried to implement it, but could not find a way to construct the ParquetWriter. Any code sample in Scala would be extremely helpful. – Rajeev Prasad May 21 '15 at 04:58
  • @RajeevPrasad, I edited the answer with a parquet writer example; please check the related source code of Spark to see how it works :) – yjshen May 21 '15 at 05:22
  • Thanks for your help. This works fine. Do you know what the performance of this implementation would be compared to writing directly using saveAsParquetFile? – Rajeev Prasad May 21 '15 at 07:09
  • By the way, I did some small perf runs and it seems very comparable, actually a little better. My guess is saveAsParquetFile must implement it the same way under the covers. – Rajeev Prasad May 21 '15 at 22:39