2

Is there a way to limit the number of records fetched from the jdbc source using spark sql 2.2.0?

I am dealing with a task of moving (and transforming) a large number of records >200M from one MS Sql Server table to another:

val spark = SparkSession
    .builder()
    .appName("co.smith.copydata")
    .getOrCreate()

val sourceData = spark
    .read
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", jdbcSqlConnStr)
    .option("dbtable", sourceTableName)
    .load()
    .take(limit)

While it works, it is clearly first loading all the 200M records from the database first taking its sweet 18 min and then returns me the limited number of records I desire for testing and development purposes.

Switching around take(...) and load() produces compilation error.

I appreciate there are ways to copy sample data to a smaller table, use SSIS, or alternative etl tools.

I am really curious whether there is a way to achieve my goal using spark, sql and jdbc.

vkhazin
  • 136
  • 1
  • 12

3 Answers3

2

To limit the number of downloaded rows, a SQL query can be used instead of the table name in "dbtable". Description in documentation.

In query "where" condition can be specified, for example, with server specific features to limit the number of rows (like "rownum" in Oracle).

Nick Chammas
  • 11,843
  • 8
  • 56
  • 115
pasha701
  • 6,831
  • 1
  • 15
  • 22
  • I have tried two approaches using sql statement instead of the table name and got an error 'invalid syntax near select...'. And I have tried to select top 100 records using val sqlDF = spark.sql("...") approach with an error 'invalide spark sql syntax' or something like that. – vkhazin Oct 29 '17 at 18:59
  • for queries parentheses with alias have to be used, please look: https://stackoverflow.com/questions/43174838/how-to-use-a-subquery-for-dbtable-option-in-jdbc-data-source – pasha701 Oct 29 '17 at 21:09
  • Excellent! The following works! //Source data val jdbcSourceConnection = s"jdbc:sqlserver://$sourceDbHost;databaseName=$sourceDbName;user=$sourceDbUsername;password=$sourceDbPassword;" val sourceData = spark .read .format("jdbc") .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("url", jdbcSourceConnection) // .option("dbtable", sourceTableName) .option("dbtable", "(select top 100 * from Customer.Preference) as table1") .load .sort("CustomerID") – vkhazin Oct 29 '17 at 21:39
  • While this works, you can accomplish similar using limit - see my answer. – Justin Pihony Nov 01 '17 at 03:09
1

This approach is a little bit bad for relational databases. The load function of spark will request your full table, store in memory/disk and then will do the RDD transformations and executions.

If you want to do an exploratory work, I will suggest you to store this data in your first load. There a few ways to do that. Take your code and do like this:

val sourceData = spark
    .read
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", jdbcSqlConnStr)
    .option("dbtable", sourceTableName)
    .load()
sourceData.write
    .option("header", "true")
    .option("delimiter", ",")
    .format("csv")
    .save("your_path")

This will allow you to save your data in your local machine as CSV, the most common format that you can work with any language for exploration. Everytime that you want to load this, take this data from this file. If you want real time analysis, or any other thing like this. I will suggest you build a pipeline with the transformations of the data to update another storage. Using this approach to process your data of loading from your db every time is not good.

Thiago Baldim
  • 7,362
  • 3
  • 29
  • 51
1

I have not tested this, but you should try using limit instead of take. take calls head under the covers which has the following note:

this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.

whereas limit results in a LIMIT pushed into the sql query as it is a lazy evaluation:

The difference between this function and head is that head is an action and returns an array (by triggering query execution) while limit returns a new Dataset.

If you want the data without pulling it in first then you could even do something like:

...load.limit(limitNum).take(limitNum)
Justin Pihony
  • 66,056
  • 18
  • 147
  • 180
  • I have used that exact approach with Cassandra as a data source, but for Sql Server jdbc it did not work for me. Maybe I should look further into it, thank you for the suggestion! – vkhazin Nov 02 '17 at 13:41
  • Nope: it is taking its sweet long time to load 100 records I have asked for. It appears that spark loads all the records first and then applies the limit in this case. ``` val sourceData = spark .read .format("jdbc") .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("url", jdbcSourceConnection) // .option("dbtable", s"(select top ${config.sourceLimit} * from ${config.sourceTableName}) as table1") .option("dbtable", config.sourceTableName) .load .limit(config.sourceLimit.toInt) .sort("CustomerID") ``` – vkhazin Nov 02 '17 at 14:12
  • For me also it does not work as it takes the same amount of time to load one record or the entire rows within the table. – Nikunj Kakadiya Apr 11 '22 at 09:20