
We are currently exploring how to process a large amount of data stored in a Google Cloud SQL database (MySQL) using Apache Beam/Google Dataflow.

The database stores about 200GB of data in a single table.

We successfully read rows from the database using JdbcIO, but so far this only works if we LIMIT the number of rows queried. Otherwise we run into memory issues. I assume that, by default, a SELECT query tries to load all resulting rows into memory.

What is the idiomatic approach for this? Batching the SQL queries? Streaming the results?

We tried adjusting the fetch size of the statement executed, without much success.

This is what our JDBC read setup looks like:

JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  statementPreparator = statement => statement.setFetchSize(100),
  rowMapper = result => result.getString(1)
)

I haven't found any resources about streaming results from SQL so far.

EDIT

I'm going to list a few approaches I took, so others can learn from them (for example, how not to do it). To give a bit more context: the database table in question is really badly structured. It has a column containing a JSON string, an id column (primary key), plus an added and a modified column (both TIMESTAMP types). At the time of the first approach it had no further indexes. The table contains 25 million rows. So this is probably more of a database issue than an Apache Beam/JDBC issue. But nevertheless:

Approach 1 (above) - Query everything

Basically it looked like this:

val readOptions = JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  rowMapper = result => result.getString(1)
)

context
  .jdbcSelect(readOptions)
  .map(/*...*/)

This worked if I added a LIMIT to the query, but it was obviously very slow.

Approach 2 - Paginated queries (LIMIT/OFFSET)

val queries = List(
  "SELECT data from raw_data LIMIT 5000 OFFSET 0",
  "SELECT data from raw_data LIMIT 5000 OFFSET 5000",
  "SELECT data from raw_data LIMIT 5000 OFFSET 10000"
  // ...
)

context
  .parallelize(queries)
  .map(query => {
      val connection = DriverManager.getConnection(/* */)
      val statement = connection.prepareStatement(query)
      val result = statement.executeQuery()

      makeIterable(result) // <-- creates an Iterator[String]
  })
  .flatten
  .map(/* processing */)

This worked somewhat better, though I quickly learned that a LIMIT _ OFFSET _ combination still starts scanning from the first row. So each subsequent query took longer, converging towards unacceptably long run times.

Approach 2.5 - Paginated queries with ordering

Like the above approach, but we created an index on the added column and updated the query to

SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x

This sped things up, but eventually the query time still grew too long.
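
For reference, true keyset pagination would avoid the growing OFFSET scan by filtering on the last key seen in the previous page instead of skipping rows. Below is a minimal plain-JDBC sketch of that idea, assuming the index on added exists and using a hypothetical process() helper for each row (note that a non-unique TIMESTAMP key can skip or duplicate rows at page boundaries):

import java.sql.DriverManager

val connection = DriverManager.getConnection(/* ... */)
val statement = connection.prepareStatement(
  "SELECT data, added FROM raw_data WHERE added > ? ORDER BY added LIMIT 5000")

// Start below the smallest possible TIMESTAMP and advance the key page by page.
var lastAdded = "1970-01-01 00:00:00"
var finished = false
while (!finished) {
  statement.setString(1, lastAdded)
  val rs = statement.executeQuery()
  var rows = 0
  while (rs.next()) {
    process(rs.getString(1))    // hypothetical per-row processing
    lastAdded = rs.getString(2) // remember the key of the last row on this page
    rows += 1
  }
  finished = rows == 0          // an empty page means we are done
}

Each page then seeks directly into the added index, so page N costs roughly the same as page 1.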

Approach 3 - No Beam/Dataflow

import java.io.PrintWriter
import java.sql.{DriverManager, ResultSet}

val connection = DriverManager.getConnection(/* */)
// Forward-only, read-only statement with fetch size Integer.MIN_VALUE:
// this is what makes the MySQL driver stream rows instead of buffering them.
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)

val rs = statement.executeQuery("SELECT data FROM raw_data")

val writer = new PrintWriter("raw_data.csv") // local output file (path chosen for illustration)
while (rs.next()) {
  writer.println(rs.getString(1))
}
writer.close()

This streams the result set back row by row and writes the rows into files. It ran for about 2 hours for all 25 million records. Finally! It would be great if someone could point out how this solution can be achieved with Beam.

BTW: Now that I have the raw data as CSV files, processing with Beam is a breeze. It's about 80 GB of raw data, which can be transformed to another CSV format in about 5 minutes with autoscaling etc.
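
The CSV processing step isn't shown here, but with scio it is roughly of this shape. This is only a minimal sketch: the bucket paths and the transform function are placeholders, not the actual pipeline.

import com.spotify.scio._

object TransformCsv {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    sc.textFile("gs://my-bucket/raw_data/*.csv") // hypothetical input location
      .map(transform)                            // placeholder for the actual CSV-to-CSV transformation
      .saveAsTextFile("gs://my-bucket/transformed")

    sc.close() // submit the pipeline; Dataflow autoscaling handles the rest
  }

  // placeholder transformation
  def transform(line: String): String = line
}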

Scarysize
  • Oh I see. Pagination using LIMIT/OFFSET will not give you any parallelism because databases typically can't give you the 10th page of results without scanning and discarding the first 9 pages, so basically your queries are scanning the entire database N times. You need to partition it by *values* of the primary key, e.g. "select data from raw_data where added between X and Y", where the total range of [X, Y) pairs you supply covers all possible values of "added". – jkff Feb 21 '18 at 16:28
  • Also it seems you're not using JdbcIO.readAll(). I suppose scio doesn't explicitly support it, but it doesn't prevent you from using it either, right? (you can apply arbitrary Beam transforms to PCollections, not just those that scio has a wrapper for) – jkff Feb 21 '18 at 16:29
  • I probably could use readAll() with scio. I'm new to both scio and Beam/Dataflow, that's why I'm still not fully aware of the possibilities. Though looking back, I found the solution which worked in the end (w/o Beam) was the most obvious and elegant one for my use case. Thanks for all the hints & explanations though! – Scarysize Feb 22 '18 at 08:01

2 Answers


It seems that the MySQL JDBC driver requires some special measures to prevent it from loading the entire result set into memory; e.g. I was able to find this code solving the problem in a different project. JdbcIO will need to do the same, or at least be configurable enough to let a user do it. I filed issue https://issues.apache.org/jira/browse/BEAM-3714.

Meanwhile, as a workaround, you can use JdbcIO.readAll() to partition your query into many smaller queries, e.g. you might partition it by a range of IDs. Note that no transactional consistency will be enforced between them - they will be independent queries as far as MySQL is concerned.
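
For anyone wondering what such a partitioned read could look like from scio, here is a rough, untested sketch that applies the plain Beam transform to an SCollection of id ranges. The driver class, connection string, id bounds and range size are placeholders, and it assumes a numeric id primary key; context is the ScioContext used in the question.

import java.sql.{PreparedStatement, ResultSet}

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.jdbc.JdbcIO
import org.apache.beam.sdk.values.KV

// Hypothetical id ranges covering the whole table; pick a step that keeps each query small.
val step = 100000L
val ranges: Seq[KV[java.lang.Long, java.lang.Long]] =
  (0L until 25000000L by step).map(lo => KV.of(Long.box(lo), Long.box(lo + step)))

val readAll = JdbcIO.readAll[KV[java.lang.Long, java.lang.Long], String]()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
    "com.mysql.jdbc.Driver", "jdbc:mysql://host/db")) // placeholder connection info
  .withQuery("SELECT data FROM raw_data WHERE id >= ? AND id < ?")
  .withParameterSetter(new JdbcIO.PreparedStatementSetter[KV[java.lang.Long, java.lang.Long]] {
    override def setParameters(range: KV[java.lang.Long, java.lang.Long], st: PreparedStatement): Unit = {
      st.setLong(1, range.getKey)
      st.setLong(2, range.getValue)
    }
  })
  .withRowMapper(new JdbcIO.RowMapper[String] {
    override def mapRow(rs: ResultSet): String = rs.getString(1)
  })
  .withCoder(StringUtf8Coder.of())

// scio lets you apply arbitrary Beam transforms to an SCollection:
val rows = context
  .parallelize(ranges)
  .applyTransform(readAll) // yields an SCollection[String] with the data column

Each range becomes an independent query, so the work can be spread across workers without the OFFSET problem described in the question.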

jkff
  • Thanks for the hint. I will check if this solves the issue! – Scarysize Feb 16 '18 at 08:47
  • Sadly the db is pretty badly structured. Even with keyset pagination the queries got very slow. – Scarysize Feb 20 '18 at 11:08
  • Could you tell more, e.g. show some snippets of your code and tell a bit about how the performance of that compared to dumping CSV? Your experience would be very valuable for future users of JdbcIO. – jkff Feb 20 '18 at 15:18
  • I will edit my original question with some more details! – Scarysize Feb 20 '18 at 16:25
  • Thanks. What you did is not sufficient - all the other steps in BEAM-3714 are also needed (specifying result set type etc). Alternatively, have you tried the suggestion to use readAll() and partition your query so that different ranges of the primary key can be queried in parallel and so that each range contains not too much data? (seems like no, since you ended up exporting via CSV - but just curious) – jkff Feb 20 '18 at 19:23
  • Updated my question. Don't sue me for the screwed up database table ;) – Scarysize Feb 21 '18 at 09:24
  • https://issues.apache.org/jira/browse/BEAM-3714 has been fixed in Beam master and will be available in the next release. – jkff Apr 20 '18 at 21:19
  • Nice! Thanks for the heads up. – Scarysize Apr 21 '18 at 08:01

I think JdbcIO doesn't scale very well due to its inherent limitations (a single SELECT). I'm not aware of streaming support between MySQL and Beam.

You can probably dump your DB to something easier for data processing systems to process (e.g., CSV). Would that work for you?

Zhou Yunqing
  • JdbcIO is not limited to a single select: you can use readAll() that executes multiple queries, taking parameters from a PCollection. – jkff Feb 16 '18 at 04:18
  • I now dumped the DB to CSV, this is much more convenient to process. – Scarysize Feb 20 '18 at 11:07