1

I want to convert a very large pyspark dataframe into pandas in order to be able to split it into train/test pandas frames for the sklearns random forest regressor. Im working inside databricks with Spark 3.1.2.

The dataset has a shape of (782019, 4242).

When running the following command i run out of memory according to the stacktrace.

dataset_name = "dataset_path"
dataset = spark.read.table(dataset_name)
dataset_pd = dataset.toPandas()

Spark UI executors summary

22/01/31 08:06:32 WARN TaskSetManager: Lost task 2.2 in stage 16.0 (TID 85) (X.X.X.X executor 3): java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

According to the reply here it is because of the toPandas implementation as it tries to write the dataset into one ByteArrayOutputStream which is only working with data below 2GB of size.

Is there any other way to get my dataframe converted into pandas?

Edit1: For further context, adding the rf regressor training

search_space={'n_estimators':hp.uniform('n_estimators',100,1000),
           'max_depth':hp.uniform('max_depth',10,100),
           'min_samples_leaf':hp.uniform('min_samples_leaf',1,50),
           'min_samples_split':hp.uniform('min_samples_split',2,50)}
 
def train_model(params):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    est=int(params['n_estimators'])
    md=int(params['max_depth'])
    msl=int(params['min_samples_leaf'])
    mss=int(params['min_samples_split'])
    
    
    model_hp = RandomForestRegressor(n_estimators=est,max_depth=md,min_samples_leaf=msl,min_samples_split=mss)
    model_hp.fit(X_train, y_train)
    pred=model_hp.predict(X_test)
    
    mae= rmse=sklearn.metrics.mean_absolute_error(y_test,pred)
    mse= rmse=sklearn.metrics.mean_squared_error(y_test,pred)
    rmse=sklearn.metrics.mean_squared_error(y_test,pred, squared=False)
    
    mlflow.log_metric('mae', mae)
    mlflow.log_metric('mse', mse)
    mlflow.log_metric('rmse', rmse)
    return rmse
  

spark_trials = SparkTrials(
  parallelism=8
)

 
with mlflow.start_run(run_name='rf') as run:
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=128,
    trials=spark_trials)
Alex Ott
  • 80,552
  • 8
  • 87
  • 132
Giuseppe
  • 375
  • 1
  • 4
  • 13
  • 2
    Whats the rationale of reading into pyspark and converting into pandas without any transformation or pruning happening? – wwnde Jan 31 '22 at 08:16
  • 1
    *"it is because of the toPandas implementation as it tries to write the dataset into one ByteArrayOutputStream which is only working with data below 2GB of size"* => Not in this case. The length of your collection, 782019, is way under Java's max int value. Your data is just too big for your driver's memory. I guess this is the point where you decide to use Spark ML rather than scikit – ernest_k Jan 31 '22 at 08:22
  • @wwnde I added further code for more context. In the current implementation i can not use distributed objects. – Giuseppe Jan 31 '22 at 08:34
  • @ernest_k I tried to higher the driver memory but i saw no change. So according to you the solution would be to stick with Spark MLs RF Regressor and keep working with the pyspark dataframes. – Giuseppe Jan 31 '22 at 08:34
  • @ernest_k i switched to the spark ml implementation but i can not understand the performance difference of both algorithms. While the sklearn implementation need 1.82 minutes on a dataset of 800 samples, spark ml needs 47 minutes. Do you have any idea why this would be the case? – Giuseppe Feb 01 '22 at 09:35
  • @Giuseppe Isn't it because at the ML stage Spark is not only creating a model, but also executing all commands before that? In Spark, not much is being calculated unless needed and if ML is the first task that requires that, then everything is calculated at this stage. To run a sklearn model you already must have these calculations behind you (probably at the toPandas() stage) and this can be the source of such difference – Pengshe Feb 02 '22 at 16:45

2 Answers2

1

have you tried dask?

import dask.dataframe as dd 
data = dd.read_csv(...) # dask dataframe
df = data.compute() #this is pandas dataframe

Parallel Dask XGBoost Model Training with xgb.dask.train() By default, XGBoost trains your model sequentially. This is fine for basic projects, but as the size of your dataset and/or XGBoost model grows, you’ll want to consider running XGBoost in distributed mode with Dask to speed up computations and reduce the burden on your local machine.

You’ll know when you’ve hit the memory limits of your machine when you get the following error message:

xgboost.core.XGBoostError: out of memory

XGBoost comes with a native Dask integration that makes it possible to train multiple models in parallel. Running an XGBoost model with the distributed Dask backend only requires two changes to your regular XGBoost code:

substitute dtrain = xgb.DMatrix(X_train, y_train)
with dtrain = xgb.dask.DaskDMatrix(X_train, y_train)
substitute xgb.train(params, dtrain, ...)
with xgb.dask.train(client, params, dtrain, ...)

Take a look at the notebook if you want to know how to create the data_local subset.

from dask_ml.model_selection import train_test_split
 
# Create the train-test split
X, y = data_local.iloc[:, :-1], data_local["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)
Now you’re all set to train your XGBoost model.

Let’s use the default parameters for this example.

import xgboost as xgb
# Create the XGBoost DMatrices
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)
# train the model
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)
You can then use your trained model together with your testing split to make predictions.

# make predictions
y_pred = xgb.dask.predict(client, output, dtest)

Credit: https://coiled.io/blog/dask-xgboost-python-example/

dimz
  • 179
  • 8
1

Converting such DataFrame to Pandas will fail, because this function requires all the data to be loaded into the driver's memory, which will run out at some point. Probably the best way here would be to switch to PySpark's implemention of RandomForestRegressor here.

If you just need to see how the data generally behaves, you could try breaking down your DataFrame into smaller chunks, converting them separately and connecting the data. But it's really not recommended as it wouldn't be a good long term solution. First problem is related to finding the optimal number of chunks that you would need to break down the data. Secondly, depending on task and usage, sometimes the driver might run of out memory sooner and then your task would crash again. This is especially true if your dataset will expand in the future.

Pengshe
  • 335
  • 3
  • 10