
I currently have an application which is supposed to connect to different types of databases, run a specific query on that database using Spark's JDBC options and then write the resultant DataFrame to HDFS.

The performance was extremely bad for Oracle (I didn't check all of them). It turns out this was because of the fetchSize property, which defaults to 10 rows for Oracle. So I increased it to 1000 and the performance gain was quite visible. Then I changed it to 10000, but some of the tables started failing with an out-of-memory error in the executor (6 executors, 4G memory each, 2G driver memory).
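For reference, the read is set up roughly like this (URL, query and credentials below are placeholders, assuming a SparkSession named `spark`):

    // Rough sketch of the read/write described above (assumes a SparkSession named `spark`;
    // URL, query and credentials are placeholders, not the real ones).
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")
      .option("dbtable", "(SELECT * FROM some_table) t")   // the query pushed down to the database
      .option("user", "user")
      .option("password", "password")
      .option("fetchsize", "1000")                          // the property being tuned
      .load()

    df.write.mode("overwrite").parquet("hdfs:///data/some_table")   // write the result to HDFS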

My questions are:

  • Is the data fetched by Spark's JDBC persisted in executor memory for each run? Is there any way to un-persist it while the job is running?

  • Where can I get more information about the fetchSize property? I'm guessing it won't be supported by all JDBC drivers.

  • Are there any other things that I need to take care which are related to JDBC to avoid OOM errors?

philantrovert

2 Answers


Fetch Size: it's just a value set on the JDBC PreparedStatement.

You can see it in JDBCRDD.scala:

 stmt.setFetchSize(options.fetchSize)

You can read more about the JDBC fetch size here.
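At the plain JDBC level it is only a hint to the driver about how many rows to pull per round trip, roughly like this (a sketch; `url`, `user` and `password` are placeholder variables):

    // Plain JDBC sketch: setFetchSize is only a hint telling the driver how many
    // rows to pull per round trip while the ResultSet is iterated (Oracle defaults to 10).
    val conn = java.sql.DriverManager.getConnection(url, user, password)
    val stmt = conn.prepareStatement("SELECT * FROM some_table")
    stmt.setFetchSize(1000)
    val rs = stmt.executeQuery()
    while (rs.next()) {
      // rows are streamed from the server-side cursor, 1000 per round trip
    }
    rs.close(); stmt.close(); conn.close()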

You can also improve things by setting all four partitioning parameters (partitionColumn, lowerBound, upperBound, numPartitions), which parallelizes the read. See more here. The read is then split across many machines, so the memory usage on each of them may be smaller.
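A sketch of such a partitioned read (the column name and bounds are only examples, and `url` is assumed to be defined):

    // All four options must be set together; Spark then issues numPartitions queries,
    // each covering a slice of partitionColumn, so each executor only holds its own slice.
    val df = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "some_table")
      .option("partitionColumn", "id")   // a numeric, date or timestamp column
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "24")
      .option("fetchsize", "1000")
      .load()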

For details on which JDBC options are supported and how, you must check your driver's documentation - every driver may have its own behaviour.

T. Gawęda
  • Heh, ok :D One thing I don't like in this source is that we must depend on 3rd party drivers - all of them work differently. But who doesn't use RDBMS? ;) – T. Gawęda Sep 15 '17 at 17:12
  • Reads are messy (parallelization, possibly inconsistent, unless you lock the whole database), and writes can leave you in an inconsistent state when there is a partial failure. I feel like `COPY TO` - `COPY FROM` is almost always a better solution. – zero323 Sep 15 '17 at 17:16
  • @zero323 I agree. When I had to communicate with JDBC, I had two jobs: one for JDBC -> Parquet, then the second one with analytics. Simple, fast, etc. Then again we can write to JDBC (Streaming writing seems to be promising - in case of failure you've got batchId and you can easily relaunch it) – T. Gawęda Sep 15 '17 at 17:19
  • Thank you. I believe that answers my question. – philantrovert Sep 15 '17 at 18:19
  • **@philantrovert** how did you pass down this `fetchSize` property to `Spark`'s `DataFrameReader`? I tried passing it in the `connectionProperties` param of the `spark.read.jdbc(..)` method but it doesn't seem to have any impact on performance as mentioned [here](https://stackoverflow.com/questions/45589632). Also what is the exact key for the property: `fetchsize` or `fetchSize` or `fetch_size`? I'm using `MySQL` and `Spark 2.2.0` – y2k-shubham Apr 11 '18 at 12:38
  • @y2k-shubham You can do it with Spark option `fetchsize` – T. Gawęda Apr 11 '18 at 12:46
  • So do I pass it inside the `connectionProperties` param of the `DataFrameReader.jdbc(..)` method or through the `DataFrameReader.option(..)` method? And does it work for `MySQL` when I'm using the standard `Connector/J` driver? – y2k-shubham Apr 11 '18 at 12:50

To answer @y2k-shubham's follow-up question "do I pass it inside connectionProperties param", per the current docs the answer is "Yes", but note the lower-cased 's'.

fetchsize: The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows). This option applies only to reading.
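A quick sketch of both ways to pass it, assuming a SparkSession named `spark` and a `url` already defined (note the lower-cased key in both):

    // 1) As a reader option:
    val viaOption = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "some_table")
      .option("fetchsize", "10000")
      .load()

    // 2) Inside the connectionProperties argument of DataFrameReader.jdbc:
    val props = new java.util.Properties()
    props.setProperty("fetchsize", "10000")
    val viaProperties = spark.read.jdbc(url, "some_table", props)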

Ion Freeman