26

I am partitioning a DataFrame as follows:

df.write.partitionBy("type", "category").parquet(config.outpath)

The code gives the expected results (i.e. data partitioned by type & category). However, the "type" and "category" columns are removed from the data / schema. Is there a way to prevent this behaviour?

Michael
  • 531
  • 1
  • 6
  • 11
  • Isn't that a point? All required data is still encoded in the directory structure so there is no data loss. If you want a some-values-per-file you could try `df.repartition("type", "category").write(...)` but you won't get nice structure. – zero323 Mar 22 '16 at 21:23
  • @zero323: yes, I agree there is no data loss. However, recovering the columns used for partitioning is non-trivial for some use cases. For example, if I want to load the data in pig, how would I recover the type and category columns? – Michael Mar 24 '16 at 01:21
  • Haven't used Pig in a while. Doesn't `ParquetLoader` understand the structure out of the box? – zero323 Mar 24 '16 at 01:35
  • @zero323: super long delay to your question... No, pig doesn't incorporate the directory structure with the parquet schema. – Michael Dec 06 '16 at 03:47

3 Answers3

22

I can think of one workaround, which is rather lame, but works.

import spark.implicits._

val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category")
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath)

I'm answering this question in hopes that someone would have a better answer or explanation than what I have (if OP has found a better solution), though, since I have the same question.

Ivan Gozali
  • 2,089
  • 1
  • 27
  • 25
  • 4
    Actually doesn't look that lame to me. Seems like the best approach given the behaviour of `partitionBy()`. – Michael Dec 06 '16 at 03:51
9

I'd like to add a bit more context here and provide PySpark code instead of Scala for those who need it. You need to be careful how you read in the partitioned dataframe if you want to keep the partitioned variables (the details matter). Let's start by writting a partitioned dataframe like this:

df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")

To read the whole dataframe back in WITH the partitioning variables included...

path = "partitioned_parquet/"
parquet = spark.read.parquet(path)
parquet.show()

Result:

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows

Note that if you include an * at end of your path name, the partitioning variables will be dropped.

path = "partitioned_parquet/*"
parquet = spark.read.parquet(path)
parquet.show(5)

Result:

+-----+
|Value|
+-----+
|   71|
|   77|
|   83|
|   54|
|  100|
+-----+
only showing top 5 rows

Now, if you want to read in only portions of the partitioned dataframe, you need to use this method in order to keep your partitioning variables (in this case "Season").

path = "partitioned_parquet/"
dataframe = spark.read.option("basePath", path).parquet(path+'Season=2010/',\
                                                                path+'Season=2011/', \
                                                                path+'Season=2012/')
dataframe.show(5)

Result:

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows

Hope that helps folks!

Statmonger
  • 415
  • 6
  • 14
8

In general, Ivan's answer is a fine cludge. BUT...

If you are strictly reading and writing in spark, you can just use the basePath option when reading your data.

https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths.

Example:

     val dataset = spark
      .read
      .format("parquet")
      .option("basePath", hdfsInputBasePath)
      .load(hdfsInputPath)
Robert Beatty
  • 508
  • 5
  • 11