1

In the code below, I am dynamically creating an object of the class inside the _py attribute by using the generate_object method.

The code works perfectly if I am not using a concurrent approach. However, if I use concurrency from concurrent.futures, I do not get the desired result because of an error saying (beyond other things):

_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed

After googling this error, I understood that only picklable objects are to be passed as parameter in ProcessPoolExecutor.map(), so I decided to see how I could turn my dynamic class to be picklable.

The problem is that all other solutions for this problem creates a dynamic object in a different manner (different from what I'm using in _string_to_object()). Examples: 1 and 2

I would very much like to keep the dynamic object creation the way it is right now because a lot of my real code is based on it, therefore I am looking for a concurrent solution that works with this toy code below.

Code

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
    
    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()

print('Multiprocessing usage')
n_cores = 3
n_calls = 3

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

with ProcessPoolExecutor(max_workers=n_cores) as executor:
    args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
    results = executor.map(concurrent_function, args)
ihavenoidea
  • 629
  • 1
  • 7
  • 26
  • 1
    Strictly speaking the problem is with multiprocessing and not `concurrent.futures` *per se*. If you replaced the `ProcessPoolExecutorClass` with the `ThreadPoolExecutor` class from `concurrent.futures` you would have no problem. If multiprocessing is really a requirement, I would suggest you remove the `concurrency` tag and add the `multiprocessing` tag. – Booboo Nov 07 '20 at 12:00
  • 1
    If you look carefully at the error message you will see `Can't pickle – Booboo Nov 07 '20 at 13:59
  • Thanks @Booboo, do you know how can I make internal classes visible to the global scope? Googling this does not return useful results unfortunately. – ihavenoidea Nov 07 '20 at 14:11
  • See my answer below. – Booboo Nov 07 '20 at 14:48

2 Answers2

1

I couldn't come up with a way of getting the Script classes to be created in the global name space strictly adhering to your current scheme. However:

Since for each invocation of method generate_object you are creating a new class in the local namespace and instantiating an object of that class, why not postpone that work for it to be done in the process pool? This also has the added advantage of doing this class-creation processing in parallel and there is no pickling required. We now pass to concurrent_function the two integer arguments number_1 and number_2:

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor


class A:
    def __init__(self):
        self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''

    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

"""
from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""


def concurrent_function(args):
    for arg in args:
        obj = A().generate_object(arg[0], arg[1])
        obj.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

Prints:

Multiprocessing usage
Numbers =  0 and 1
Numbers =  1 and 2
Numbers =  1 and 2
Numbers =  2 and 3
Numbers =  2 and 3
Numbers =  3 and 4

A More Efficient Way

There is no need to use exec. Instead use closures:

from concurrent.futures import ProcessPoolExecutor

def make_print_function(number_1, number_2):
    def print_numbers():
        print(f'Numbers = {number_1} and {number_2}')

    return print_numbers



def concurrent_function(args):
    for arg in args:
        fn = make_print_function(arg[0], arg[1])
        fn()


def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

Prints:

Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4

Using an Object Cache to Avoid Creating New Objects Unnecessarily

obj_cache = {} # each process will have its own

def concurrent_function(args):
    for arg in args:
        # was an object created with this set of arguments: (arg[0], arg[1])?
        obj = obj_cache.get(arg)
        if obj is None: # must create new object
            obj = A().generate_object(arg[0], arg[1])
            obj_cache[arg] = obj # save object for possible future use
        obj.print_numbers()
Booboo
  • 38,656
  • 3
  • 37
  • 60
  • This definitely solves my problem, thank you! It is unfortunate that exec() is a pretty computionally expensive method and it kind of "defeates" the purpose of parallelizing the code. In my real code problem this solution overall speeds things up, but it would be much better if the exec() was to be executed only once. Hopefully someone will come up with a solution that fulfills that. If not, I will accept this answer (and award the bounty). Thanks again! – ihavenoidea Nov 07 '20 at 15:00
  • 1
    I have an important update that uses your original class as it was and simplifies the code a bit, – Booboo Nov 07 '20 at 15:10
  • 1
    And I have added a version that does not use `exec`, which would be much more efficient. But it's not clear for this particular example why either technique is required. I wonder what your actual real-world problem is. – Booboo Nov 07 '20 at 15:35
  • Thanks again for improving the answer! Unfortunately, specifically for my case using closures won't help. In my real code, I'm working with code synthesis and although the example here was made simple for clarity (only a print() function), the method `print_numbers()` inside the `A` class is synthesized and therefore I will not be able to hardcode it as you suggested. That's why I probably will still need the `exec()` function. But my case is a very specific one, your solution can definitely help others. – ihavenoidea Nov 07 '20 at 15:46
  • I see in your posted case that you are constructing your Script class instances with the same arguments. If this will be a possibility in the real-world case and the constructed objects are reusable, then caching the objects will be an efficiency. I have added code to demonstrate how you would do this for your posted case. – Booboo Nov 07 '20 at 16:17
0

Possibly I found a way to do this without the need of the exec() function. The implementation (with comments) is below.

import codecs
from concurrent.futures import ProcessPoolExecutor

class A:
    def __init__(self):
        self.py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
    def generate_text(self, number_1, number_2):
        py = self.py.format(number_1, number_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        class_code = self.generate_text(number_1, number_2)
        # Create file in disk
        with open("Script_" + str(number_1) + "_" + str(number_2) + ".py", "w") as file:
            file.write(class_code)
        # Now import it and the class will now be (correctly) seen in __main__
        package = "Script_" + str(number_1) + "_" + str(number_2)
        class_name = "Script_" + str(number_1) + "_" + str(number_2)
        # This is the programmatically version of 
        # from <package> import <class_name>
        class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
        return class_name()

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 2
    
    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
        results = executor.map(concurrent_function, args)

if __name__ == '__main__':
    main()

Basically what I'm doing is instead of dynamic allocating the class, I am writing it to a file. I'm doing this because the source of the problem I was having is that pickle was not able to correctly locate the nested class when looking at the global scope. Now I am programmatically importing the class (after saving it to file).

Of course, this solution also has the bottleneck of having to deal with files which is also costly. I did not measure whether dealing with files or exec is faster, but in my real-world case I need only one object of the synthesized class (and not one per parallel call as in the toy code provided), therefore the file option is best suited for me.

There's one problem yet: after using n_calls = 15 (for example) and executing many times, it seems like sometimes it is not able to import the module (the file just created). I tried to put a sleep() before programmatically importing it but it didn't help. This problem does not seem to happen when using a small number of calls and it also seems to happen randomly. An example of part of the error stack is shown below:

Traceback (most recent call last):
  File "main.py", line 45, in <module>
    main()
  File "main.py", line 42, in main
    results = executor.map(concurrent_function, args)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 674, in map
    results = super().map(partial(_process_chunk, fn),
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 184, in _get_chunks
    chunk = tuple(itertools.islice(it, chunksize))
  File "main.py", line 41, in <genexpr>
    args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
  File "main.py", line 26, in generate_object
    class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
ModuleNotFoundError: No module named 'Script_13_14'
Dharman
  • 30,962
  • 25
  • 85
  • 135
ihavenoidea
  • 629
  • 1
  • 7
  • 26