I have a list of tuples (tuples_list). I want to perform a map operation on it, but part of the map operation requires information from a pretty big matrix (matrix). No writes to the matrix, just reads. This is a scipy compressed sparse row matrix (csr_matrix).
So the map function might look something like:
def map_function(list_element, matrix):
    # read-only lookup into the big csr_matrix for this element
    info = get_element_specific_info_from_matrix(list_element, matrix)
    new_element = get_new_element(info)
    return new_element
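To make the lookup concrete, a hypothetical version of that helper might just pull one row out of the csr_matrix. (The tuple layout, and the idea that the first field is a row index, are made up for illustration; the real helper is more involved.)

def get_element_specific_info_from_matrix(list_element, matrix):
    # Hypothetical: treat the first field of the tuple as a row index.
    # Only reads from the matrix, never writes.
    row_idx = list_element[0]
    row = matrix.getrow(row_idx)      # 1 x n_cols sparse row
    return row.toarray().ravel()      # dense 1-D numpy array for this element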
Here's a high-level summary of what my code's doing:
from pyspark import SparkContext
sc = SparkContext("local", "Process Name")
matrix = ...
tuples_list = ...
...
tuples_list = sc.parallelize(tuples_list)
results_list = tuples_list.map(lambda tup: map_function(tup, matrix))
results_list = results_list.collect() # error happens here
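In case it helps, here is a minimal self-contained version of that pattern, with a toy matrix standing in for the real one (the real matrix is far larger; map_function is the placeholder defined above):

from pyspark import SparkContext
from scipy.sparse import random as sparse_random

sc = SparkContext("local", "Process Name")

# Toy stand-ins; the real csr_matrix is much bigger.
matrix = sparse_random(1000, 1000, density=0.01, format="csr")
tuples_list = [(i, "payload") for i in range(1000)]

rdd = sc.parallelize(tuples_list)
# The lambda closes over `matrix`, so (as I understand it) the whole
# matrix gets serialized as part of the task closure / broadcast.
results_list = rdd.map(lambda tup: map_function(tup, matrix)).collect()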
The problem is that I keep running into heap/memory errors, and I suspect it's because the Spark driver is making copies of the matrix to ship to its workers:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readBroadcastFromFile.
: java.lang.OutOfMemoryError: Java heap space
I can't pass in a subset of that matrix, unfortunately. No assumptions can be made about what data the element needs from the matrix.
What I'd like to know:
- How can I verify whether copies of the matrix are actually being made?
- If copies are made, how can I tell Spark not to make them? (This job runs on a SLURM server with shared memory.) Would explicitly broadcasting the matrix, as in the sketch after this list, be the right way to do that?
- If Spark is not making copies, what are some steps I could take to diagnose the actual problem?
- Am I using the terminology (driver, workers) correctly? If not, please correct me.
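For reference, the explicit-broadcast variant I'm aware of looks roughly like the sketch below; I don't know whether it actually avoids the copies or just moves the same blow-up into the broadcast step (same placeholder helpers as above, and tuples_rdd is just a name for the sketch):

# Broadcast the read-only matrix once instead of closing over it directly.
matrix_bc = sc.broadcast(matrix)

def map_function_bc(list_element):
    # .value accesses the broadcast copy on each worker.
    info = get_element_specific_info_from_matrix(list_element, matrix_bc.value)
    return get_new_element(info)

tuples_rdd = sc.parallelize(tuples_list)
results_list = tuples_rdd.map(map_function_bc).collect()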
Thanks!