I have a use case where I want to iteratively load data into Pandas DataFrames, do some processing with outside libraries (e.g. xgboost, not shown in the example code), and then push the result into a single PySpark object (RDD or DataFrame).

I've tried to get PySpark to spill to disk when storing the data either as an RDD or as a DataFrame, again where the source is a Pandas DataFrame. Nothing seems to work: I keep crashing the Java driver and can't load my data in. Alternatively, loading the data without any processing, using a plain textFile RDD, works like a charm. I'm wondering whether this is a PySpark bug or whether there is a workaround.

Sample Code:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import pyspark

# Stop any existing SparkContext so the memory settings below take effect
try:
    sc.stop()
except NameError:
    pass

SparkContext.setSystemProperty('spark.executor.memory', '200g')
SparkContext.setSystemProperty('spark.driver.memory', '200g')
sc = SparkContext("local", "App Name")
sql_sc = SQLContext(sc)

# Read the CSV in 100k-row chunks
chunk_100k = pd.read_csv("CData.csv", chunksize=100000)

# Empty frame with the right columns, plus a small sample to infer a schema
# (fillna so missing values don't break inference)
empty_df = pd.read_csv("CData.csv", nrows=0)
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('')
my_schema = sql_sc.createDataFrame(infer_df).schema

# Start from an empty Spark DataFrame with the inferred schema
SparkDF = sql_sc.createDataFrame(empty_df, schema=my_schema)

# Union each chunk onto the accumulating Spark DataFrame
for chunk in chunk_100k:
    SparkDF = SparkDF.union(sql_sc.createDataFrame(chunk, schema=my_schema))

Crashes after a few iterations with:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space

Working direct load to RDD code:

my_rdd = sc.textFile("CData.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line) > 1) \
    .map(lambda line: (line[0], line[1]))

Update:

I have changed the code to demonstrate the failure when loading into Spark DataFrames instead of RDDs; note that the issue still persists and the error message still references RDDs. Before I changed the example code, saving to RDDs with 'parallelize' was already found to be problematic, for the reasons discussed here:

Why does SparkContext.parallelize use memory of the driver?
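
For reference, a minimal sketch of the scalable alternative, loading the CSV directly with Spark's own reader instead of going through Pandas. This assumes Spark 2.x, where the csv source is built in; on 1.x the external spark-csv package provides the same via sqlContext.read.format('com.databricks.spark.csv'). The header/inferSchema flags are assumptions about CData.csv:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("App Name").getOrCreate()

# Spark reads the file itself, so the data never passes through the Python driver
csv_df = spark.read.csv("CData.csv", header=True, inferSchema=True)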

abby sobh
  • @zero323 I have changed the code to disambiguate it from the issue when using parallelize. – abby sobh Sep 26 '17 at 20:15
  • It is still __exactly the same problem__ (`SparkSession.createDataFrame` -> `SparkSession._createFromLocal` -> `SparkContext.parallelize`) and the same reason for failure. Creating distributed data structures from local objects is just not the way to go. If you want to load data in a scalable way, use Spark csv reader. – zero323 Sep 27 '17 at 19:26
  • Is there no other way than using the Spark csv reader? I want to read from pandas while spilling to disk, not csv. It is an extra step to write the file from pandas to disk and then re-load it into Spark. – abby sobh Oct 12 '17 at 22:03

1 Answer


Create a spark-defaults.conf file in apache-spark/1.5.1/libexec/conf/ and add the following lines to it:

spark.driver.memory 45G
spark.driver.maxResultSize 10G
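
If the job is launched with spark-submit, the same settings can also be passed on the command line instead of via the config file (a sketch; the script name is a placeholder):

spark-submit --driver-memory 45G --conf spark.driver.maxResultSize=10G your_script.py

Note that spark.driver.memory must be in effect before the driver JVM starts, which is why the config file or command-line route is more reliable than setting it from inside an already running application.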

M.achaibou