
I have a simple join where I limit one of the sides. In the explain plan I see that before the limit is executed there is an ExchangeSingle operation; indeed, at this stage there is only one task running in the cluster.

This of course affects performance dramatically (removing the limit removes the single-task bottleneck but lengthens the join, since it then works on a much larger dataset).

Is limit truly not parallelizable? And if so, is there a workaround for this?

I am using Spark on a Databricks cluster.
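
For illustration, here is a minimal sketch (hypothetical table and column names) of the kind of query I mean; it is in the plan for a query of this shape that the single-partition exchange shows up:

```scala
// Minimal sketch of the query shape (hypothetical table/column names).
// `spark` is the SparkSession (predefined in Databricks notebooks).
val dims  = spark.table("dim_table").limit(1000)   // limit applied to one side
val facts = spark.table("fact_table")

val joined = facts.join(dims, Seq("key"))
joined.explain()
// In the plan for a query like this, the limit is preceded by a shuffle of all
// rows into a single partition (the ExchangeSingle step mentioned above),
// which runs as a single task.
```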

Edit: regarding the possible duplicate: the answer there does not explain why everything is shuffled into a single partition. Also, I asked for advice on how to work around this issue.

Vitaliy
  • Possible duplicate of [Towards limiting the big RDD](https://stackoverflow.com/questions/38710018/towards-limiting-the-big-rdd) – Alper t. Turker Jul 22 '18 at 14:04
  • _why everything is shuffled into_ - because this is the only reasonable implementation which ensures a single pass over the data and exact results. _advice to work around this issue_ - consider relaxing the requirements and using sample, not limit? – Alper t. Turker Jul 22 '18 at 19:31
  • @user8371915, at first I thought that Spark knows the exact size of each partition in advance, but I think I got it wrong. Suppose there is a filter followed by a limit. Spark does not know the size of the partition in advance and has to evaluate it in order to take elements from there. I guess Spark *could* speculate and evaluate more than one partition concurrently, but the designers chose not to go into it. Did I understand correctly? – Vitaliy Jul 22 '18 at 19:56
  • @user8371915, I switched the limit to sample and it worked like a charm. If you post it as an answer I'll accept it. If you can throw in an explanation of the implementation details as a bonus for the community, all the better. Thanks. – Vitaliy Jul 22 '18 at 19:58
  • In general Spark doesn't know the size of a partition. It might in some cases know the input size (from input splits) or the number of records (if the cost-based optimizer or another form of computing statistics is used - this requires an additional scan), but that is not the case in general. You could write some form of optimizer rule, but it is hard to generalize. – Alper t. Turker Jul 22 '18 at 20:27
  • @user8371915, thanks again. Reminding you to post this as an answer if you wish. I will accept it. – Vitaliy Jul 23 '18 at 11:39
  • I don't think there is a need for that, and I am sure there is a better duplicate target out there - just my search skills are not so good today. I am glad my comment helped you, and I don't think the question really deserves the downvote, so don't mind me and delete it, or, if you prefer, self-answer :) – Alper t. Turker Jul 23 '18 at 11:42

2 Answers


Following the advice given by user8371915 in the comments, I used sample instead of limit, and it uncorked the bottleneck.

A small but important detail: I still had to put a predictable size constraint on the result set after sample, but sample takes a fraction, so the size of the result set can vary greatly depending on the size of the input.

Fortunately for me, running the same query with count() was very fast, so I first counted the size of the entire result set and used that count to compute the fraction I later passed to sample.
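
Roughly, the approach looked like the sketch below (table names, filter, and target size are hypothetical; `spark` is the SparkSession):

```scala
import org.apache.spark.sql.functions.col

// Hypothetical query whose result I want to cap at roughly `targetRows` rows.
val targetRows = 100000L
val filtered   = spark.table("events").where(col("event_type") === "click")

// Counting first was fast in this case; the count gives the denominator
// for the sampling fraction.
val total    = filtered.count()
val fraction = math.min(1.0, targetRows.toDouble / total)

// sample keeps the existing partitioning, so the subsequent join stays parallel.
// Note the result size is only *approximately* targetRows - sampling is probabilistic.
val sampled = filtered.sample(withReplacement = false, fraction = fraction, seed = 42L)

val joined = spark.table("users").join(sampled, Seq("user_id"))
```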

Vitaliy

Workaround for regaining parallelism after a limit: `.repartition(200)`

This redistributes the data again so that subsequent operations can run in parallel.
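
For example (a sketch with hypothetical DataFrames `df` and `other`):

```scala
// The limit itself still runs as a single task, but everything after the
// repartition is spread across 200 partitions again.
val limited = df.limit(100000).repartition(200)
val joined  = other.join(limited, Seq("key"))
```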