
I have an Oracle table with n records. I want to load data from that table into a Spark DataFrame with a where/filter condition, and I do not want to load the complete table into a DataFrame and then apply the filter on it. Is there an option for this in spark.read.format("jdbc"), or any other solution?

CUTLER
  • https://stackoverflow.com/questions/46984914/spark-scala-jdbc-how-to-limit-number-of-records/46993171#46993171 – pasha701 Apr 23 '20 at 12:20
  • Does this answer your question? [spark, scala & jdbc - how to limit number of records](https://stackoverflow.com/questions/46984914/spark-scala-jdbc-how-to-limit-number-of-records) – UninformedUser Apr 23 '20 at 12:43

4 Answers


Check the code below. You can write your own query inside the query variable. To load the data in parallel, also set the partitionColumn, lowerBound, upperBound, and numPartitions options.

val query = """
  (select columnA,columnB from table_name
    where <where conditions>) table
"""  
val options = Map(
    "url"              -> "<url>".
    "driver"           -> "<driver class>".
    "user"             -> "<user>".
    "password"         -> "<password>".
    "dbtable"          -> query,
    "partitionColumn"  -> "",
    "lowerBound"       -> "<lower bound values>", 
    "upperBound"       -> "<upper bound values>"
)

val df = spark
        .read
        .format("jdbc")
        .options(options)
        .load()
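
As a concrete sketch, assuming a hypothetical Oracle URL, credentials, table, and a numeric EMP_ID column (none of these come from the question), the same options could look like this:

val query = """
  (select EMP_ID, EMP_NAME from EMPLOYEES
    where JOIN_DATE > DATE '2020-01-01') emp
"""

val options = Map(
  "url"             -> "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1",  // placeholder URL
  "driver"          -> "oracle.jdbc.OracleDriver",
  "user"            -> "scott",                                     // placeholder credentials
  "password"        -> "tiger",
  "dbtable"         -> query,
  "partitionColumn" -> "EMP_ID",   // numeric (or date/timestamp) column to split on
  "lowerBound"      -> "1",
  "upperBound"      -> "100000",
  "numPartitions"   -> "8"         // Spark issues 8 range queries over EMP_ID
)

val df = spark.read.format("jdbc").options(options).load()

Only the subquery runs in the database, so the filter is applied before any rows reach Spark.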

Srinivas

Try this:

val sourceDf = spark.read
  .format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "(select * from dbo.employee c where c.joindate > '2018-11-19 00:00:00.000') as subq")
  .option("numPartitions", 6)
  .option("partitionColumn", "depId")
  .option("lowerBound", 1)
  .option("upperBound", 100)
  .option("user", user)
  .option("password", pass)
  .load()

This pushes the where condition into the database query and reads the result in parallel partitions.

jwvh

Spark supports predicate pushdown for the JDBC source.

You can simply load the DataFrame using spark.read.format("jdbc") and apply a filter with .where() on top of it; you can then verify that Spark SQL's predicate pushdown is applied.

In Spark SQL you can see the exact query that ran against the database, and you will find the WHERE clause included in it.

So you don't need to add anything extra for it.

For more details, refer to this Databricks article: https://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization
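
A minimal sketch of this approach (the URL, table, and JOIN_DATE column below are placeholders, not from the question); df.explain(true) prints the physical plan, where the pushed predicate shows up under PushedFilters:

// Load lazily, then filter; Spark pushes the predicate into the generated JDBC query.
val employees = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1")  // placeholder URL
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("dbtable", "EMPLOYEES")                             // placeholder table
  .option("user", "scott")
  .option("password", "tiger")
  .load()
  .where("JOIN_DATE > DATE '2020-01-01'")                     // placeholder predicate

// The JDBC scan node in the plan lists the filter under PushedFilters,
// indicating the condition is evaluated by the database, not by Spark.
employees.explain(true)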

Rahul Kumar

You can use the following options for this use case. Refer to the link above.

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Create the DataFrame based on the query condition:

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)