-1

I'm very new to pyspark/Apache Spark. I need to fetch multiple tables from a database on a server each containing around 120 million rows or more for analysis. I should be able to perform computations on the data. I am running pyspark on a server acting as both master and slave and has 7.45G of RAM. I have installed the jdbc driver and this is the code that I've used.

from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

hostname = "xx.xxx.xx.xx"
dbname = "AAA"
jdbcPort = 3306
username = "xxxxx"
password = "yyyyy"

jdbc_url = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, jdbcPort, dbname, username, password)
query = "(SELECT * FROM SAMPLE_TABLE_NAME) alias_name"
df = sqlContext.read.format('jdbc').options(driver='com.mysql.jdbc.Driver', url=jdbc_url, dbtable=query).load()

The query loads fine but when I do df.show(), it displays the following:

[Stage 0:>                                                          (0 + 1) / 1]20/06/11 11:54:29 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/06/11 11:54:29 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/06/11 11:54:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

20/06/11 11:54:29 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 380, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaErrorERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
: <exception str() failed>

I also read that Spark uses a fraction of the available RAM for computation where the default for driver memory and executor memory is 1024MB and 512MB. So I launched pyspark from the terminal using this and implemented the same code shown above:

pyspark --jars /home/ubuntu/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar --driver-memory 7G --executor-memory 7G

This gets rid of the java.lang.OutofMemorySpace: Java heap space error but shows other errors like - py4j.protocol.Py4JNetworkError: Answer from Java side is empty and IndexError: pop from an empty deque

Can someone please explain what's going on, what I'm doing wrong and what I need to do to fix this?

ouila
  • 45
  • 1
  • 9

1 Answers1

-1

You are trying to load in RAM too much data. You should first reduce the amount of data that SQL retrieves you before it reaches Spark and optimize it using spark parameters, for example partitions.

Consider one or more of these optimizations:

  • Specify in SELECT what columns to view explicitely, only the ones you need, if possible;
  • (Raw query) Loop in a while cycle until you can fetch rows by looping each row. Following techniques could work by setting a constant n_rows to read in memory and updating i index each cycling:

LIMIT i,i+n_rows

BETWEEN i AND i+n_rows

WHILE primaryKey >= i AND primaryKey < i+n_rows

partitionColumn select the column that will be used to determine how to split the data (For example, the primary key).

lowerBound estabilishes the minimum value of partitionColumn that will be fetched.

upperBound estabilishes the maximum value of partitionColumn that will be fetched.

numPartitions means how many parallel connection you want to set for reading data through RDBMS.

So Spark will retrieve you datasets using rows that you would get if you were doing SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound.

Fabio Crispino
  • 711
  • 1
  • 6
  • 22