I have a use case where I want to iteratively load data into Pandas DataFrames, do some processing with outside functions (e.g. xgboost, not shown in the example code), and then push the result into a single PySpark object (RDD or DataFrame).
I've tried to get PySpark to spill to disk when storing the data either as an RDD or as a DataFrame, where the source in both cases is a Pandas DataFrame. Nothing seems to work: I keep crashing the Java driver and can't load my data in. By contrast, loading the data directly with a basic textFile RDD, without any processing, worked like a charm. I'm wondering whether this is a PySpark bug or whether there is a workaround.
Sample Code:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import pyspark
try:
    sc.stop()
except NameError:
    pass

SparkContext.setSystemProperty('spark.executor.memory', '200g')
SparkContext.setSystemProperty('spark.driver.memory', '200g')
sc = SparkContext("local", "App Name")
sql_sc = SQLContext(sc)

# Read the CSV in 100k-row chunks; infer a schema from a small sample.
chunk_100k = pd.read_csv("CData.csv", chunksize=100000)
empty_df = pd.read_csv("CData.csv", nrows=0)
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('')
my_schema = sql_sc.createDataFrame(infer_df).schema
SparkDF = sql_sc.createDataFrame(empty_df, schema=my_schema)

# Union each chunk into a single Spark DataFrame.
for chunk in chunk_100k:
    SparkDF = SparkDF.union(sql_sc.createDataFrame(chunk, schema=my_schema))
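One workaround that keeps driver memory bounded is to avoid unioning many small Spark DataFrames entirely: process each Pandas chunk and write it straight back to disk, then let Spark read the resulting directory in a single pass. A minimal sketch (the sample file, the out_chunks directory, and the column names are illustrative stand-ins for my real data; the final Spark read is left as a comment since it depends on the Spark version):

```python
import os
import pandas as pd

# Illustrative stand-in for CData.csv so the sketch is self-contained.
src = "CData_sample.csv"
pd.DataFrame({"id": range(10), "val": range(10)}).to_csv(src, index=False)

out_dir = "out_chunks"
os.makedirs(out_dir, exist_ok=True)

# Process each Pandas chunk independently and write it straight to disk,
# so nothing accumulates in driver memory.
for i, chunk in enumerate(pd.read_csv(src, chunksize=4)):
    processed = chunk  # per-chunk processing (e.g. xgboost) would go here
    processed.to_csv(os.path.join(out_dir, "part-%05d.csv" % i),
                     index=False, header=False)

# A single Spark read then replaces the union loop, e.g.:
# SparkDF = sql_sc.read.csv(out_dir, schema=my_schema)
```

This sidesteps the chunk-by-chunk union altogether, at the cost of one extra pass over the data on disk.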
Crashes after a few iterations with:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space
Working direct load to RDD code:
my_rdd = sc.textFile("CData.csv") \
           .map(lambda line: line.split(",")) \
           .filter(lambda line: len(line) > 1) \
           .map(lambda line: (line[0], line[1]))
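For what it's worth, the per-line logic of that pipeline is just split / filter / take-the-first-two, which can be sanity-checked without Spark (the sample lines below are made up; Python's csv module would be safer than str.split for quoted fields):

```python
# Sample lines standing in for rows of CData.csv (made up for illustration).
lines = ["a,1,extra", "b,2", "lonely"]

# Same transformation chain as the RDD version, on plain Python lists.
split_lines = [line.split(",") for line in lines]          # .map
kept = [parts for parts in split_lines if len(parts) > 1]  # .filter
pairs = [(parts[0], parts[1]) for parts in kept]           # .map

print(pairs)  # [('a', '1'), ('b', '2')]
```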
Update:
I have changed the code to demonstrate the failure when loading into Spark DataFrames rather than RDDs; note that the issue persists and the error message still references RDDs. Before I changed the example code, saving to RDDs had already proved problematic when using 'parallelize', for the following reasons: