
If I am understanding the documentation correctly, partitioning a DataFrame and partitioning a Hive or other on-disk table seem to be different things. For on-disk storage, partitioning by, say, date creates a set of partitions, one for each date that occurs in my dataset. This seems useful: if I query records for a given date, every node in my cluster processes only the partitions corresponding to the date I want.

Dataframe.repartition, on the other hand, creates one partition for each date which occurs in my dataset. If I search for records from a specific date, they will all be found in a single partition and thus all processed by a single node.
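
Here is a minimal sketch of the contrast I mean (the column name date, the sample rows, the output path, and the partition count of 4 are all made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("2019-08-20", 1), ("2019-08-21", 2), ("2019-08-21", 3)],
    ["date", "value"])

# On-disk partitioning: one directory per distinct date, e.g. date=2019-08-21/.
df.write.partitionBy("date").parquet("/tmp/events")

# DataFrame repartitioning: hash-partitions the rows on date, so all rows for
# a given date land in the same in-memory partition.
repartitioned = df.repartition(4, "date")
for i, rows in enumerate(repartitioned.rdd.glom().collect()):
    print(i, rows)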

Is this right? If so, what is the use case? What is the way to get the speed advantage of on-disk partitioning schemes in the context of a dataframe?

For what it's worth, I need the advantage after I do an aggregation of the on-disk data, so the on-disk partitioning doesn't necessarily help me, even with lazy evaluation.

Tobias Hagge
  • There are partitions at two levels: Dataframe.repartition(...) and DataframeWriter.partitionBy(...). Which one are you talking about? – Balaji Reddy Aug 22 '19 at 07:58
  • He states partitionBy in the title, but good point. – thebluephantom Aug 22 '19 at 12:00
  • Ugh. I somehow got confused about the interface. What I speak about as "dataframe partitionby" is in fact dataframe.repartition. I'm going to delete the question (edit: change the title and text to accurately reflect that part of the question which is not a duplicate of the link below). Thanks for your help. https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby – Tobias Hagge Aug 23 '19 at 01:05

1 Answer


In your example, Spark will be able to retrieve all the records linked to that date very quickly. That's an improvement. In the following piece of code, you can see that the filter has been categorized as a partition filter.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

inputRdd = sc.parallelize([("fish", 1), ("cats", 2), ("dogs", 3)])
schema = StructType([StructField("animals", StringType(), True),
                     StructField("ID", IntegerType(), True)])
my_dataframe = inputRdd.toDF(schema)
# Write one directory per distinct value of 'animals', then read back and filter.
my_dataframe.write.partitionBy('animals').parquet("home")
sqlContext.read.parquet('home').filter(col('animals') == 'fish').explain()

== Physical Plan ==
*(1) FileScan parquet [ID#35,animals#36] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/home], PartitionCount: 1, PartitionFilters: [isnotnull(animals#36), (animals#36 = fish)], PushedFilters: [], ReadSchema: struct<ID:int>

For a deeper insight, you may want to have a look at this.

I am actually not sure about your other question. You are probably right: in my example, df.rdd.getNumPartitions() gives 1, and with a single partition performance is not great (but at that point you have already read from disk). For the following steps, calling repartition(n) will fix the problem, but it may be quite costly.
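
As a rough sketch (the partition count of 8 is just a placeholder; this reuses the sqlContext and path from the example above), repartitioning right after the read restores parallelism at the cost of a shuffle:

from pyspark.sql.functions import col

# Reading back the single 'fish' partition typically yields very few partitions.
fish_df = sqlContext.read.parquet('home').filter(col('animals') == 'fish')
print(fish_df.rdd.getNumPartitions())   # often 1 here

# A full shuffle that redistributes the rows across 8 partitions.
fish_df = fish_df.repartition(8)
print(fish_df.rdd.getNumPartitions())   # now 8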

Another possible improvement comes from joining two DataFrames that share the same partitioning (with the join keys being the partition columns): you will avoid a lot of shuffling in the join phase.
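
A rough sketch of that idea (df_a and df_b are hypothetical DataFrames that both contain the animals column used as the join key; the second read is just a stand-in for another dataset):

df_a = sqlContext.read.parquet('home')
df_b = sqlContext.read.parquet('home')   # stand-in for a second dataset

# Repartition both sides on the join key so matching keys land in the same
# partitions; the join can then reuse this partitioning instead of adding
# its own shuffle.
left = df_a.repartition('animals')
right = df_b.repartition('animals')
left.join(right, on='animals').explain()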

LizardKing