0

I have a list of tuples (tuples_list). I want to perform a map operation on it, but part of the map operation requires information from a pretty big matrix (matrix). No writes to the matrix, just reads. This is a scipy compressed sparse row matrix (csr_matrix).

So the map function might look something like:

def map_function(list_element, matrix):
    info = get_element_specific_info_from_matrix(list_element, matrix)
    new_element = get_new_element(info)
    return new_element

Here's a high-level summary of what my code's doing:

from pyspark import SparkContext

sc = SparkContext("local", "Process Name")
matrix = ...
tuples_list = ...

...

tuples_list = sc.parallelize(tuples_list)
results_list = tuples_list.map(lambda tup: map_function(tup, matrix)) 
results_list = results_list.collect() # error happens here

The problem is, I keep running into heap / memory issues, and I suspect it's because the Spark driver is making copies of that matrix to pass around to its workers:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readBroadcastFromFile.
: java.lang.OutOfMemoryError: Java heap space

I can't pass in a subset of that matrix, unfortunately. No assumptions can be made about what data the element needs from the matrix.

What I'd like to know:

  • How can I verify whether or not copies are made of that matrix?
  • If copies are made, how can I tell Spark to not make copies? (This job is done on a SLURM server / shared memory.)
  • If Spark is not making copies, what're some steps I could take to diagnose the actual problem?
  • Am I using the right terminology (driver, workers) in the right ways? If not please correct me.

Thanks!

  • 1
    I don't know if it would help to know exactly what is `matrix`? I'm tempted to guess it is a `numpy` `ndarray`, but it could be a Python list, `pandas` dataframe, some sort of image or what ever. – hpaulj Feb 06 '19 at 20:57
  • It's a scipy compressed sparse row matrix (csr_matrix). Updated my post. – Joshua Mitchell Feb 07 '19 at 17:35
  • The main memory footprint of a `csr` is in the 3 attribute arrays, `data`, `indices`, `indptr`. `sparse.save_npz` writes such a matrix to a `zip` archive with multiplye `.npy` files. I won't even try to imagine what's involved in Serializing such an object in a Java framework. – hpaulj Feb 07 '19 at 17:45

1 Answers1

0

In short you cannot. But to answer your questions step by step

How can I verify whether or not copies are made of that matrix?

There are in fact multiple copies, in both serialized and deserialized form. Since you use PySpark serialized versions exist, at some point, on both JVM (that's where your code fails) and Python side.

If copies are made, how can I tell Spark to not make copies?

You cannot. Spark is a distributed processing system and its design choice don't make slightest sense on shared memory system. In particular there is indirection caused by host - guest approach, and inherent duplication caused by supervisor - worker architecture. Finally there is further isolation introduced in PySpark, where each worker uses its own process.

Wait, that's not all - local mode is a testing tool and not a production ready engine (not to mention that local is not even parallel).

There are some small windows that can reduce duplication - distribute data through file system and use memory mapped data structures, but really, just choose a tool which is right for the job, and can fully leverage the resources (especially correcting for non-uniform memory access). Spark is not one of these.