17

Problem: I want to import data into Spark EMR from S3 using:

data = sqlContext.read.json("s3n://.....")

Is there a way I can set the number of nodes that Spark uses to load and process the data? This is an example of how I process the data:

data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")

Context: The data is not too big, takes a long time to load into Spark and also to query from. I think Spark partitions the data into too many nodes. I want to be able to set that manually. I know when dealing with RDDs and sc.parallelize I can pass the number of partitions as an input. Also, I have seen repartition(), but I am not sure if it can solve my problem. The variable data is a DataFrame in my example.

Let me define partition more precisely. Definition one: commonly referred to as "partition key" , where a column is selected and indexed to speed up query (that is not what i want). Definition two: (this is where my concern is) suppose you have a data set, Spark decides it is going to distribute it across many nodes so it can run operations on the data in parallel. If the data size is too small, this may further slow down the process. How can i set that value

pemfir
  • 365
  • 1
  • 3
  • 10
  • Possible duplicate of [How to define partitioning of a Spark DataFrame?](http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe) – Julian V. Modesto Jan 04 '16 at 19:18
  • Let me define partition more precisely. Definition one: commonly referred to as "partition key" , where a column is selected and indexed to speed up query. Definition two: (this is where my concern is) suppose you have a data set, Spark decides it is going to distribute it across many nodes so it can run operations on the data in parallel. If the data size is too small, this may further slow down the process. How can i set that value ? – pemfir Jan 04 '16 at 19:46
  • You're correct in making a clear distinction between table partitioning as in relational databases vs. RDD partitioning. See [How to define partitioning of a Spark DataFrame?](http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe), which describes how to do DataFrame partitioning, as in _RDD/distributed partitioning_. – Julian V. Modesto Jan 04 '16 at 22:15

3 Answers3

14

By default it partitions into 200 sets. You can change it by using set command in sql context sqlContext.sql("set spark.sql.shuffle.partitions=10");. However you need to set it with caution based up on your data characteristics.

Durga Viswanath Gadiraju
  • 3,896
  • 2
  • 14
  • 21
10

You can call repartition() on dataframe for setting partitions. You can even set spark.sql.shuffle.partitions this property after creating hive context or by passing to spark-submit jar:

spark-submit .... --conf spark.sql.shuffle.partitions=100

or

dataframe.repartition(100)
Tshilidzi Mudau
  • 7,373
  • 6
  • 36
  • 49
2

Number of "input" partitions are fixed by the File System configuration.

1 file of 1Go, with a block size of 128M will give you 10 tasks. I am not sure you can change it.

repartition can be very bad, if you have lot of input partitions this will make lot of shuffle (data traffic) between partitions.

There is no magic method, you have to try, and use the webUI to see how many tasks are generated.

Thomas Decaux
  • 21,738
  • 2
  • 113
  • 124