2

I have a pandas dataframe data_pandas with about half a million rows and 30,000 columns. I want to turn it into a Spark dataframe data_spark, and I attempt this with:

data_spark = sqlContext.createDataFrame(data_pandas)

I am working on an r3.8xlarge driver with 10 workers of the same configuration. But this operation takes forever and fails with an OOM error. Is there an alternative method I can try?

The source data is in HDF format, so I can't read it directly as a Spark dataframe.
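
For context, the frame is first loaded into pandas along the lines below (the file name and key are placeholders of mine, not from the actual setup):

import pandas as pd

# hypothetical path and key; this pulls the ~500k x 30k frame entirely into driver memory
data_pandas = pd.read_hdf("data.h5", key="data")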

j1897
  • If there is a conversion involved you might want to take a look at pyarrow: https://arrow.apache.org/ – Bharath M Shetty Dec 11 '17 at 11:18
  • I have tried saving the pandas dataframe to parquet format using pyarrow, but that also gives me an OOM error for this large dataset with 30,000+ columns – j1897 Dec 11 '17 at 11:29 (a chunked parquet write that works around this is sketched after these comments)
  • Is your data partitioned? Or can you partition the data when storing it in parquet? Also, do you get the OOM error in the driver or in an executor? What do the logs show? Once you know where the OOM occurs, you can tune the driver/executor memory accordingly. – joshi.n Dec 11 '17 at 18:56
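
Following up on the pyarrow/parquet discussion in the comments above: one way to avoid the OOM while writing parquet is to write the frame in row chunks to a single file and then let Spark read that file, instead of converting the whole frame in memory. A minimal sketch, in which the output path, the chunk count of 20, and preserve_index=False are my assumptions rather than anything from the original post:

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# derive the Arrow schema once from a small slice of the frame
schema = pa.Table.from_pandas(data_pandas.iloc[:1], preserve_index=False).schema

# hypothetical output path; it must be on storage the Spark cluster can also read (e.g. HDFS or S3)
writer = pq.ParquetWriter("data_chunked.parquet", schema)
for chunk in np.array_split(data_pandas, 20):
    writer.write_table(pa.Table.from_pandas(chunk, schema=schema, preserve_index=False))
writer.close()

# Spark reads the parquet file directly, bypassing createDataFrame entirely
data_spark = sqlContext.read.parquet("data_chunked.parquet")

Very wide schemas (30,000+ columns) are a known pain point for parquet as well, so this may still need tuning.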

2 Answers

1

You can try using Apache Arrow, which can make the conversion more efficient.

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

For more details, see: https://bryancutler.github.io/toPandas/
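
As a slightly fuller sketch: the setting above applies to Spark 2.3+, and the snippet below assumes a SparkSession named spark and pyarrow installed on the driver (both assumptions on my part):

# enable Arrow-based conversion between pandas and Spark (Spark 2.3+)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# with Arrow enabled, the columns are transferred in a vectorized, columnar form
data_spark = spark.createDataFrame(data_pandas)
print(len(data_spark.columns))  # quick sanity check on the schema width

If the conversion still exceeds driver memory, this can be combined with the chunked approach in the other answer.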

0

One way is to convert the pandas dataframe in batches rather than all at once, for example with the code below, which splits it into 20 chunks (parts of the solution come from the questions here and here):

import numpy as np

def unionAll(*dfs):
    ' by @zero323 from here: http://stackoverflow.com/a/33744540/42346 '
    first, *rest = dfs  # Python 3.x; for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

# convert the pandas dataframe (data_pandas from the question) in 20 chunks
df_list = []
for chunk in np.array_split(data_pandas, 20):
    df_list.append(sqlContext.createDataFrame(chunk))

# unionAll takes dataframes as separate arguments, so unpack the list
df_all = unionAll(*df_list)
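
Converting chunk by chunk means each createDataFrame call only has to serialize a twentieth of the frame at a time; if that still runs out of memory, increase the number of chunks passed to np.array_split.
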
Gaurav Dhama