
I'm trying to get a feel for what is going on inside Spark, and here's my current confusion. I'm trying to read the first 200 rows from an Oracle table into Spark:

val jdbcDF = spark.read.format("jdbc").options(
  Map(
    "url"             -> "jdbc:oracle:thin:...",
    "dbtable"         -> "schema.table",
    "fetchSize"       -> "5000",
    "partitionColumn" -> "my_row_id",
    "numPartitions"   -> "16",
    "lowerBound"      -> "0",
    "upperBound"      -> "9999999"
  )).load()

jdbcDF.limit(200).count()

I would expect this to be fairly quick. A similar action on a table with 500K rows completes in a reasonable time. In this particular case, the table is much bigger (hundreds of millions of rows), but I'd think limit(200) would make it fast? How do I go about figuring out where it is spending its time?


1 Answer


As a matter of fact, Spark isn't yet capable of pushing down the limit predicate.

So what's actually happening in this scenario is that Spark pulls all the data over and only then applies the limit and the count. What you need to do instead is to push the limit into a subquery and pass that subquery as the table argument.

e.g. (since this is Oracle, the row restriction goes through ROWNUM rather than LIMIT):

val jdbcDF = spark.read.format("jdbc").options(
  Map(
    "url"             -> "jdbc:oracle:thin:...",
    // Oracle has no LIMIT clause, so restrict rows with ROWNUM in the subquery
    // (and note Oracle does not accept AS in front of a table alias):
    "dbtable"         -> "(select * from schema.table where rownum <= 200) t",
    "fetchSize"       -> "5000",
    "partitionColumn" -> "my_row_id",
    "numPartitions"   -> "16",
    "lowerBound"      -> "0",
    "upperBound"      -> "9999999"
  )).load()

So the time is mainly being spent pulling all the data into Spark.
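
You can see this for yourself by looking at the physical plan (a minimal sketch, assuming the jdbcDF from the question): the limit shows up as a Spark-side step on top of the JDBC scan, and nothing in the query sent to Oracle restricts the row count.

jdbcDF.limit(200).explain(true)
// The extended plan should show the limit as a CollectLimit/GlobalLimit node
// sitting above the Scan JDBCRelation node, i.e. the limit is applied in Spark
// only after the rows have already been fetched from Oracle.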

You can also pass the limit dynamically in the subquery:

val n : Int = ???

val jdbcDF = spark.read.format("jdbc").options(
  Map(
    "url"             -> "jdbc:oracle:thin:...",
    "dbtable"         -> s"(select * from schema.table where rownum <= $n) t",
    "fetchSize"       -> "5000",
    "partitionColumn" -> "my_row_id",
    "numPartitions"   -> "16",
    "lowerBound"      -> "0",
    "upperBound"      -> "9999999"
  )).load()
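
As a side note, a sketch of a simpler variant (assuming you really only need a few hundred rows): the partitioning options add little at that size, so you can drop them and let Spark issue a single query.

// Sketch: same subquery, no partitioning options, so Spark runs one query
// and Oracle returns at most n rows.
val smallDF = spark.read.format("jdbc").options(
  Map(
    "url"       -> "jdbc:oracle:thin:...",
    "dbtable"   -> s"(select * from schema.table where rownum <= $n) t",
    "fetchSize" -> "5000"
  )).load()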

There is a JIRA ticket (SPARK-10899) in progress to solve this issue, but it has been hanging for almost a year.

EDIT: The JIRA issue above was flagged as a duplicate, so you can continue tracking it here: SPARK-12126. I hope this answers your question.

  • Thanks, it answers the substantial part (and it makes sense... how is Spark supposed to know how to limit results in a particular RDBMS?). – MK. Oct 03 '16 at 17:19
  • To be honest I haven't looked deeper into this topic, but I know that it has to be implemented in the data source side of Catalyst. The Catalyst API is still mysterious, though, and there is not much documentation around it. So I'm afraid I can't answer the question about how concretely the predicate should be pushed down. – eliasah Oct 03 '16 at 17:50
  • As of **Nov 2017**, I can confirm that `Spark 2.2.0` is now able to push down the `limit` *predicate* to `MySQL` – y2k-shubham Mar 01 '18 at 05:57
  • I achieve this by including the `limit` *clause* in the `SQL` *query* (string) itself. Not sure if it works the way it is mentioned here. – y2k-shubham Mar 01 '18 at 06:08
  • It is not the same thing @y2k-shubham :) We are talking about the predicate that you define on Spark's side. – eliasah Mar 01 '18 at 07:25