
I'd like to mention in advance that several related questions, like the following, DO NOT address my problem:

This one comes close, but the stack-trace is different and it is unresolved anyway. So rest assured that I'm posting this question after several days of (failed) solution-hunting.


I'm trying to write a job that moves data (once a day) from MySQL tables to Hive tables stored as Parquet / ORC files on Amazon S3. Some of the tables are quite big: ~ 300M records with 200 GB+ size (as reported by phpMyAdmin).

Currently we are using sqoop for this but we want to move to Spark for the following reasons:

  • To leverage its capabilities with the DataFrame API (in the future, we would be performing transformations while moving data)
  • We already have a sizeable framework written in Scala for Spark jobs used elsewhere in the organization

I've been able to achieve this on small MySQL tables without any issue. But the Spark job (that reads data from MySQL into a DataFrame) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of the stack-trace below; you can find the complete stack-trace here.

...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
    at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:>                                                       (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...

** This stack-trace was obtained upon the failure of moving a 148 GB table containing 186M records

As apparent from the (full) stack-trace, the Spark read job starts sulking with false warnings of the None.get error, followed by SQLException: Incorrect key file for table.. (which is related to MySQL's tmp table becoming full)


Now clearly this can't be a MySQL problem because in that case sqoop should fail as well. As far as Spark is concerned, I'm parallelizing the read operation by setting numPartitions = 32 (we use parallelism of 40 with sqoop).
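
For reference, the read looks roughly like the sketch below (a minimal sketch: the endpoint, credentials, partition column and its bounds are placeholders, not our actual values):

import java.util.Properties

val connProps = new Properties()
connProps.setProperty("user", "my_user")                 // placeholder credentials
connProps.setProperty("password", "my_password")
connProps.setProperty("driver", "com.mysql.jdbc.Driver")

// spark is an existing (Hive-enabled) SparkSession; the partition column,
// its bounds and the JDBC endpoint here are illustrative, not the real ones
val mysqlDF = spark.read.jdbc(
  url = "jdbc:mysql://my-mysql-host:3306/my_db",
  table = "my_table",
  columnName = "id",          // an indexed, roughly uniformly distributed numeric column
  lowerBound = 1L,
  upperBound = 186000000L,    // roughly max(id)
  numPartitions = 32,
  connectionProperties = connProps
)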

From my limited knowledge of Spark and Big Data, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover, since MySQL, Spark (EMR) and S3 all reside in the same region (AWS AP-SouthEast), latency shouldn't be the bottleneck.


My questions are:

  1. Is Spark a suitable tool for this?
  2. Could Spark's JDBC driver be blamed for this issue?
  3. If the answer to the above question is:
    • Yes: How can I overcome it? (alternate driver, or some other workaround)?
    • No: What could be the possible cause?

Framework Configurations:

  • Hadoop distribution: Amazon 2.8.3
  • Spark 2.2.1
  • Hive 2.3.2
  • Scala 2.11.11

EMR Configurations:

  • EMR 5.12.0
  • 1 Master: r3.xlarge [8 vCores, 30.5 GiB memory, 80 GB SSD storage, EBS storage: 32 GiB]
  • 1 Task: r3.xlarge [8 vCores, 30.5 GiB memory, 80 GB SSD storage, EBS storage: none]
  • 1 Core: r3.xlarge [8 vCores, 30.5 GiB memory, 80 GB SSD storage, EBS storage: 32 GiB]

** These are the configurations of the development cluster; the production cluster would be better equipped

y2k-shubham
  • 10,183
  • 11
  • 55
  • 131
  • Hi there, a couple of questions: 1) Why don't you use sqoop to save into HDFS and then read that file with Spark to later insert it into Hive? 2) What are the resources allocated for this Spark process? – Felix Mar 07 '18 at 11:40
  • I agree with Felix. Sqoop is a tool that is dedicated to moving RDBMS data to HDFS. I've had a few problems with Spark JDBC myself. Alternatively, you can also use the Sqoop Java API if you really insist on doing this through a Scala application. – philantrovert Mar 07 '18 at 11:58
  • In any case, since you are using EMR, rather than doing it in two steps: use Sqoop to dump the data to parquet (whether you want/need S3 is up to you), then pull the data using Spark and perform your transformations. – eliasah Mar 07 '18 at 12:51
  • As for the jdbc source, MapReduce relies on the ability to partition data. If you don't study that part, you won't be able to use Spark with JDBC properly. For more info, you can read this: https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#reading-data-using-jdbc-source (disclaimer: I'm one of the authors of the document.) – eliasah Mar 07 '18 at 12:55
  • **@Felix**, **@philantrovert** we are presently using `sqoop` to dump data to `HDFS` and then use `distcp` to put it on `S3` in `parquet` format (the 2nd step also requires `MSCK repair`). But this incurs a lot of redundant `Disk I/O` and impacts other jobs that rely on the *same* `HDFS`. In fact, `sqoop` and `HDFS` have given us so many *lemons* in the past 18 months that we plan to stop using them completely. We got the motivation to use `Spark` as a data migration tool from [here](https://www.percona.com/blog/2016/08/17/apache-spark-makes-slow-mysql-queries-10x-faster/) – y2k-shubham Mar 20 '18 at 07:21
  • **@eliasah** the thing is that `sqoop 1.4.7` is obsolete and has had issues with writing `parquet` / `orc` files in the past. That's why we were forced to employ the *2-step approach* described in above comment. On the other hand, `sqoop 1.99.7` is still not properly *documented* and *production-ready*. I've already gone through your **excellent doc** and implemented *relevant-bits* from there; but the problem still persists. – y2k-shubham Mar 20 '18 at 07:28
  • Delving deeper into the *logs*, I've discovered that despite setting `numPartitions=16` (or 32), `Spark` is only hitting `MySQL` with (max) 3 queries at a time. As a result, even when the task succeeds (because of small `limit`), it is quite slow: **~20 min** for copying **1 M** rows (when `DataFrame` is stored with `partitionBy` on an `Int` column). From what I can see in *logs*, `stage`s take a while to *start* and make *progress* very slowly. – y2k-shubham Mar 20 '18 at 07:34
  • Interestingly, only the `Scala` code is *plagued* with this problem; doing the same thing in `PySpark` shows a different problem altogether. While **`stage`s do not make any progress** in `Scala`-`Spark` (and eventually fail), they advance to completion in `PySpark`. However the last stage in `PySpark` fails with an `OutOfMemory` error (possibly because of unequal splits among partitions, as the `MySQL` table has **holes** in the *range* of its `AUTO_INCREMENT` column) – y2k-shubham Mar 20 '18 at 07:42
  • And contrary to `Scala`-`Spark`, `PySpark` was also able to *parallelize* the *reads* to the same *degree* as specified in the `API` (read) method being invoked. – y2k-shubham Mar 20 '18 at 07:52
  • **@eliasah**, **@Felix**, **@philantrovert**, could *size of partitions*, i.e., amount of data in *(bytes)* in each `partition` of `DataFrame` be the culprit here? As per [this link](https://www.slideshare.net/hadooparchbook/top-5-mistakes-when-writing-spark-applications-66374492/39), each `partition` should *optimally* have **~ 128MB** of data. *[for some reason i feel this is too low]*. While setting `numPartitions=16` for reading `MySQL` tables, the resulting `partitions` in my case would easily reach **~ 10 GB**. But then again, too high value of `numPartitions` would *overwhelm* `MySQL` – y2k-shubham Mar 23 '18 at 10:28
  • I'd like to inform that I've managed to overcome the *parallelism* problem described in [above comments](https://stackoverflow.com/questions/49149996/spark-reading-big-mysql-table-into-dataframe-fails#comment85757791_49149996). So now `Spark` is able to hit `MySQL` with 16 (or whatever the value of `numPartitions` is) queries simultaneously – y2k-shubham Mar 23 '18 at 10:35
  • @y2k-shubham, what's the solution for this question? I am using pyspark and downloading data to the FileSystem currently, and yes, I face OOM issues towards the end – Viv Dec 05 '18 at 11:52
  • **@Viv** see [this](https://stackoverflow.com/questions/47918606/spark-gc-overhead-limit-exceeded-error-message/47919314#comment90700945_47919314) comment. Are you sure you are NOT passing a SQL query such as `(SELECT col_a, col_b, col_c FROM my_db.my_table) AS ql` in the `table` argument of [`DataFrameReader.jdbc(..)` method](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String,lowerBound:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame)? – y2k-shubham Dec 05 '18 at 12:05
  • For the record, [this](https://stackoverflow.com/questions/54002002/) looks closely related – y2k-shubham Jan 02 '19 at 07:17

1 Answer


The Spark JDBC API seems to load all the data from the MySQL table into memory without streaming it. So when you try to load a big table, what you should do is use the Spark API to clone the data to HDFS first (JSON should be used to keep the schema structure), like this:

spark.read.jdbc(jdbcUrl, tableName, prop)
  .write
  .json("/fileName.json")

Then you can work on the HDFS copy normally instead.

spark.read.json("/fileName.json")
  .createOrReplaceTempView(tableName)
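
From there you could, for example, persist the staged data as a Parquet-backed Hive table on S3, which is what the question ultimately wants (a rough sketch; the S3 path and the Hive table name below are placeholders):

// Write the staged data out as an external Parquet table;
// the S3 path and "my_hive_db.my_table" are placeholders.
spark.table(tableName)
  .write
  .mode("overwrite")
  .format("parquet")
  .option("path", "s3://my-bucket/warehouse/my_table")
  .saveAsTable("my_hive_db.my_table")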