I want to convert a very large PySpark DataFrame into pandas so that I can split it into train/test pandas frames for scikit-learn's RandomForestRegressor. I'm working inside Databricks with Spark 3.1.2.
The dataset has a shape of (782019, 4242).
When I run the following, I run out of memory according to the stack trace:
dataset_name = "dataset_path"
dataset = spark.read.table(dataset_name)
dataset_pd = dataset.toPandas()
22/01/31 08:06:32 WARN TaskSetManager: Lost task 2.2 in stage 16.0 (TID 85) (X.X.X.X executor 3): java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
According to the reply here, this is due to the toPandas implementation: it tries to write the whole dataset into a single ByteArrayOutputStream, which only works for data below 2 GB in size.
Is there any other way to get my DataFrame converted into pandas?
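The closest workaround I can think of is to do the train/test split on the Spark side first and convert each part separately, roughly like the sketch below (untested; the Arrow setting and the 80/20 split are just assumptions, and each part still has to fit on the driver):

# Rough sketch: split in Spark first, then convert each part separately.
train_sdf, test_sdf = dataset.randomSplit([0.8, 0.2], seed=42)

# Arrow-based transfer avoids the default Java serialization path;
# I'm not sure it gets around the driver-side memory pressure, though.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

train_pd = train_sdf.toPandas()
test_pd = test_sdf.toPandas()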
Edit 1: For further context, here is the RF regressor training code:
import mlflow
import sklearn.metrics
from sklearn.ensemble import RandomForestRegressor
from hyperopt import fmin, hp, tpe, SparkTrials

search_space = {'n_estimators': hp.uniform('n_estimators', 100, 1000),
                'max_depth': hp.uniform('max_depth', 10, 100),
                'min_samples_leaf': hp.uniform('min_samples_leaf', 1, 50),
                'min_samples_split': hp.uniform('min_samples_split', 2, 50)}

def train_model(params):
    # Enable autologging on each worker
    mlflow.autolog()
    with mlflow.start_run(nested=True):
        # hp.uniform samples floats, so cast to int for the sklearn parameters
        est = int(params['n_estimators'])
        md = int(params['max_depth'])
        msl = int(params['min_samples_leaf'])
        mss = int(params['min_samples_split'])
        model_hp = RandomForestRegressor(n_estimators=est, max_depth=md,
                                         min_samples_leaf=msl, min_samples_split=mss)
        model_hp.fit(X_train, y_train)
        pred = model_hp.predict(X_test)
        mae = sklearn.metrics.mean_absolute_error(y_test, pred)
        mse = sklearn.metrics.mean_squared_error(y_test, pred)
        rmse = sklearn.metrics.mean_squared_error(y_test, pred, squared=False)
        mlflow.log_metric('mae', mae)
        mlflow.log_metric('mse', mse)
        mlflow.log_metric('rmse', rmse)
        return rmse

spark_trials = SparkTrials(parallelism=8)

with mlflow.start_run(run_name='rf') as run:
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=128,
        trials=spark_trials)
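For completeness, train_model reads X_train/X_test/y_train/y_test as globals; they are meant to come from a plain sklearn split of the converted pandas frame, roughly like this ("label" is a placeholder for my actual target column):

from sklearn.model_selection import train_test_split

# "label" stands in for the real target column of dataset_pd
X = dataset_pd.drop(columns=["label"])
y = dataset_pd["label"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)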