
I am running a Spark analytics application and reading an MSSQL Server table (the whole table) directly using Spark JDBC. The table has more than 30M records but doesn't have any primary key column or integer column. Since the table lacks such a column, I cannot use the partitionColumn option, and hence reading the table takes too much time.

val datasource = spark.read.format("jdbc")
                .option("url", "jdbc:sqlserver://host:1433;database=mydb")
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .option("dbtable", "dbo.table")
                .option("user", "myuser")
                .option("password", "password")
                .option("useSSL", "false").load()

Is there any way to improve performance in such a case and use parallelism while reading data from relational database sources? (The source could be Oracle, MSSQL Server, MySQL, or DB2.)

Sandeep Singh

1 Answer

The only way is to write a query that returns the data already partitioned and point partitionColumn at the newly generated column, but I don't know whether this will really speed up your ingestion.

For example, in pseudo-SQL (rowid stands in for any per-row integer you can derive; a concrete SQL Server version follows):

val myReadQuery = "SELECT *, (rowid % 5) AS part FROM table"
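
SQL Server has no rowid pseudocolumn, so one concrete (untested) way to build such a query there is to number the rows with `ROW_NUMBER()` over an arbitrary order; the table name dbo.table is taken from the question:

val myReadQuery =
  """SELECT t.*,
    |       (ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) % 5) AS part
    |FROM dbo.table t""".stripMargin

Note that each parallel partition query re-runs the `ROW_NUMBER()` scan over the whole table, which is part of why I am unsure this gives a real speedup.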

And then:

val datasource = spark.read.format("jdbc")
                .option("url", "jdbc:sqlserver://host:1433;database=mydb")
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .option("dbtable", s"($myReadQuery) as t") // wrap the query as a derived table
                .option("user", "myuser")
                .option("password", "password")
                .option("useSSL", "false")
                .option("numPartitions", 5)        // one JDBC connection per partition
                .option("partitionColumn", "part") // the generated integer column
                .option("lowerBound", 0)           // part ranges over 0..4
                .option("upperBound", 5).load()

But as I already said, I am not really sure this will improve your ingestion, because it results in 5 parallel queries like these:

SELECT * from (select *, (rowid%5) as part from table) where part >= 0 and part < 1
SELECT * from (select *, (rowid%5) as part from table) where part >= 1 and part < 2
SELECT * from (select *, (rowid%5) as part from table) where part >= 2 and part < 3
SELECT * from (select *, (rowid%5) as part from table) where part >= 3 and part < 4
SELECT * from (select *, (rowid%5) as part from table) where part >= 4 and part < 5 
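
(Strictly speaking, Spark generates the outer partitions unbounded: the first clause is `part < 1 or part is null` and the last is `part >= 4`, so rows at the edges are not dropped.)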

But I think that if your table has an index, you can use the indexed column to extract an integer that, combined with the mod operation, splits the read and at the same time can speed up each partition's query.
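
As a sketch of that idea (assuming a hypothetical indexed column, here called order_no; substitute whatever column your index actually covers), you can derive the partitioning integer from the indexed column with SQL Server's `CHECKSUM`:

val myReadQuery =
  """SELECT t.*,
    |       ABS(CHECKSUM(t.order_no) % 5) AS part
    |FROM dbo.table t""".stripMargin

The same pattern ports to the other sources you mention, for example `ORA_HASH` on Oracle or `CRC32` on MySQL; the goal is simply an integer that spreads rows roughly evenly across the `numPartitions` buckets.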

gccodec
  • I believe `partitionColumn` should be either a primary key column or an integer column. Could you please explain the usage of `numPartitions`, `lowerBound`, and `upperBound` in my case? – Sandeep Singh Sep 26 '19 at 08:50
  • Yes, `partitionColumn` must be an integer type, but you can create an integer field that works as a partitioner. `numPartitions` is the number of read partitions; `lowerBound` and `upperBound` specify the minimum and maximum values of the `partitionColumn`, so Spark can internally build the queries as I wrote above and run them in parallel without duplicates. – gccodec Sep 26 '19 at 08:53