4

Spark allows you to read in parallel from a SQL database source, and you can partition based on a sliding window, for example (from the book, Chapter 7):

 val colName = "count"
 val lowerBound = 0L
 val upperBound = 348113L // this is the max count in our table
 val numPartitions = 10

 spark.read.jdbc(url,
                tablename,
                colName,
                lowerBound,
                upperBound,
                numPartitions,
                props).count()

Here, the upper bound is known beforehand.

Let's say a table gets 'x' rows in a day (anywhere between 1 and 2 million), and at the end of the day we submit a Spark job, do some transformations, and write to Parquet/CSV/JSON. If we don't know beforehand how many rows will have been written to the SQL source database (since it varies from 1 to 2 million), what is the best approach or practice for partitioning in such a scenario?

One option is to estimate the upper bound, but I am not sure that is the right approach.
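For concreteness, deriving the bounds at runtime with a one-off aggregate query might look roughly like the sketch below ("events" and "id" are hypothetical table/column names; url and props are assumed to be set up as in the snippet above, and the table to be non-empty):

    // Rough sketch: compute the actual bounds before the partitioned read.
    val boundsQuery = "(SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM events) AS bounds"
    val boundsRow   = spark.read.jdbc(url, boundsQuery, props).head()

    val lowerBound = boundsRow.getAs[Number]("min_id").longValue()
    val upperBound = boundsRow.getAs[Number]("max_id").longValue()

    // Partitioned read using the bounds we just computed.
    val df = spark.read.jdbc(url, "events", "id", lowerBound, upperBound, 10, props)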

jdk2588

2 Answers

2

I had exactly the same problem. I also needed a random distribution, so I chose an int column and took mod 10 of it. This way I can partition anything without caring about bounds or distribution.

options += ("numPartitions" -> numPartitions, "partitionColumn" -> "mod(my_int_column,10)", "lowerBound" -> "0", "upperBound" -> "9")
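For context, a minimal sketch of how such an options map might be wired into a read on pre-2.4 Spark (connection details and `my_table` are placeholders; see the comment below regarding Spark 2.4+):

    import scala.collection.mutable

    // Placeholder connection details.
    val options = mutable.Map(
      "url"      -> "jdbc:postgresql://host:5432/db",
      "dbtable"  -> "my_table",
      "user"     -> "user",
      "password" -> "password"
    )

    options += ("numPartitions" -> "10",
                "partitionColumn" -> "mod(my_int_column,10)",
                "lowerBound" -> "0",
                "upperBound" -> "9")

    // Each of the 10 partitions reads the rows whose mod(my_int_column,10) value falls in its range.
    val df = spark.read.format("jdbc").options(options).load()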
mcelikkaya
  • This solution does not work anymore from Spark 2.4.0 because partitionColumn are now resolved to find out if they exist in the schema (https://github.com/apache/spark/commit/f596ebe4d3170590b6fce34c179e51ee80c965d3). As a replacement, you can use `predicates` parameter (https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/DataFrameReader.html#jdbc-java.lang.String-java.lang.String-java.lang.String:A-java.util.Properties-). Generate predicates like this: `Array.range(0, partitionCount).map(partNumber => s"mod(my_int_column, $partitionCount) = $partNumber")`. – JBENOIT Jan 25 '19 at 10:29
  • Regarding Spark 2.4, you can add this column as part of the query you are executing: `Select mod(id,10) as partition_key,* from table` and then use `partition_key` as the partition column – Alex Stanovsky Mar 12 '19 at 10:28
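For Spark 2.4+, a minimal sketch of the `predicates` variant described in the comments above (`my_table`, `my_int_column`, `url`, and `props` are placeholders):

    val partitionCount = 10
    // One predicate per partition; each partition runs SELECT ... WHERE <predicate>.
    val predicates = Array.range(0, partitionCount)
      .map(partNumber => s"mod(my_int_column, $partitionCount) = $partNumber")

    val df = spark.read.jdbc(url, "my_table", predicates, props)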
0

The fact that the number of rows is different each day doesn't really change much. Let's say you want to have 20 partitions, then one day you have roughly 1M/20 rows in a single partition and the other day roughly 2M/20. If there's more data and the number of partitions is fixed then obviously partitions will have more data in them.

In case there's confusion about the lower and upper bound, I want to clear up that lowerBound and upperBound refer to values of the column you partition on, not to the number of rows. Also, the table is not filtered on these values, meaning that rows with values smaller than lowerBound or larger than upperBound will still be included.

See: Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?

And from the docs: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

partitionColumn, lowerBound, upperBound: These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
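To make the "stride, not filter" behaviour concrete, here is a simplified sketch (not Spark's exact internal formula) of the per-partition ranges the example bounds produce; the first and last ranges are open-ended, so no rows are dropped:

    // Simplified illustration of how the bounds become per-partition WHERE clauses.
    val lowerBound    = 0L
    val upperBound    = 348113L
    val numPartitions = 10
    val stride        = (upperBound - lowerBound) / numPartitions

    val clauses = (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      val end   = start + stride
      if (i == 0)                      s"count < $end OR count IS NULL"   // open-ended at the bottom
      else if (i == numPartitions - 1) s"count >= $start"                 // open-ended at the top
      else                             s"count >= $start AND count < $end"
    }
    clauses.foreach(println)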

oh54
  • Yes, I do understand that the partitioning is based on the value of the column (that's why the column name is passed). But instead of doing sliding-window partitioning, can Spark do automatic partitioning for a JDBC source (like it does for CSV/Parquet)? – jdk2588 Jul 19 '17 at 01:57
  • You can just not specify the partitionColumn, lowerBound, upperBound, and numPartitions. Then Spark will open a single JDBC connection to your SQL server and partition how it sees fit. So it's almost certainly a significantly worse-performing solution (but easier on your SQL server). In the docs link I gave there are snippets on querying through JDBC without specifying the partitionColumn and bounds. – oh54 Jul 19 '17 at 12:56
  • Without giving any bounds Spark will create 1 partition (checked using df.rdd.partitions.size), which will not help because Spark will not do any kind of partitioning on its own. – jdk2588 Jul 19 '17 at 13:02
  • @oh54 You mentioned that rows with values smaller than lowerBound or larger than upperBound will still be included. But in the link you have given, the answerer says that 'Without them you would loose some data (the data before the lowerBound and that after upperBound).' Which is actually true? – ds_user Feb 08 '18 at 01:00
  • @ds_user The answerer in that question writes that in response to the other answerer, who didn't include all the queries that would actually be run. So the accepted answer in that question and I are saying the same thing: you will not lose any data no matter what you choose for the bounds. – oh54 Feb 08 '18 at 21:14
  • No, the accepted answerer says you will lose the data beyond the upper bound. That's why the confusion here. – ds_user Feb 08 '18 at 21:25
  • He says "Actually the list above misses a couple of things, specifically the first and the last query. Without them you would loose some data". But he does include the first and last query, therefore no data is lost. How would you lose data beyond the upper bound if you have "SELECT * FROM table WHERE partitionColumn > 9000"? – oh54 Feb 08 '18 at 21:42