21

When decreasing the number of partitions one can use coalesce, which is great because it doesn't cause a shuffle and seems to work instantly (doesn't require an additional job stage).
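
For illustration (a toy sketch, assuming `sc` is the `SparkContext`):

val rdd = sc.parallelize(1 to 1000000, 100)
// coalesce is a narrow transformation: each output partition is built from
// whole input partitions on the same node, so nothing crosses the network
val fewer = rdd.coalesce(10)
println(fewer.partitions.length) // 10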

I would like to do the opposite sometimes, but `repartition` induces a shuffle. I think a few months ago I actually got this working by using `CoalescedRDD` with `balanceSlack = 1.0` - so what would happen is it would split a partition so that the resulting partitions' locations were all on the same node (so very little net IO).
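
Whereas the obvious way of going back up shuffles everything (continuing the toy sketch above):

val more = rdd.repartition(200) // same as coalesce(200, shuffle = true): a full shuffle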

This kind of functionality is automatic in Hadoop: one just tweaks the split size. It doesn't seem to work this way in Spark unless one is decreasing the number of partitions. I think the solution might be to write a custom partitioner along with a custom RDD where we define `getPreferredLocations`... but I thought that is such a simple and common thing to do that surely there must be a straightforward way of doing it?
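
To make it concrete, roughly the shape of what I have in mind (completely untested, all the class names here are made up by me, and the `compute` is deliberately naive in that it re-reads the parent partition for each half):

import scala.reflect.ClassTag
import org.apache.spark.{NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Each parent partition is exposed as two child partitions.
class SplitPartition(override val index: Int, val parent: Partition, val half: Int)
  extends Partition

class SplitRDD[T: ClassTag](parent: RDD[T])
  extends RDD[T](parent.sparkContext, Seq(new NarrowDependency[T](parent) {
    // child partition i depends only on parent partition i / 2 (narrow dependency, no shuffle)
    override def getParents(partitionId: Int): Seq[Int] = Seq(partitionId / 2)
  })) {

  override def getPartitions: Array[Partition] =
    parent.partitions.flatMap { p =>
      Seq[Partition](new SplitPartition(2 * p.index, p, 0),
                     new SplitPartition(2 * p.index + 1, p, 1))
    }

  // naive: re-read the parent partition and keep alternating elements
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val s = split.asInstanceOf[SplitPartition]
    parent.iterator(s.parent, context).zipWithIndex
      .collect { case (x, i) if i % 2 == s.half => x }
  }

  // keep each half on the same node(s) as its parent partition
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    parent.preferredLocations(split.asInstanceOf[SplitPartition].parent)
}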

Things tried:

.set("spark.default.parallelism", partitions) on my SparkConf, and when in the context of reading parquet I've tried sqlContext.sql("set spark.sql.shuffle.partitions= ..., which on 1.0.0 causes an error AND not really want I want, I want partition number to change across all types of job, not just shuffles.

samthebest

3 Answers

13

Watch this space

https://issues.apache.org/jira/browse/SPARK-5997

This kind of really simple obvious feature will eventually be implemented - I guess just after they finish all the unnecessary features in Datasets.

samthebest
0

I do not exactly understand what your point is. Do you mean you now have 5 partitions, but after the next operation you want the data distributed to 10? Because having 10 but still using 5 does not make much sense… The process of sending data to new partitions has to happen at some point.

With `coalesce` you can get rid of unused partitions. For example: if you initially had 100 partitions, but after a `reduceByKey` only 10 are really used (as there were only 10 keys), you can coalesce down to 10.
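
For example, roughly (assuming `pairs` is some `RDD[(K, V)]` with 100 partitions):

val reduced = pairs.reduceByKey(_ + _) // still 100 partitions, most of them nearly empty
val compact = reduced.coalesce(10)     // merge them without a shuffle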

If you want the process to go the other way, you could just force some kind of partitioning:

[RDD].partitionBy(new HashPartitioner(100))
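
Note that `partitionBy` is only defined on key-value RDDs; a minimal sketch with made-up data:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000).map(x => (x % 10, x)) // some (key, value) RDD
val spread = pairs.partitionBy(new HashPartitioner(100))    // shuffles into 100 partitions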

I'm not sure that's what you're looking for, but I hope so.

szefuf
  • Every partition has a location, i.e. a node; suppose I have 5 partitions and 5 nodes. If I call `repartition`, or your code, to 10 partitions, this will shuffle the data - that is, data from each of the 5 nodes may pass over the network onto other nodes. What I want is for Spark to simply split each partition into 2 without moving any data around - this is what happens in Hadoop when tweaking split settings. – samthebest Mar 30 '15 at 09:23
  • I am not sure you can do it. I guess you'd need some kind of `.forEachNode` function, but I have never seen anything like that, and I'm not sure it could be easily implemented. The partitioner has to return the same partition for the same object every time. By default Spark uses `HashPartitioner`, which does **hashCode modulo number_of_partitions**. If you just split the data into two new partitions, elements would definitely end up in the wrong places. That's why a shuffle is necessary. Maybe with your own partitioner you could increase the number of partitions without shuffling over the network. – szefuf Mar 30 '15 at 12:46
-1

As you know, pyspark uses a "lazy" evaluation model: it only does the computation when there is an action (for example a `df.count()` or a `df.show()`). So what you can do is set the shuffle partition count between those actions.

You can write:

from pyspark.sql import functions as F

sparkSession.sql("set spark.sql.shuffle.partitions=100")
# your spark code here, with some transformations and at least one action
df = df.withColumn("sum", F.sum(df.A).over(your_window_function))
df.count() # your action

df = df.filter(df.B < 10)
df.count() # another action

sparkSession.sql("set spark.sql.shuffle.partitions=10")
# you reduce the number of partitions because you know you will have a lot
# less data
df = df.withColumn("max", F.max(df.A).over(your_other_window_function))
df.count() # your action
Adrien Forbu
  • `spark.sql.shuffle.partitions` will only have an effect on shuffling operations such as joins, aggregation and sorting... but not on filtering – Raphael Roth Nov 27 '18 at 20:12