
I am new to Spark, and am working on creating a DataFrame from a Postgres database table via JDBC, using spark.read.jdbc.

I am a bit confused about the partitioning options, in particular partitionColumn, lowerBound, upperBound, and numPartitions.


  • The documentation seems to indicate that these fields are optional. What happens if I don't provide them?
  • How does Spark know how to partition the queries? How efficient will that be?
  • If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Let's say I'm going to have 20 executors, so I set my numPartitions to 20.
My partitionColumn is an auto-incremented ID field, and let's say the values range from 1 to 2,000,000.
However, because the user has chosen to process some really old data along with some really new data, with nothing in the middle, most of the data has ID values either under 100,000 or over 1,900,000.

  • Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

  • If so, is there a way to prevent this?

JoeMjr2

2 Answers


I found a way to manually specify the partition boundaries, by using the overload of the jdbc read method that takes a predicates parameter.

It lets you explicitly specify individual conditions to be inserted into the "where" clause for each partition, so you control exactly which range of rows each partition receives. So, if you don't have a uniformly distributed column to auto-partition on, you can define your own partitioning strategy.

An example of how to use it can be found in the accepted answer to this question.
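
For illustration, a minimal sketch of that form, assuming the scenario from the question (a Postgres table with an auto-incremented id column; the URL, table, column names, and credentials are placeholders):

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "username")      // placeholder credentials
connectionProperties.put("password", "password")

// Each predicate becomes the WHERE clause of exactly one partition's query,
// so the row ranges (and therefore the partition sizes) are under your control.
val predicates = Array(
  "id < 100000",                       // the block of really old data
  "id >= 100000 AND id < 1900000",     // the sparse middle
  "id >= 1900000"                      // the block of really new data
)

val df = spark.read.jdbc(
  url = "jdbc:postgresql://host:5432/mydb",
  table = "my_table",
  predicates = predicates,
  connectionProperties = connectionProperties)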

JoeMjr2
  • What is the guarantee that `int_id < 500000", "int_id >= 500000 && int_id < 1000000` will work? Consider an example of continuous additions and deletions, where int_id may end up with records only around 1000, which only the first predicate covers... so we need to select the min and max of the records and pass those as the predicates. Please see my update, which will work in that case. – Ram Ghadiyaram Jul 22 '19 at 20:52
  • @JoeMjr2 Does numPartitions apply to nested queries as well? – TheTank Jun 09 '20 at 14:17

What are all these options: spark.read.jdbc refers to reading a table from an RDBMS over JDBC.

Parallelism is the power of Spark; in order to achieve it you have to supply all of these options together.

Question[s] :-)

1) The documentation seems to indicate that these fields are optional. What happens if I don't provide them?

Answer: default, i.e. poor, parallelism. If you leave them out, Spark reads the whole table through a single JDBC connection, into a single partition.

Based on the scenario, the developer has to take care of the performance-tuning strategy and make sure the data is split across boundaries (aka partitions), which in turn become tasks that run in parallel.
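
For contrast, a minimal sketch of a read with none of these options set (URL, table name and credentials are placeholders); Spark then issues one query over one connection and you get a single partition:

// No partitionColumn / lowerBound / upperBound / numPartitions:
// the whole table is fetched through one JDBC connection into one partition.
val single = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/mydb")
  .option("dbtable", "my_table")
  .option("user", "username")
  .option("password", "password")
  .load()

println(single.rdd.getNumPartitions)   // 1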

2) How does Spark know how to partition the queries? How efficient will that be?

jdbc-reads: referring to the Databricks docs

You can provide split boundaries based on the dataset’s column values.

  • These options specify the parallelism on read.
  • These options must all be specified if any of them is specified.

Note

These options specify the parallelism of the table read. lowerBound and upperBound decide the partition stride, but do not filter the rows in table. Therefore, Spark partitions and returns all rows in the table.

Example 1:
You can split the table read across executors on the emp_no column using the partitionColumn, lowerBound, upperBound, and numPartitions parameters.

val df = spark.read.jdbc(
    url = jdbcUrl,
    table = "employees",
    columnName = "emp_no",
    lowerBound = 1L,
    upperBound = 100000L,
    numPartitions = 100,
    connectionProperties = connectionProperties)

numPartitions also means the number of parallel connections you are asking the RDBMS to open to read the data. By providing numPartitions you are limiting the number of connections, so you don't exhaust the connections on the RDBMS side.

Example 2 (source: a DataStax presentation on loading Oracle data into Cassandra):

val basePartitionedOracleData = sqlContext
  .read
  .format("jdbc")
  .options(
    Map[String, String](
      "url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
      "dbtable" -> "ExampleTable",
      "lowerBound" -> "1",
      "upperBound" -> "10000",
      "numPartitions" -> "10",
      "partitionColumn" -> "KeyColumn"
    )
  )
  .load()

The last four arguments in that map are there for the purpose of getting a partitioned dataset. If you pass any of them, you have to pass all of them.

When you pass these additional arguments in, here’s what it does:

It builds a SQL statement template in the format

SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?

It sends {numPartitions} statements to the DB engine. If you supplied these values: {dbTable=ExampleTable, lowerBound=1, upperBound=10000, numPartitions=10, partitionColumn=KeyColumn}, the stride works out to 1000 and it would create these ten statements:

SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2001
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3001
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4001
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5001
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6001
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7001
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8001
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9001
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10001

(In practice Spark leaves the first partition's lower bound and the last partition's upper bound open, which is why rows below lowerBound and above upperBound are still returned, as the note above says.)
And then it would put the results of each of those queries in its own partition in Spark.
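
For reference, a simplified sketch of that range arithmetic (illustrative only; the real logic lives inside Spark's JDBC data source, and the first/last clauses below reflect the open-ended behaviour noted above):

// Sketch of how the per-partition WHERE clauses are derived from
// lowerBound, upperBound and numPartitions (not a public Spark API).
val lowerBound = 1L
val upperBound = 10000L
val numPartitions = 10
val stride = upperBound / numPartitions - lowerBound / numPartitions   // = 1000

val whereClauses = (0 until numPartitions).map { i =>
  val lo = lowerBound + i * stride
  val hi = lo + stride
  if (i == 0) s"KeyColumn < $hi OR KeyColumn IS NULL"          // first partition: open below
  else if (i == numPartitions - 1) s"KeyColumn >= $lo"          // last partition: open above
  else s"KeyColumn >= $lo AND KeyColumn < $hi"
}
whereClauses.foreach(println)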

Question[s] :-)

If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

If so, is there a way to prevent this?


All of these questions have one answer.

Below is the way: 1) You need to understand how many records/rows end up in each partition; based on this you can repartition or coalesce.

Snippet 1 (Spark 1.6 and later):
Spark provides a built-in function to find out how many records are in each partition.

spark_partition_id() exists in org.apache.spark.sql.functions

import org.apache.spark.sql.functions._

// df is the DataFrame you read from the RDBMS via spark.read.jdbc
df.withColumn("partitionId", spark_partition_id())
  .groupBy("partitionId")
  .count()
  .show()

Snippet 2: for all versions of Spark

import spark.implicits._   // needed for .toDF on an RDD of tuples

df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "NumberOfRecordsPerPartition")
  .show()

And then you need to apply your tuning strategy: query tuning between ranges, repartitioning, etc. You can use mapPartitions or foreachPartition.
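
For instance, a minimal sketch of the repartition step (the executor/core counts are made-up numbers, only for illustration):

// Illustrative only: with e.g. 20 executors * 4 cores * a factor of 2,
// you would aim for about 160 partitions after the read.
val targetPartitions = 20 * 4 * 2
val balanced = df.repartition(targetPartitions)   // rows spread roughly evenly across 160 partitions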

Conclusion: I prefer using the given options, which work on numeric columns, since I have seen them divide the data uniformly across boundaries/partitions.

Sometimes it may not be possible to use these options; then manually tuning the partitions/parallelism is required...


Update :

With the steps below we can achieve a uniform distribution:

  1. Fetch the primary key of the table.
  2. Find the key minimum and maximum values.
  3. Execute Spark with those values.

def main(args: Array[String]): Unit = {
  // parsing input parameters ...

  // Fetch the primary-key column name (MySQL-specific query;
  // executeQuery is a helper defined elsewhere that runs the SQL and returns the result).
  val primaryKey = executeQuery(url, user, password,
    s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)

  // Find the minimum and maximum values of that key.
  val result = executeQuery(url, user, password,
    s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt

  // Aim for roughly 5000 key values per partition.
  val numPartitions = (max - min) / 5000 + 1

  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

  val df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min).
    option("upperBound", max).
    option("numPartitions", numPartitions).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).load()

  // some data manipulations here ...

  df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}
Ram Ghadiyaram
  • Thank you. I knew most of this, but I could use more detail about how to do the query tuning / repartitioning. That is, once I have the output from df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show, what exactly do I do with it? – JoeMjr2 Jun 11 '19 at 13:25
  • - Query tuning: scan between [ranges](http://www.russellspitzer.com/2017/06/09/Dealing-With-Large-Partitions/) if it's a string column; for example, if you have a string column with 10 categories, then you can scan each category like I showed in example 2. - Repartitioning: in general, repartitioning can be done as number of executors * cores * replication factor; for example, 20 executors * 4 cores * 2-3 gives 160-240 partitions you may go with. To understand whether the partitioning has a roughly equal number of records, you can check with the above code snippets, is what I am saying. – Ram Ghadiyaram Jun 11 '19 at 14:41