
I'm having an issue with instances not retaining changes to attributes, or even keeping new attributes that are created. I think I've narrowed it down to the fact that my script takes advantage of multiprocessing, and I'm thinking that changes made to instances in separate processes are not 'remembered' when the script returns to the main process.

Basically, I have several sets of data which I need to process in parallel. The data is stored as an attribute, and is altered via several methods in the class. At the conclusion of processing, I'm hoping to return to the main thread and concatenate the data from each of the object instances. However, as described above, when I try to access the instance attribute with the data after the parallel processing bit is done, there's nothing there. It's as if any changes enacted during the multiprocessing bit are 'forgotten'.

Is there an obvious solution to fix this? Or do I need to rebuild my code to instead return the processed data rather than just altering/storing it as an instance attribute? I guess an alternative solution would be to serialize the data, and then re-read it in when necessary, rather than just keeping it in memory.

Something maybe worth noting here is that I am using the pathos module rather than Python's multiprocessing module. I was getting some errors pertaining to pickling, similar to here: Python multiprocessing PicklingError: Can't pickle <type 'function'>. My code is broken across several modules and, as mentioned, the data processing methods are contained within a class.

Sorry for the wall of text.

EDIT: Here's my code:

import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider

# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']


# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
    loc     = 'providers.%s' % name
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj_list.append(provider_obj)

processes = []
for instance in provider_obj_list:
    process = mp.Process(target = instance.data_processing_func)
    process.daemon = True
    process.start()
    processes.append(process)

for process in processes:
    process.join()

# now that data_processing_func is complete for each set of data, 
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))

I have a number of modules (their names listed in operating_providers) that contain attributes specific to their data source. These modules are iteratively imported and passed to new instances of the Provider class, which I created in a separate module (provider). I append each Provider instance to a list (provider_obj_list), and then iteratively create separate processes which call the instance method instance.data_processing_func. This function does some data processing (with each instance accessing completely different data files), and creates new instance attributes along the way, which I need to access when the parallel processing is complete.
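
To make the symptom concrete, here is a stripped-down, hypothetical example (not my real code, and using the standard library multiprocessing module just for illustration) that shows the same behavior:

import multiprocessing as mp

class Worker(object):
    def process(self):
        # this runs in the child process, so the attribute is set on
        # the child's copy of the object, not on the parent's copy
        self.data = 'processed'

if __name__ == '__main__':
    worker = Worker()
    p = mp.Process(target=worker.process)
    p.start()
    p.join()
    print(hasattr(worker, 'data'))  # prints False in the parent process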

I tried using multithreading rather than multiprocessing -- in this case, my instance attributes persisted, which is what I want. However, I'm not sure why that happens -- I'll have to study the differences between threading and multiprocessing.
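
For reference, the threading version that works for me is essentially the same loop with `threading.Thread` swapped in for `mp.Process` (a simplified sketch, not my exact code):

import threading

threads = []
for instance in provider_obj_list:
    thread = threading.Thread(target=instance.data_processing_func)
    thread.daemon = True
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

# threads share the parent's memory, so the attributes created inside
# data_processing_func are still there afterwards
stack = pd.concat((instance.data for instance in provider_obj_list))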

Thanks for any help!

Le Chase
  • In Python, multiprocessing creates subprocesses which run in different memory spaces, but threads within a process all execute in the same memory space. Sharing data between processes involves "pickling" it and sending it from one to another (and unpickling it there). Threads don't require this, but they do need to control concurrent access to the shared data to prevent corruption issues. You haven't posted any code in your question, which makes it very difficult for anyone to give you a more concrete answer. – martineau Mar 19 '19 at 01:06
  • Thanks @martineau! I edited my question with my code. I also got the script to work using `threading`. I'll look into the differences between the two. – Le Chase Mar 19 '19 at 19:55
  • OK, here's a more concrete answer. I think you're getting the pickling error because you're trying to pass `Provider` instances to the subprocesses. A workaround would be to define a target function that accepts only a single `loc` argument, which the function can then use to load the needed module, create a `Provider` instance from it, and then use that instance to call its `data_processing_func()`. – martineau Mar 19 '19 at 21:08
  • I'm the `pathos` (and `multiprocess`) author. I think @martineau has a good approach. A thread pool should definitely work. You could also look into refactoring so you can use a shared-memory Array (from `multiprocess`/`multiprocessing`)... but this probably leads to something more complicated than @martineau's answer. – Mike McKerns Mar 20 '19 at 02:01
  • After reading @Mike McKerns' comment, it dawned on me that I completely glossed over the issue of getting data _back_ from the subprocesses, partially because you're not doing anything obvious in that regard in the sample code you added to your question. There are several possibilities that I know of, depending on the type of data involved. As to whether this would be a good candidate for multithreading, that depends on what kind of "data processing" is going on. Unless it's something i/o-bound, multiprocessing would likely be faster. – martineau Mar 20 '19 at 02:36
  • @MikeMcKerns thanks for your input. @martineau thanks for responding again. Apologies about the code -- I edited once more showing what I intend to do after the threads are complete. Essentially, `data_processing_func` creates a new attribute, `data`, which is a pandas DataFrame. After each set of data has completed processing (in parallel), I'm basically just looking to stack them all together. My current code works with `threading`, and I believe the process is i/o bound -- I'm accessing data repeatedly on remote drives and my CPU is plenty powerful (this may be a naive assumption). – Le Chase Mar 20 '19 at 15:39
  • Based on the information added in your latest edit, multithreading _does_ indeed sound like a better approach, and as @Mike already mentioned, you should definitely look into using a thread pool instead of creating each thread explicitly (if that's what you're doing in your multithreaded version of the code). Thanks for accepting my answer below anyway. `;¬)` – martineau Mar 20 '19 at 16:49

1 Answer


Here's some sample code showing how to do what I outlined in my comment. I can't test it because I don't have `provider` or `pathos` installed, but it should give you a good idea of what I suggested.

import importlib
from pathos.helpers import mp
from provider import Provider

def process_data(loc):
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj.data_processing_func()


if __name__ == '__main__':

    # list of data providers ... length is arbitrary
    operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']

    # create list of provider locations for each operating provider
    provider_loc_list = []
    for name in operating_providers:
        loc = 'providers.%s' % name
        provider_loc_list.append(loc)

    processes = []
    for loc in provider_loc_list:
        process = mp.Process(target=process_data, args=(loc,))
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()
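
If you also need the processed data back in the parent process (the issue raised in the comments), one possibility is to have the target function return it and collect the results with a pool. This is untested, and it assumes that `data_processing_func()` leaves its result on a `data` attribute that can be sent between processes, and that `pathos.helpers.mp` exposes the usual `Pool` API:

import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider

def process_data(loc):
    module = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj.data_processing_func()
    return provider_obj.data  # send the resulting DataFrame back to the parent

if __name__ == '__main__':
    operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
    provider_loc_list = ['providers.%s' % name for name in operating_providers]

    pool = mp.Pool(len(provider_loc_list))
    try:
        # one DataFrame comes back per provider location
        results = pool.map(process_data, provider_loc_list)
    finally:
        pool.close()
        pool.join()

    # stack all the returned data, as in the original question
    stack = pd.concat(results)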
martineau