57

While fetching data from SQL Server via a JDBC connection in Spark, I found that I can set some parallelization parameters such as partitionColumn, lowerBound, upperBound, and numPartitions. I have gone through the Spark documentation but wasn't able to understand them.

Can anyone explain the meaning of these parameters?
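
For reference, this is roughly how I am passing them (the connection details, table and column names below are placeholders, not my real ones):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("jdbc-partitioned-read").getOrCreate()

    // Placeholder SQL Server connection details and a hypothetical numeric column "id".
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("dbtable", "dbo.my_table")
      .option("user", "<user>")
      .option("password", "<password>")
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "1000")
      .option("numPartitions", "10")
      .load()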

Rodrigue
Bhanuday Birla

4 Answers

35

It is simple:

  • partitionColumn is the column used to determine the partitions.
  • lowerBound and upperBound determine the range of values to be fetched. The complete dataset will use rows corresponding to the following query:

    SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound
    
  • numPartitions determines the number of partitions to be created. The range between lowerBound and upperBound is divided into numPartitions parts, each with a stride equal to:

    upperBound / numPartitions - lowerBound / numPartitions
    

    For example if:

    • lowerBound: 0
    • upperBound: 1000
    • numPartitions: 10

    The stride is equal to 100 and the partitions correspond to the following queries (a read sketch using these example values follows the list):

    • SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100
    • SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200
    • ...
    • SELECT * FROM table WHERE partitionColumn BETWEEN 900 AND 1000
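
A minimal sketch of such a read with these example values, using the DataFrameReader.jdbc overload that takes the bounds explicitly (URL, credentials and table name are placeholders):

    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("jdbc-bounds-example").getOrCreate()

    val props = new Properties()
    props.setProperty("user", "<user>")          // placeholder credentials
    props.setProperty("password", "<password>")

    // Split the read on "partitionColumn" with lowerBound = 0, upperBound = 1000
    // and numPartitions = 10, i.e. a stride of 100 per partition.
    val df = spark.read.jdbc(
      "jdbc:sqlserver://<host>:1433;databaseName=<db>",  // placeholder URL
      "my_table",                                        // hypothetical table
      "partitionColumn",
      0L,                                                // lowerBound
      1000L,                                             // upperBound
      10,                                                // numPartitions
      props
    )

    println(df.rdd.getNumPartitions)  // 10
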
user7280077
    See Andrea's answer. The first and the last SELECT are correct in his answer but not in this one – Can't Tell Mar 14 '19 at 09:50
  • 21
    In [Spark docs](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) it says: **Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.** This would mean that the *whole* table will be fetched, and not just the part between lowerBound and upperBound. – mj3c Feb 04 '20 at 09:31
  • 5
    Answer is not accurate as in some databases `BETWEEN` is inclusive in both lower and upper bound. The actual implementation uses `>=` and `<` respectively: [Spark doc](https://github.com/apache/spark/blob/17edfec59de8d8680f7450b4d07c147c086c105a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L85-L97) – Hedrack Apr 30 '20 at 04:07
  • 4
    This answer is flat wrong, implying that upper and lower bound values filter the dataset being read. – CClarke Apr 07 '21 at 14:45
33

Actually the list above misses a couple of things, specifically the first and the last query.

Without them you would lose some data (the data before lowerBound and after upperBound). This isn't clear from the example because the lower bound is 0.

The complete list should be:

SELECT * FROM table WHERE partitionColumn < 100

SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100  
SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200  

...

SELECT * FROM table WHERE partitionColumn >= 900
Pradeep
Andrea
  • 3
    This is 100% accurate for JdbcRDD ([see the code](https://github.com/apache/spark/blob/17edfec59de8d8680f7450b4d07c147c086c105a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L85-L97)). In particular, if you set `upperBound` too low, one executor will get way more work than the others and possibly run out of memory. – Phil Calvin Jan 25 '18 at 19:20
  • Quick Question: What type of Columns are best suitable for PartitionBy ? Sequence? UUID? – Sandeep540 Sep 16 '19 at 15:01
  • The BETWEEN operator is inclusive: begin and end values are included. So the above sql will query duplicate data, right? – Shuai Liu Nov 15 '19 at 01:11
  • Answer is not accurate as in some databases `BETWEEN` is inclusive in both lower and upper bound. The actual implementation uses `>=` and `<` respectively: [Spark doc](https://github.com/apache/spark/blob/17edfec59de8d8680f7450b4d07c147c086c105a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L85-L97) – Hedrack Apr 30 '20 at 04:07
  • Is this answer assuming that the value of `partitionColumn` is counting up from 1 to 1000, for example? It doesn't make sense otherwise. I've got a `partitionColumn` that is assigned by a Netezza database and it is a big number `234235000` through `234234999`. If `upperBound` refers to the number of elements, rather than the value within partitionColumn, this answer does not make sense to me. – pauljohn32 May 20 '21 at 23:38
  • This should not be the accepted answer, this statement about Spark `lowerBound` and `upperBound` behavior is incorrect: `Without them you would loose some data (the data before the lowerBound and that after upperBound).`. As explained by @mj3c, Spark documentation states that: "Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading". – Quan Bui Oct 07 '22 at 08:12
29

Creating partitions doesn't result in loss of data due to filtering. upperBound and lowerBound, along with numPartitions, just define how the partitions are to be created; they don't define a range (filter) for the values of partitionColumn to be fetched.

For a given input of lowerBound (l), upperBound (u) and numPartitions (n), the partitions are created as follows:

stride s = (u - l) / n

**SELECT * FROM table WHERE partitionColumn < l+s OR partitionColumn IS NULL**
SELECT * FROM table WHERE partitionColumn >= l+s AND partitionColumn < l+2s
SELECT * FROM table WHERE partitionColumn >= l+2s AND partitionColumn < l+3s
...
**SELECT * FROM table WHERE partitionColumn >= l+(n-1)s**

For instance, for upperBound = 500, lowerBound = 0 and numPartitions = 5, the partitions correspond to the following queries:

SELECT * FROM table WHERE partitionColumn < 100 OR partitionColumn IS NULL
SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200
SELECT * FROM table WHERE partitionColumn >= 200 AND partitionColumn < 300
SELECT * FROM table WHERE partitionColumn >= 300 AND partitionColumn < 400
SELECT * FROM table WHERE partitionColumn >= 400

Depending on the actual range of values of the partitionColumn, the result size of each partition will vary.
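
A small sketch (plain Scala, not the actual Spark code; see the JDBCRelation.columnPartition source linked in the comments for the real logic) that mimics the clause generation described above:

    // Simplified mimic of the per-partition WHERE clauses described above.
    def partitionWhereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
      val stride = (upper - lower) / numPartitions
      (0 until numPartitions).map { i =>
        val low  = lower + i * stride
        val high = lower + (i + 1) * stride
        if (i == 0) s"$column < $high OR $column IS NULL"        // first partition also picks up NULLs and anything below lowerBound
        else if (i == numPartitions - 1) s"$column >= $low"      // last partition picks up anything above upperBound
        else s"$column >= $low AND $column < $high"
      }
    }

    partitionWhereClauses("partitionColumn", 0L, 500L, 5).foreach(println)
    // partitionColumn < 100 OR partitionColumn IS NULL
    // partitionColumn >= 100 AND partitionColumn < 200
    // partitionColumn >= 200 AND partitionColumn < 300
    // partitionColumn >= 300 AND partitionColumn < 400
    // partitionColumn >= 400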

David Walschots
PIYUSH PASARI
  • Can you clarify your assumption about the values within `partitionColumn`? If you don't have unique integer ROWID like 1, 2, 3, 4,..., I can't understand how this would work. – pauljohn32 May 20 '21 at 23:40
  • 1
    This should have been the accepted answer, as it clearly explains how the partitioning works. – Harikrishnan Balachandran Aug 25 '22 at 09:48
  • Having a hard time deciding if this means `upperBound` is inclusive or exclusive. All data is queried, sure, but in your example, if "500" is actually a valid value, then all partitions are of size "100", except the last, which is of size "101". Not a big deal in this example, but in an example of wanting one distinct value per partition, the last partition would be twice as big as all the others. – Alain Dec 09 '22 at 16:08
21

Would just like to add to the verified answer, since the words

"Without them you would lose some data"

are misleading.

From the documentation: "Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading."

Which means that if your table has 1100 rows and you specify

lowerBound 0,

upperBound 1000 and

numPartitions 10,

you won't lose the rows from 1000 to 1100. You'll just end up with some of the partitions having more rows than intended (the stride value is 100).
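
One way to check this yourself (a sketch; the connection details, the table and an id column with values 1 to 1100 are all hypothetical) is to count rows per JDBC partition with spark_partition_id():

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.spark_partition_id

    val spark = SparkSession.builder().appName("partition-skew-check").getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")  // placeholder
      .option("dbtable", "my_table")                                    // hypothetical table with 1100 rows
      .option("user", "<user>")
      .option("password", "<password>")
      .option("partitionColumn", "id")                                  // hypothetical column with values 1..1100
      .option("lowerBound", "0")
      .option("upperBound", "1000")
      .option("numPartitions", "10")
      .load()

    // All 1100 rows come back; the last partition absorbs the ids >= 900,
    // so it ends up with roughly 200 rows while the others hold roughly 100 each.
    df.groupBy(spark_partition_id().as("partition")).count().orderBy("partition").show()
    println(df.count())  // 1100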

Hemanth Gowda
  • 1
    Do you know what Spark does with the remaining 100 rows? For example, would this mean that your 10 partitions would have 110 rows? – Blaisem May 12 '21 at 12:21