7

I am trying to do something very simple and I'm having some very stupid struggles. I think it must have to do with a fundamental misunderstanding of what spark is doing. I would greatly appreciate any help or explanation.

I have a very large (~3 TB, ~300MM rows, 25k partitions) table, saved as parquet in s3, and I would like to give someone a tiny sample of it as a single parquet file. Unfortunately, this is taking forever to finish and I don't understand why. I have tried the following:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

and then when that didn't work I tried this, which I thought should be the same, but I wasn't sure. (I added the `print` calls in an effort to debug.)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

When I watch the Yarn UI, both print statements and the write are using 25k mappers. The count took 3 mins, the show took 25 mins, and the write took ~40 mins, although it finally did write the single file table I was looking for.

It seems to me like the first line should take the top 500 rows and coalesce them to a single partition, and then the other lines should happen extremely fast (on a single mapper/reducer). Can anyone see what I'm doing wrong here? I've been told maybe I should use sample instead of limit but as I understand it limit should be much faster. Is that right?

Thanks in advance for any thoughts!

seth127
  • Could you check in the plan whether "LIMIT 500" is pushed down to the table? – Karthick Sep 06 '18 at 19:46
  • @Karthick Both `spark.sql` and dataframes (e.g. `.limit(500)`) are optimised by the same engine, so shouldn’t be an issue? – dijksterhuis Sep 07 '18 at 07:35
  • I am not concerned about spark.sql and dataframes being optimized by the same engine; there is no reason the read should take so long to get 500 records. A quick look at the plan in the Spark UI while it was running would give some direction (whether the limit is applied and only 500 records flow out of the table, and what the numPartitions/parallelism parameter is on that step, where I'd expect it to be '1') is what my thought was. – Karthick Sep 07 '18 at 08:05

2 Answers

10

I’ll approach the print functions issue first, as it’s something fundamental to understanding spark. Then limit vs sample. Then repartition vs coalesce.

The reason the print functions take so long is that coalesce is a lazy transformation. Most transformations in spark are lazy and do not get evaluated until an action gets called.

Actions are things that do work and (mostly) don't return a new dataframe as a result, like count and show. They return a number, and some data, whereas coalesce returns a dataframe with 1 partition (sort of, see below).

What is happening is that you are rerunning the sql query and the coalesce call each time you call an action on the tiny dataframe. That’s why each call uses 25k mappers.

To save time, add the .cache() method to the first line (for your print code anyway).

Then the data frame transformations are actually executed on your first line and the result persisted in memory on your spark nodes.

This won’t have any impact on the initial query time for the first line, but at least you’re not running that query 2 more times because the result has been cached, and the actions can then use that cached result.

To remove it from memory, use the .unpersist() method.

Now for the actual query you’re trying to do...

It really depends on how your data is partitioned. As in, is it partitioned on specific fields etc...

You mentioned it in your question, but sample might be the right way to go.

Why is this?

limit has to search for the first 500 rows. Unless your data is partitioned by row number (or some sort of incrementing id), those first 500 rows could be stored in any of the 25k partitions.

So spark has to go search through all of them until it finds all the correct values. Not only that, it has to perform an additional step of sorting the data to have the correct order.

sample just grabs 500 random values. Much easier to do as there’s no order/sorting of the data involved and it doesn’t have to search through specific partitions for specific rows.

While limit can be faster, it also has its, erm, limits. I usually only use it for very small subsets like 10/20 rows.

Now for partitioning....

The problem I think with coalesce is it virtually changes the partitioning. Now I’m not sure about this, so pinch of salt.

According to the pyspark docs:

this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

So your 500 rows will actually still sit across your 25k physical partitions that are considered by spark to be 1 virtual partition.

Causing a shuffle (usually bad) and persisting in spark memory with .repartition(1).cache() is possibly a good idea here. Because instead of having the 25k mappers looking at the physical partitions when you write, it should only result in 1 mapper looking at what is in spark memory. Then write becomes easy. You’re also dealing with a small subset, so any shuffling should (hopefully) be manageable.

Obviously this is usually bad practice, and doesn’t change the fact spark will probably want to run 25k mappers when it performs the original sql query. Hopefully sample takes care of that.

edit to clarify shuffling, repartition and coalesce

You have 2 datasets in 16 partitions on a 4 node cluster. You want to join them and write as a new dataset in 16 partitions.

Row 1 for data 1 might be on node 1, and row 1 for data 2 on node 4.

In order to join these rows together, spark has to physically move one, or both of them, then write to a new partition.

That’s a shuffle, physically moving data around a cluster.

It doesn’t matter that everything is partitioned by 16, what matters is where the data is sitting on the cluster.

data.repartition(4) will physically move the data so that each node’s 4 partitions are shuffled into 1 partition per node.

Spark might move all 4 partitions from node 1 over to the 3 other nodes, in a new single partition on those nodes, and vice versa.

I wouldn’t think it’d do this, but it’s an extreme case that demonstrates the point.

A coalesce(4) call though, doesn’t move the data, it’s much more clever. Instead, it recognises “I already have 4 partitions per node & 4 nodes in total... I’m just going to call all 4 of those partitions per node a single partition and then I’ll have 4 total partitions!”

So it doesn’t need to move any data because it just combines existing partitions into a joined partition.

dijksterhuis
  • Thank you, this is very helpful! To your first point: my understanding of the lazy eval was that it didn't actually execute until something like `tiny.count()` is called (as you said) but then I don't understand why it has to eval _again_ for the subsequent lines. I guess because I didn't cache the df? This leads to your second point: I've never heard this about "virtual" partitioning. Can you point me to any docs/explanation about that. I figured, once the `tiny.count()` happened, then all 500 rows were on the same partition. – seth127 Sep 07 '18 at 15:27
  • Just to get on a soapbox for a short minute: I've read literally dozens of explanations of "repartition vs. coalesce" and none of them seem to adequately explain it. The "virtual" partitioning is a perfect example (i.e. that I've never heard of this concept). Maybe "all repartition does is call coalesce with the shuffle parameter set to `true`" (from https://stackoverflow.com/questions/31610971) is all I need to know and I just need to better understand the implications of shuffling. I'm not sure. Any good explanation/docs you can point me to would be greatly appreciated. – seth127 Sep 07 '18 at 15:33
  • @seth127 also just found this.. might be useful https://www.edureka.co/blog/demystifying-partitioning-in-spark – dijksterhuis Sep 12 '18 at 10:46
4

Try this; in my experience repartition works better for this kind of problem:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

Even better, if you are only interested in the parquet file you don't need to save it as a table:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")
Alessandro
  • Hmm, I'm not sure what you mean by "prefers" although I did find another post saying "I think the current documentation covers this pretty well: github.com/apache/spark/blob/… Keep in mind that all repartition does is call coalesce with the shuffle parameter set to true. Let me know if that helps." So that's interesting, but I'm not clear on why that would make it faster. (the other post: https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce) – seth127 Sep 06 '18 at 15:01
  • You are right I meant I empirically found it to be working better! – Alessandro Sep 06 '18 at 15:11