
How does parallelization work using JDBC?

Here is my code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DF    = spark.read.jdbc( url           =  ...,
                         table         = '...',
                         column        = 'XXXX',
                         lowerBound    =  Z,
                         upperBound    =  Y,
                         numPartitions = K
                         )

I would like to know how the following parameters are related and whether there is a way to choose them properly:

  1. column -> it should be the column chosen for the partition
    ( Does it need to be a numeric column? )
  2. lowerBound -> is there a rule of thumb to choose it?
  3. upperBound -> is there a rule of thumb to choose it?
  4. numPartitions -> is there a rule of thumb to choose it?

I understood that

stride = ( upperBound / numPartitions ) - ( lowerBound / numPartitions )

Are there many "strides" in each partition?

In other words, are the partitions filled with a bunch of strides until all the observations have been read?

Please look at this picture (image not included) to get a sense of the question, considering the following parameters:

 lowerBound     80.000
 upperBound    180.000
 numPartitions       8
 Stride         12.500

Notice that:

 min('XXXX')      =           0
 max('XXXX')      =     350.000
 ('XXXX').count() = 500.000.000

P.S. I read the documentation and this answer, but I didn't understand it very well.

FrancoFranchi

1 Answer

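  1. Yes, the column needs to be a numeric column according to the documentation (more recent Spark versions also accept date and timestamp columns). Why? Because otherwise you can't calculate the stride, which is (upperBound - lowerBound) / numPartitions = 12.500. Note that the stride is the range of column values covered by one partition, not the number of rows per partition. Each partition covers exactly one stride, except the first and the last, which are open-ended and also pick up everything below lowerBound and above upperBound.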

  2. I think it would be ideal if the column is already indexed in your database, since you will need to retrieve these records as fast as possible. Then upperBound and lowerBound should be the boundaries of the data you want to load into Spark (e.g. with column=id, the data you need could be id between 1 and max(id)). Keep in mind that the bounds are only used to compute the stride; they do not filter rows, so the whole table is still read.

  3. The right numPartitions is difficult to pin down for every case. One classic issue to be aware of, though, is the size of your connection pool: you should avoid opening more connections in parallel than your pool can handle. The number of parallel connections is bounded by the number of partitions, so with numPartitions = 8 there will be at most 8 parallel connections. For more about how to choose a right value for numPartitions you can check this
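
To make the stride part concrete, here is a rough pure-Python sketch of how the per-partition WHERE clauses can be derived from the four parameters. It follows the stride formula quoted in the question; the function name `jdbc_partition_predicates` is made up for illustration, and the exact arithmetic inside Spark may differ between versions, so treat it as an approximation rather than Spark's actual code.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clause of each partition query (hypothetical helper)."""
    if num_partitions < 2:
        # Assumption: with a single partition there is nothing to split,
        # so no predicate is needed.
        return []
    # Integer arithmetic, as in the stride formula from the question.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit, the last one has no upper limit.
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)
        elif lower is None:
            # NULLs are collected by the first partition.
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


# Parameters from the question: lowerBound=80.000, upperBound=180.000, numPartitions=8
predicates = jdbc_partition_predicates("XXXX", 80000, 180000, 8)
for p in predicates:
    print(p)
```

With the values from the question, the first clause has no lower limit and the last has no upper limit, so rows with XXXX below 80.000 or above 180.000 are not dropped; they land in the first and last partitions respectively.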

Good luck

abiratsis
  • Hi Alexandros and thank you for the answer you provide! Looking at point 2, you said that "[...] upperBound and lowerBound should be the boundaries of your data to retrieve into spark [...]". So, is it correct to say that if I set `id between 1 and max(id) / 2` I would have a `.count()` that is 1/2 of the whole DB? Many thanks! – FrancoFranchi Mar 13 '18 at 14:54
  • It depends on the distribution of your ids @Franco. If the id increases by one on each insert then yes, it is safe, although many databases don't guarantee that the next inserted id will be new_id = max(id) + 1 because they want to avoid this additional overhead. So I would say you need to check it to be sure. You could check, for example, whether max(id) - min(id) ~ count(*). If it is exactly the same then yes, it is safe to make your assumption. If it is close enough you can decide for yourself depending on how much accuracy you need – abiratsis Mar 13 '18 at 15:05
  • So, considering the following scenario: `min('XXXX') = 0` `max('XXXX') = 350.000` `('XXXX').count() = 500.000.000` Would you choose the lower and the upperBound based on the value (0 ~ 350.000) or based on the fact that I have 500.000.000 indexes? – FrancoFranchi Mar 13 '18 at 15:12
  • You can't make this assumption in this case. Let's take another example: say you have the table companies, and this table has a PK called id. If you have 100.000 records in this table, the max id must be at least 100.000. So in the example you sent me it doesn't make sense to make this assumption. In other words, in your last comment there is no linear relation between the count and the column. The column being used should be a key if possible, otherwise you must reconsider using numPartitions – abiratsis Mar 13 '18 at 15:22
  • In my DB I don't have a key (index) column. I just have 3 numeric columns. How would you proceed to parallelize operations considering the following settings: `min('XXXX') = 0` `max('XXXX') = 350.000` `('XXXX').count() = 500.000.000`? – FrancoFranchi Mar 14 '18 at 07:46
  • Hi @franco, this field can't guarantee an even distribution of your dataset and thus should not be used. If you don't have an "Id" field in your database which guarantees an equal distribution of your data, I would not use numPartitions but fetchsize. I would also suggest posting your database schema, since there might be a different column you can use with numPartitions. Some datetime fields, for instance, are stored as numerics – abiratsis Mar 14 '18 at 09:21
  • so the goal is to find a way to distribute your keys together with your data equally among your cluster. If you have such a field that satisfies that criteria then use numPartitions otherwise not. I hope I made it clear this time :) – abiratsis Mar 14 '18 at 13:39
  • Yes! It is clear now. I think I need to add an ID column to my DB because I have just 3 numeric columns and they are not well distributed. Many thanks Alexandros! – FrancoFranchi Mar 14 '18 at 14:01
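
For anyone wondering what "not well distributed" means in practice, below is a small pure-Python sketch that estimates which fraction of the rows each partition would receive. The helper `partition_shares` is hypothetical, and the sample assumes the values are spread evenly over 0..350.000, which is only an assumption since the real distribution of XXXX is not given in the question.

```python
from bisect import bisect_right


def partition_shares(values, lower_bound, upper_bound, num_partitions):
    """Fraction of `values` falling into each partition (hypothetical helper)."""
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    # Interior cut points: values below the first cut go to partition 0,
    # values at or above the last cut go to the final partition.
    cuts = [lower_bound + stride * i for i in range(1, num_partitions)]
    counts = [0] * num_partitions
    for v in values:
        counts[bisect_right(cuts, v)] += 1
    total = len(values)
    return [c / total for c in counts]


# Assumption: values spread evenly over 0..350000 (the real distribution is unknown).
sample = range(0, 350000, 100)

# Bounds from the question: the first and last partitions absorb the values outside them.
skewed = partition_shares(sample, 80000, 180000, 8)

# Bounds set to min/max of the column: the load is close to 1/8 per partition.
even = partition_shares(sample, 0, 350000, 8)

print([round(s, 3) for s in skewed])
print([round(s, 3) for s in even])
```

Under that assumption, the bounds 80.000 and 180.000 send roughly a quarter of the rows to the first partition and about half to the last one, while bounds equal to min and max of the column split the rows almost evenly. If the column itself is skewed, a list of hand-written conditions can be passed through the `predicates` argument of `spark.read.jdbc` instead of `column`, `lowerBound` and `upperBound`.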