
I have a class, defined and used as follows, with a method that is called in the context of a multiprocessing pool:

from multiprocessing import Pool
import pandas as pd

class MyClass:
    def __init__(self, big_object):
        self.big_object = big_object

    def compute_one(self, myvar):
        return self.big_object.do_something_with(myvar)

    def compute_all(self, myvars):
        with Pool() as pool:
            results = pool.map(self.compute_one, myvars)
        return results


big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)

vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)

This "works" but the issue is that big_object takes several Gbs in RAM, and with the above code, this big object is copied in RAM for every process launched by multiprocessing.Pool.

How can I modify the above code so that big_object is shared among all processes, while still being defined at class instantiation?

-- EDIT

Some context on what I'm trying to do might help identify a different approach altogether.

Here, big_object is a Pandas dataframe with 1M+ rows and tens of columns, and compute_one computes statistics based on a particular column.

A simplified high-level view would be (extremely summarized):

  • take one of the columns (e.g. current_col = manufacturer)
  • for each rem_col of the remaining categorical columns:
    • compute a big_object.groupby([current_col, rem_col]).size()

The end result of this would look like:

manufacturer  country_us  country_gb  client_male  client_female
bmw                   15          18           74             13
mercedes               2          24           12             17
jaguar                48         102           23             22

So in essence, this is about computing statistics for every column, on every other column, over the entire source dataframe.
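
To make that concrete, here is a minimal sketch of the pairwise computation (the dataframe and column names are made up for illustration):

import pandas as pd

# Toy stand-in for big_object; the real data has 1M+ rows and tens of columns
df = pd.DataFrame({
    'manufacturer': ['bmw', 'mercedes', 'jaguar', 'bmw'],
    'country': ['us', 'gb', 'gb', 'us'],
    'client': ['male', 'female', 'male', 'female'],
})

current_col = 'manufacturer'
for rem_col in df.columns.drop(current_col):
    # One groupby per (current_col, rem_col) pair, as outlined above
    counts = df.groupby([current_col, rem_col]).size().unstack(rem_col)
    print(counts)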

Using multiprocessing here allows, for a given current_col, computing all the statistics on the remaining columns in parallel. These statistics include not only sums but also means (for the remaining numerical columns).

A dirty approach using a global variable instead (a global big_object instantiated outside the class) takes the entire running time from 5+ hours down to about 20 minutes. My only issue is that I'd like to avoid this global-object approach.
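
For clarity, that dirty approach looks roughly like this (it relies on the fork start method, e.g. on Linux, where pool workers inherit module-level objects via copy-on-write instead of receiving pickled copies):

from multiprocessing import Pool
import pandas as pd

# Module-level global: with the fork start method, workers inherit this
# object via copy-on-write rather than each receiving a pickled copy
big_object = pd.read_csv('very_big_file.csv')

def compute_one(myvar):
    return myvar in big_object  # stand-in for the real per-column statistics

if __name__ == '__main__':
    with Pool() as pool:
        all_results = pool.map(compute_one, ['foo', 'bar', 'baz'])

Note that CPython's reference counting can still cause some of those shared pages to be copied over time, so the sharing is not perfect.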

Jivan
  • Could you use threads instead of processes? Shared variables are the native way to share data between a bunch of threads, not so for processes. – timgeb May 09 '22 at 10:30
  • @timgeb `compute_one` is CPU-bound so unless I'm mistaken I think I need processes here, as the point is to use all available cores – Jivan May 09 '22 at 10:33
  • Sharing a big object between processes just isn't a good idea. You could share it using shared memory (in theory – no idea whether pandas would grok that) but would have to synchronise access to it (aka lock) – which defeats the purpose of multiprocessing when the bulk of your application is not concurrently usable. – MisterMiyagi May 09 '22 at 10:34
  • [this looks promising, no?](https://stackoverflow.com/questions/17377426/shared-variable-in-pythons-multiprocessing) – timgeb May 09 '22 at 10:34
  • @MisterMiyagi the big object is not modified in any way by `compute_one`, or anywhere in the class – Jivan May 09 '22 at 10:35
  • @Jivan That doesn't mean it works without locking in practice. – MisterMiyagi May 09 '22 at 10:35
  • Processes can share memory via the [`multiprocessing.shared_memory.SharedMemory`](https://docs.python.org/3/library/multiprocessing.shared_memory.html#multiprocessing.shared_memory.SharedMemory) class. You can also define custom [`multiprocessing.managers`](https://docs.python.org/3/library/multiprocessing.html#customized-managers) that will act as a proxy to some (possibly big) object of a custom class (that runs in a separate thread). – martineau May 09 '22 at 10:36
  • @mistermiyagi it should supposedly be fine regarding locks as the whole dataset is stored in memory. The problem is even more basic, you can't share custom classes in shared memory afaik – Bharel May 09 '22 at 10:38
  • A few GB of data dangerously approaches the limits of RAM anyway. I would suggest rewriting the algorithm into a "chunk by chunk" approach. – freakish May 09 '22 at 10:41
  • @mart using shared manager won't solve the problem. Every time he accesses it, pandas will return a new object and allocate a new space in his process. I'm not sure how views will work under shared memory. – Bharel May 09 '22 at 10:46
  • @Bharel: There are ways to share a single object among two or more processes via a custom `multiprocessing.manager` instance. – martineau May 09 '22 at 10:51
  • @martineau A `multiprocessing.manager` must use synchronisation *and* communication, so it's effectively the worst of both worlds. It's unlikely to be suitable for big, monolithic data blobs like a several GB pandas dataframe. – MisterMiyagi May 09 '22 at 10:55
  • @freakish not on a 328Gb RAM machine – Jivan May 09 '22 at 11:02
  • @freakish the point of `compute_one` is to compute statistics over the entire dataframe — I'm not sure how to achieve it on a chunk by chunk basis – Jivan May 09 '22 at 11:03
  • @Jivan what exactly you compute? Note that doing calculations over a disk, with arbitrarily limited usage of RAM, is what every database engine is capable of. And yes, 328Gb RAM is a lot. So either don't worry about it, or refactor your algorithm. – freakish May 09 '22 at 11:07
  • @freakish I added an edit to the original post to clarify the context. – Jivan May 09 '22 at 11:12
  • @Jivan hmm, this doesn't look too complicated. You read the input chunk by chunk (for example by 10000 rows), you calculate your groupby for the chunk (and all possible combinations of columns) and then incrementally add this temporary result to the current state (empty at the beginning). Am I missing something? Although I don't know how easy this is with pandas (I have 0 experience with it). – freakish May 09 '22 at 11:22
  • Yes, a managed object must use communication because the method invocation is akin to a remote procedure call and introduces some extra overhead. But in this case `compute_one` is being called only once per column and the overhead of that remote method call is insignificant compared to the processing done by `compute_one`. So this should be a non-issue. And I don't see where a managed object would require synchronization above and beyond what a non-managed version of the object would. – Booboo May 09 '22 at 17:30

1 Answer


One solution is to make MyClass a managed class, just like, for example, a managed dictionary created with multiprocessing.Manager().dict(). To ensure that there is only one copy of the "big object", first I would modify MyClass.__init__ to take a CSV file path argument. In this way the "big object" is constructed only in the manager's process. Second, I would remove the compute_all logic from MyClass and invoke the multiprocessing.pool.Pool.map method in such a way that what is being passed as the worker function is a method of the managed object's proxy.

What you save in space, you give up in some performance: each invocation of the compute_one method is more or less the equivalent of a remote method call to the manager's process, where it is actually executed.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd

class MyClassManager(BaseManager):
    pass

class MyClass:
    def __init__(self, path):
        self.big_object = pd.read_csv(path)

    def compute_one(self, myvar):
        # For demo purposes just check if column myvar exists in dataframe:
        return myvar in self.big_object

# Required for Windows:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        # vars is a built-in function name and should not be used:
        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        print(all_results)
Booboo
  • A comment was just incorrectly added [as answer](https://stackoverflow.com/a/75947594/5211833): "I have found that based @Booboo 's answer, there is only one process is runing in actually, right?" – Adriaan Apr 06 '23 at 08:56