
I have read a lot about Spark's memory usage when doing things like collect() or toPandas() (like here). The common wisdom is to use them only on small datasets. The question is: how small does a dataset have to be for Spark to handle it?

I run locally (for testing) with pyspark, with the driver memory set to 20g (I have 32 GB on my 16-core Mac), but toPandas() crashes even with a dataset as small as 20K rows! That cannot be right, so I suspect I have some setting wrong. This is the simplified code to reproduce the error:

import pandas as pd
import numpy as np
import pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# setting the number of rows for the CSV file
N = 20000
ncols = 7
c_name = 'ABCDEFGHIJKLMNOPQRSTUVXYWZ'

# creating a pandas dataframe (df) 
df = pd.DataFrame(np.random.randint(999,999999,size=(N, ncols)), columns=list(c_name[:ncols]))

file_name = 'random.csv'

# export the dataframe to csv using comma delimiting
df.to_csv(file_name, index=False)

## Load the csv in spark
df = spark.read.format('csv').option('header', 'true').load(file_name)#.limit(5000)#.coalesce(2)

## some checks
n_parts = df.rdd.getNumPartitions()
print('Number of partitions:', n_parts)
print('Number of rows:', df.count())

## convert spark df -> pandas df
df_p = df.toPandas()
print('With pandas:',len(df_p))
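
For reference, the 20g driver memory and 4g maxResultSize are not set in the snippet above. A minimal sketch of how they can be passed at session creation (in my actual runs the values come from my local configuration, not necessarily this exact snippet):

from pyspark.sql import SparkSession

# sketch only: spark.driver.memory takes effect only if it is set
# before the JVM starts, i.e. before the first SparkSession is created
spark = (SparkSession.builder
         .config('spark.driver.memory', '20g')
         .config('spark.driver.maxResultSize', '4g')
         .getOrCreate())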

I run this within Jupyter and get errors like:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.0.104:61536
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
...

My local Spark settings are (everything else is default):

('spark.driver.host', '192.168.0.104')
('spark.driver.memory', '20g')
('spark.rdd.compress', 'True')
('spark.serializer.objectStreamReset', '100')
('spark.master', 'local[*]')
('spark.executor.id', 'driver')
('spark.submit.deployMode', 'client')
('spark.app.id', 'local-1618499935279')
('spark.driver.port', '55115')
('spark.ui.showConsoleProgress', 'true')
('spark.app.name', 'pyspark-shell')
('spark.driver.maxResultSize', '4g')
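
That listing is essentially what dumping the effective session config prints; a short sketch of how to reproduce it:

## print the effective config as (key, value) pairs
for kv in sorted(spark.sparkContext.getConf().getAll()):
    print(kv)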

Is my setup wrong, or is it expected that even 20g of driver memory can't handle such a small dataframe with 20K rows and 7 columns? Will repartitioning help?
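
By repartitioning I mean something along these lines (just a sketch; the partition count 8 is an arbitrary example):

## repartition before collecting to the driver
df_p = df.repartition(8).toPandas()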

scipio1465
  • Try running the same code in a local pyspark shell. – Vignesh D Apr 15 '21 at 16:54
  • The same error happens if the script is run from the shell with `spark-submit`, still with just 20K rows. – scipio1465 Apr 16 '21 at 14:11
  • The error seems to be purely a connection problem or network config issue. Maybe you are adding configs (like ports and host) that create this error. By default, local mode usually works flawlessly. Try running with just 5 or 10 rows to rule out this possibility. – Vignesh D Apr 19 '21 at 14:08
  • @VigneshD: I'm not sure it is just a connection issue. If I reduce the sample to 18K rows (instead of 20K) it runs successfully. The script is very simple and I would like to know if other people can reproduce the problem. – scipio1465 Apr 20 '21 at 18:26

0 Answers