
I have searched a lot for a succinct answer; hopefully someone can give me some clarity on Databricks partitioning.

Assume I have a DataFrame with columns: Year, Month, Day, SalesAmount, StoreNumber.

I want to store this partitioned by Year and Month, so I can run the following command:

df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true')

This will output data in the format: /mnt/path/Year=2019/Month=05/<file-0000x>.csv

If I then load it back again, like so:

spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1")

Q1: This has not actually 'read' the data yet, right? i.e. I could have billions of records, but until I actually query temp1, nothing is executed against the source?

Q2-A: Subsequently, when querying this data via temp1, my assumption is that if I include the columns that were used for partitioning in the where clause, smart filtering will be applied to the actual files that are read off disk?

%sql
select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL

whereas the following would not do any file filtering, as it has no context of which partitions to look in:

%sql
select * from temp1 where StoreNum = 152 and SalesAmount > 10000 -- SUB-OPTIMAL

Q2-B: Finally, if I stored the files in Parquet format (rather than *.csv), would both of the queries above 'push down' into the actual stored data, but perhaps in different ways?

I.e. the first would still use the partitions, but the second (where StoreNum = 152 and SalesAmount > 10000) would now use Parquet's columnar storage, whereas *.csv does not have that optimisation?

Can anyone please clarify my thinking / understanding around this?

Links to resources would be great too.

m1nkeh

2 Answers


A1: You are right about the evaluation of createOrReplaceTempView. It is evaluated lazily within the current Spark session. In other words, if you terminate the Spark session without ever accessing the view, the data is never read into temp1.
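For example (a minimal sketch; the path and view name just mirror the question), registering the view does not trigger a job, only an action against it does:

// Registering the view is lazy: aside from file listing (and, for CSV without a
// schema, the up-front work needed to resolve the schema, see the other answer),
// nothing is scanned at this point
spark.read.option("header", "true")
     .csv("/mnt/path/")
     .createOrReplaceTempView("temp1")

// Only an action like this triggers a job that actually reads the data
spark.sql("select count(*) from temp1").show()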

A2: Let's examine the case through an example using your code. First let's save your data with:

df.write.mode("overwrite").option("header", "true")
  .partitionBy("Year", "Month")
  .format("csv")
  .save("/tmp/partition_test1/")

And then load it with:

val df1 = spark.read.option("header", "true")
                .csv("/tmp/partition_test1/")
                .where($"Year" === 2019 && $"Month" === 5)

Executing df1.explain will return:

== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, PartitionFilters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

As you can see, the PushedFilters: [] array is empty while PartitionFilters is not, indicating that Spark was able to apply filtering at the partition level and prune the partitions that do not satisfy the where clause.

If we slightly change the Spark query to:

df1.where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11).explain

== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
   +- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, PartitionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

Now both PartitionFilters and PushedFilters take effect, minimizing the Spark workload. As you can see, Spark leverages both filters: it first recognizes the relevant partitions through PartitionFilters and then applies the predicate pushdown.

Exactly the same applies to Parquet files, with the big difference that Parquet will make even more use of the pushed-down predicates by combining them with its internal columnar format (as you already mentioned), which keeps metrics and statistics about the data. So the difference from CSV files is that with CSV the predicate pushdown takes place while Spark is reading/scanning the CSV files, excluding records that do not satisfy the predicate, whereas for Parquet the pushed-down filter is propagated into the Parquet reader itself, resulting in even greater pruning of data.
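As a sketch of the Parquet case (same hypothetical df as above, written to an arbitrary test path), filtering on non-partition columns should now surface those predicates under PushedFilters, where the Parquet reader can use its row-group statistics:

df.write.mode("overwrite")
  .partitionBy("Year", "Month")
  .parquet("/tmp/partition_test_parquet/")

spark.read.parquet("/tmp/partition_test_parquet/")
     .where($"StoreNumber" === 152 && $"SalesAmount" > 10000)
     .explain()

// Expect something like PushedFilters: [IsNotNull(StoreNumber), IsNotNull(SalesAmount),
// EqualTo(StoreNumber,152), GreaterThan(SalesAmount,10000.0)], which Parquet can
// evaluate against per-row-group min/max statistics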

In your case, going through createOrReplaceTempView does not change anything; the execution plan remains the same.
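A quick way to check this (a sketch, reusing the view name from the question) is to register the view over the same path and compare the plans:

spark.read.option("header", "true")
     .csv("/tmp/partition_test1/")
     .createOrReplaceTempView("temp1")

// Produces the same PartitionFilters / PushedFilters as the DataFrame version above
spark.sql("select * from temp1 where Year = 2019 and Month = 5").explain()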

Some useful links:

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkStrategy-FileSourceStrategy.html

abiratsis
  • fantastic answer, thanks for taking the time! Regarding your last point re: createOrReplaceTempView, is there a way in which perf will be improved with parquet vs csv? – m1nkeh May 08 '19 at 15:56
  • Hello @m1nkeh, I don't think so. Both the Spark SQL API and the DataFrame API are interpreted into the same logical/execution plan. The SQL API is for users who prefer to write SQL rather than the DataFrame syntax :) – abiratsis May 08 '19 at 16:00
  • ok, so if it stays in DataFrame syntax is there an optimization? – m1nkeh May 08 '19 at 16:17
  • no, there is no difference between those two. Both end up with the same execution plan – abiratsis May 08 '19 at 16:24
  • sorry, I am still slightly uncertain.. if there is no difference in the execution plans, why did you mention `createOrReplaceTempView` at all? In short, under which circumstances does the execution plan differ? – m1nkeh May 09 '19 at 05:55
  • No problem at all, I mentioned it because you were already using it, and I wanted to clarify that it makes no difference to the execution plan – abiratsis May 09 '19 at 08:23
  • gotcha, cheers. So can I improve the execution plan with a different approach? .. maybe that's one for another question! – m1nkeh May 10 '19 at 08:48
  • Hi there @m1nkeh, in this particular case there is nothing to improve since it's a very simple schema and query :) but in many other cases, yes, you would want to make such improvements – abiratsis May 10 '19 at 09:03

Q1: When you read CSV files without providing a schema, Spark has to infer the schema, so a read of all the files happens immediately (it may filter on the partition columns at this point if it can). If you provide a schema, your assumptions about filtering are correct, as are your assumptions about execution.
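For example, a minimal sketch of supplying an explicit schema so the CSV read stays lazy (the column types here are assumptions):

import org.apache.spark.sql.types._

// Hypothetical schema for the data columns; Year and Month are picked up
// from the directory names as partition columns
val salesSchema = StructType(Seq(
  StructField("Day", IntegerType),
  StructField("SalesAmount", DoubleType),
  StructField("StoreNumber", IntegerType)
))

// With an explicit schema Spark does not need to scan the files up front
spark.read.option("header", "true")
     .schema(salesSchema)
     .csv("/mnt/path/")
     .createOrReplaceTempView("temp1")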

Q2: Not sure I follow. When you say two queries, do you mean the ones above or below? The one below does a write; how do you expect filtering to work on a write?

If you are referring to the first two queries with Parquet, then the first will eliminate most files and be very quick. The second will hopefully skip some data by using the statistics in the files to show that it doesn't need to read them, but it will still touch every file.

You may find this useful: https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

simon_dmorias
  • thanks for the reply, I read the question again and can see it wasn't clear.. I've (hopefully) clarified now by expanding Q2 into Q2-A and Q2-B... but essentially I am simply looking for validation on the way *.csv and *.parquet are stored and 'optimised' under the covers... I located that link yesterday actually, and gave it a good read, I will read it again! – m1nkeh May 08 '19 at 08:58