
I have code whose goal is to take the 10M oldest records out of 1.5B records.

I tried to do it with orderBy and it never finished; then I tried to do it with a window function and it finished after 15 minutes.

My understanding is that with orderBy every executor takes its part of the data, sorts it, and passes its top 10M rows to a final executor. Because 10M > partition size, effectively all the data ends up being passed to that final executor, and that is why it takes so long to finish.

What I couldn't understand is how the window solution works. What happens in the shuffle before the single executor starts to run? How does that shuffle help the sort on the single executor finish faster? I would appreciate any help with understanding how the window function works in the background in this case.

This is the code of the window function:

    from pyspark.sql import Window
    from pyspark.sql import functions as f

    df = sess.sql("select * from table")
    last_update_window = Window.orderBy(f.col("last_update").asc())
    df = df.withColumn('row_number', f.row_number().over(last_update_window))
    df = df.where(f.col('row_number') <= 1000000000)

This is the code for orderBy:

    df = sess.sql("select * from table")
    df = df.orderBy(f.col('last_update').asc()).limit(100000000)

Here is a picture of the plan when executing the window function:

[Image: Spark UI screenshot of the physical plan for the window-function query]

dasilva555
  • Might be copy error, but 1 000 000 000 != 10 000 000 – ScootCork Apr 26 '22 at 13:01
  • I appreciate you asking this as I didn't know that windows performed better at any task. Typically they are a tool used for specific questions, and typically they are more computationally expensive than avoiding them if you can. – Matt Andruff Apr 26 '22 at 20:05
  • May be of interest to you, not specifically on topic, but it shows another area where windows outperform groupby+join https://www.youtube.com/watch?v=9EIzhRKpiM8&t=700s&ab_channel=Databricks – Matt Andruff May 25 '22 at 13:01

1 Answer


Run explain on both queries and it will show you the different paths they take.
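
For example, something along these lines prints both physical plans (a minimal sketch; `sess`, the table, and the `last_update` column come from the question's code, and I've used the 10M the question intends rather than the literal constants posted):

    from pyspark.sql import Window
    from pyspark.sql import functions as f

    # Window / row_number variant
    win_df = (sess.sql("select * from table")
                  .withColumn('row_number',
                              f.row_number().over(Window.orderBy(f.col('last_update').asc())))
                  .where(f.col('row_number') <= 10000000))

    # orderBy / limit variant
    lim_df = (sess.sql("select * from table")
                  .orderBy(f.col('last_update').asc())
                  .limit(10000000))

    # Print the physical plan Spark picks for each query
    win_df.explain()
    lim_df.explain()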

Window sends all the data to one node. In this case you also use a where clause, which means a shuffle is used to complete the filtering. This appears to be faster than the implementation used by limit when a large number of items is returned, and it's likely faster because of the volume of data: extra shuffles hurt on small data sets, but on large data sets, done correctly, they spread the load and reduce the total time.

    == Physical Plan ==
    *(2) Filter (isnotnull(row_number#66) && (row_number#66 <= 1000000000))
    +- Window [row_number() windowspecdefinition(date#61 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#66], [date#61 ASC NULLS FIRST]
       +- *(1) Sort [date#61 ASC NULLS FIRST], false, 0
          +- Exchange SinglePartition
             +- Scan hive default.table_persons [people#59, type#60, date#61], HiveTableRelation `default`.`table_persons`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [people#59, type#60, date#61]

"Order by" uses a different implementation and does not appear to perform as well under with 'large takes'. It doesn't use as many shuffles but this seems that it doesn't accomplish the work as fast when there is a large number of items to return.

    == Physical Plan ==
    TakeOrderedAndProject(limit=100000000, orderBy=[date#61 ASC NULLS FIRST], output=[people#59,type#60,date#61])
    +- Scan hive default.table_persons [people#59, type#60, date#61], HiveTableRelation `default`.`table_persons`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [people#59, type#60, date#61]

As an aside, for large skewed data sets it is common to add 2 extra shuffles, and this does in fact take less time than not having the extra shuffles. (But again, it would increase the time taken on small data sets.)
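
Purely as an illustration of that aside (a hypothetical sketch with a made-up partition count, not from the question's code), such an extra shuffle is typically just an explicit repartition before the expensive step:

    # Hypothetical sketch: force an extra round-robin shuffle so skewed data is
    # spread evenly across partitions before the wide operation that follows.
    skewed_df = sess.sql("select * from table")
    evened_df = skewed_df.repartition(2000)   # shows up as an extra Exchange in the plan
    # ...continue with the expensive sort / aggregation on evened_df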

So what does TakeOrderedAndProject actually do? It uses a take and, for large datasets, sorts the data on disk instead of in memory.

With your window there is a shuffle, which uses ranges to sort the data. There are inferences that the further sorting is done in memory, which gives you the performance tradeoff. (Links to those inferences below.)

And this is where I think you are getting the payoff. (I'd be interested to know if adding a limit to your existing window speeds things up.)
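
If anyone wants to try that experiment, it's just the question's window query with a limit appended (a sketch only, again using the intended 10M):

    # Same window query as in the question, plus an explicit limit on top --
    # the open question is whether this changes the plan or the runtime.
    last_update_window = Window.orderBy(f.col('last_update').asc())
    df = (sess.sql("select * from table")
              .withColumn('row_number', f.row_number().over(last_update_window))
              .where(f.col('row_number') <= 10000000)
              .limit(10000000))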

From snippets on Reddit and from digging through issues and the code, the inference is that the sort is done in memory and spilled to disk only if required.

The other piece that I feel provides a lot of the performance boost is that you use a where clause. Take pulls back as many items from each partition as you have in the limit clause (see the implementation above), only to throw most of them out; where doesn't pull back any item that doesn't match the filter condition. This movement of data [using limit] is likely where you are getting the real performance degradation with limit.
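
Conceptually (a rough Python sketch of the idea only, not Spark's actual implementation), the two strategies compare like this:

    # Rough conceptual sketch -- NOT Spark's real code.
    # limit-style "take": every partition hands back up to n rows, and then the
    # driver takes the top n again from everything that came back.
    def take_ordered(partitions, n, key):
        candidates = []
        for part in partitions:
            candidates.extend(sorted(part, key=key)[:n])    # n rows per partition
        return sorted(candidates, key=key)[:n]              # top n of n * num_partitions rows

    # window + where: sort once, rank the rows, and a filter keeps only the rows
    # whose rank passes the predicate -- nothing extra is collected first.
    def window_filter(rows, n, key):
        ranked = enumerate(sorted(rows, key=key), start=1)  # row_number over the sort order
        return [row for rank, row in ranked if rank <= n]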

Matt Andruff
  • Order by sends data to the reducer according to the limit we define. If I do the same logic with limit 100 it works really fast with orderBy. Another interesting thing is that the performance of window with 100 and with 10M is the same, so I guess your assumption about stopping after reaching the 10M is wrong – dasilva555 Apr 26 '22 at 18:46
  • I have corrected my answer. It does filter, but you are correct that it does not stop after reaching 10M. – Matt Andruff Apr 26 '22 at 19:58
  • Thanks for the answer. I still don't understand why window works better. They both ultimately run on a single executor, and the difference is that window does a shuffle before. My question is what logic behind the shuffle makes the single executor's job easier. Does the shuffle split the data into ordered ranges? I really want to understand the logic behind the scenes. I also noticed that with window the shuffle read on the single executor keeps increasing all the time, while with orderBy it stays stuck on the same number. – dasilva555 Apr 27 '22 at 02:19
  • Added more information to my answer. I feel it's now a complete answer. – Matt Andruff Apr 28 '22 at 12:44
  • Thanks! So the difference between orderBy and Window is that orderBy sorts on disk and window sorts in memory? Do you have a reference for this statement? – dasilva555 Apr 28 '22 at 17:31
  • That is one difference, the other and more important one is using a filter instead of a take. – Matt Andruff Apr 28 '22 at 17:33
  • If you read through the links you'll see that limit uses disk for sorting on large data sets (I link to this above), and you can see from the discussion above that windows do it in memory (link provided above). That's going to be a plus, but not as big as avoiding TakeOrderedAndProject, because it will move a lot of data and a filter will not. – Matt Andruff Apr 28 '22 at 17:36
  • Said another way: limit's performance gets worse the more items you ask it to limit to (as it does a 'take' twice, once per partition and once after all partitions have returned data). A where clause filters and only returns relevant information, creating a big performance gain over the two takes. – Matt Andruff Apr 28 '22 at 17:41
  • If we take the 10M example, orderBy will take the top 10M from each partition, then sort all the outputs and again take the top 10M, while window will sort all the data and take the top 10M. Why is there such a big difference between these 2 strategies? In addition, with window we read all the data to a single executor in chunks and sort it. Why is it done in chunks? I see in the Spark UI that every couple of seconds the shuffle read increases for the single executor. What's the logic behind it? Is it because we want to be able to compute everything in memory? – dasilva555 Apr 28 '22 at 17:58
  • Do we read the chunks in an ordered way or do we take the chunks randomly? – dasilva555 Apr 28 '22 at 17:59
  • It's actually worse than that. orderBy with limit will order all the data, then take the top 10M from each partition, then return all that data so that it can take 10M from across all the partitions. This is a lot of data movement for not a lot of value, and this is where the performance hit is. – Matt Andruff Apr 28 '22 at 18:04
  • Window does a range filter, as we said, as part of its sort. That is why the data gets done in chunks: it's reading [range-filtered data] and sorting it in memory. – Matt Andruff Apr 28 '22 at 18:05
  • Got it for the orderBy. But for the window, if you say that we read range-filtered data, why do we get the same performance for 100 and 10M? I guess it's filtered only after the sorting operation completes – dasilva555 Apr 28 '22 at 18:13
  • That is correct, the filter comes after (you can see this in the explain above). In both cases it's the amount of data that has to move that makes one faster than the other. Limit 100 moves a lot less data after sorting; the window still moves all the data to one node, so limit is moving less data. When the data gets large, limit is moving more data and it's slower. – Matt Andruff Apr 28 '22 at 18:19
  • Thanks for the answer. I understand it much better now. Last question: why is there no sorting on multiple executors with window? Why do we read it in chunks and sort everything in the reducer, instead of sorting on multiple executors and then reading the output according to the partition range outputs? – dasilva555 Apr 28 '22 at 18:22
  • And you wrote "When the data gets large, limit is moving more data", but window moves all the data to the single executor anyway. So I guess that in this case the only difference is the in-memory sorting + one extra take on the single executor instead of a filter. Isn't it? – dasilva555 Apr 28 '22 at 18:24
  • It's not 1 extra take. If you read through the implementation above, it's taking 100000000 rows from *each* partition, then taking 100000000 from all the results returned from the partitions. If you have 'n' partitions then you are doing 'n'+1 takes of 100000000 rows. – Matt Andruff Apr 28 '22 at 18:32
  • If we mark the whole data size as x, then min(10M, partition size) * n = x. With the window function we pass all x of the data to the single executor, so what's the difference? Is it less efficient to do the takes in parts? When we pass the data to the single executor with window, it's equivalent to taking the x data, which is equivalent to taking n * min(10M, partition size). Isn't it? – dasilva555 Apr 28 '22 at 19:26
  • I agree that both approaches touch all the data (they both sort the data). take is used multiple times in "orderBy/limit" and is not as efficient as using a filter (as window does). I say this because of the amount of data that is processed by multiple takes (in addition to the sort) vs using a where clause that reduces the amount of data that must be returned. – Matt Andruff Apr 28 '22 at 19:53
  • Literally, using TakeOrderedAndProject (orderBy/limit) creates way more objects than are created with window – Matt Andruff Apr 28 '22 at 19:55
  • Here's the link to take. It's actually creating objects for all rows... doing this multiple times will be expensive on performance. (Yes, under the hood RDDs are used for datasets/dataframes) https://github.com/apache/spark/blob/ae9b80410a12c67bad19aa22b5e0de956fef9d17/core/src/main/scala/org/apache/spark/rdd/RDD.scala – Matt Andruff Apr 28 '22 at 20:05