
I'm currently trying to bulk-migrate the contents of a very large MySQL table into a Parquet file via Spark SQL. But when doing so I quickly run out of memory, even after raising the driver's memory limit (I'm using Spark in local mode). Example code:

Dataset<Row> ds = spark.read()
    .format("jdbc")
    .option("url", url)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "bigdatatable")
    .option("user", "root")
    .option("password", "foobar")
    .load();

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable");

It seems like Spark tries to read the entire table contents into memory, which isn't going to work out very well. So, what's the best approach to doing bulk data migration via Spark SQL?

Josh Stone
  • You are getting OOM not because Spark is configured wrong; you probably should enable streaming in the JDBC driver: http://stackoverflow.com/a/2448019/2439539 – r90t Nov 08 '16 at 21:19
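
(For reference, the streaming suggested in the comment above corresponds to Spark's JDBC "fetchsize" option combined with MySQL Connector/J's useCursorFetch connection parameter. The following is only a rough sketch, not a verified fix: it assumes the connection URL accepts the extra parameter as written and otherwise keeps the options from the question.)

// Sketch: ask the MySQL driver to stream results in batches instead of
// buffering the whole table in the driver JVM.
String streamingUrl = url + "?useCursorFetch=true"; // adapt if url already has parameters

Dataset<Row> ds = spark.read()
    .format("jdbc")
    .option("url", streamingUrl)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "bigdatatable")
    .option("user", "root")
    .option("password", "foobar")
    .option("fetchsize", "10000") // rows fetched per round trip
    .load();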

1 Answer


In your solution, Spark will read the entire table contents into a single partition before it starts writing. One way to avoid that is to partition the reading side, but it requires a numeric, roughly sequential column in the source data:

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "bigdatatable")
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL") // column used to split the read
  .option("lowerBound", "1")                // smallest expected value of NUMERIC_COL
  .option("upperBound", "10000")            // largest expected value of NUMERIC_COL
  .option("numPartitions", "64")            // number of parallel JDBC reads
  .load();

In the example above, the column "NUMERIC_COL" must exist in the data and should, ideally, vary uniformly from 1 to 10000. That is a lot to ask, and a column like that will probably not exist, so you should either create a view in the database with such a column or add it in the query (note that I used generic SQL syntax; you will have to adapt it for your DBMS):

String query = "(select mod(row_number() over (), 64) as NUMERIC_COL, t.* from bigdatatable t) as foo";

Dataset<Row> ds = spark.read()
  .format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", query)
  .option("user", "root")
  .option("password", "foobar")
  .option("partitionColumn", "NUMERIC_COL")
  .option("lowerBound", "0")
  .option("upperBound", "63")
  .option("numPartitions", "64")
  .load();
Daniel de Paula
  • Actually, this could be incorrect. For example, row_number() in Oracle reflects the ordering of the result set. Since you don't have an ORDER BY on your query, you cannot assume that the order is the same on every execution, even though it may appear that it is. What you need to be using is a row id, i.e. something fixed with the row that's unchanging, not rownum (see the sketch after these comments). – zzhu8192 May 27 '18 at 05:02
  • How do you generalize this? If we have 100 tables, do we need to duplicate the whole thing 100 times ... is there any better way, something like a connection pool? – BdEngineer Oct 09 '18 at 04:56
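
(To tie the two comments above together: a minimal, untested sketch. It assumes each table has a stable numeric primary key, hypothetically named "id" here, so the partition column is deterministic rather than depending on an unordered row_number(); the class name, URL, and table list are likewise made up for illustration.)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class JdbcToParquet {

    // Migrate one table, partitioning the read on mod(id, 64).
    static void migrateTable(SparkSession spark, String url, String table) {
        // Bucket derived from the primary key: stable across executions,
        // unlike row_number() without an ORDER BY.
        String query = "(select mod(id, 64) as NUMERIC_COL, t.* from " + table + " t) as src";

        Dataset<Row> ds = spark.read()
            .format("jdbc")
            .option("url", url)
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", query)
            .option("user", "root")
            .option("password", "foobar")
            .option("partitionColumn", "NUMERIC_COL")
            .option("lowerBound", "0")
            .option("upperBound", "63")
            .option("numPartitions", "64")
            .load();

        ds.write().mode(SaveMode.Append).parquet("data/" + table);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("mysql-to-parquet")
            .master("local[*]")
            .getOrCreate();

        String url = "jdbc:mysql://localhost:3306/mydb";

        // One helper reused for every table, instead of copying the code per table.
        for (String table : new String[]{"bigdatatable", "another_table"}) {
            migrateTable(spark, url, table);
        }

        spark.stop();
    }
}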