
Here's a minimal example of what I'm trying to parallelize:

import numpy as np

def generate_function(a):
    def func(x):
        '''a complex function that uses several modules'''
        return x + np.sqrt(a)
    return func

if __name__ == '__main__':
    f = generate_function(0.5)
    x = np.arange(0, 100)
    y = np.array(list(map(f, x))) # want to parallelize this step

With multiprocessing, the nested func causes problems, since pickle serializes functions by qualified name and therefore can't handle nested (local) functions:

import multiprocessing as mp
...
pool = mp.Pool(2)
y = np.array(pool.map(f, x))

AttributeError: Can't pickle local object 'generate_function.<locals>.func'
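The error is inherent to how pickle works: it serializes a function by its module-qualified name, so only module-level functions round-trip, while locally defined ones fail at lookup time. A minimal check (a standalone sketch, not part of the original example):

```python
import pickle

def top_level(x):
    return x + 1

def make_nested():
    def nested(x):
        return x + 1
    return nested

# A module-level function pickles fine: it is stored by qualified name.
pickle.dumps(top_level)

# A nested function cannot be found by name and raises immediately.
try:
    pickle.dumps(make_nested())
except AttributeError as e:
    print(e)  # Can't pickle local object 'make_nested.<locals>.nested'
```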

Even with pathos, the modules imported in the main process are not available inside the worker processes:

import pathos
...
pool = pathos.multiprocessing.ProcessPool(2)
y = np.array(pool.map(f, x))

NameError: name 'np' is not defined

Note that none of the other solutions on "Python multiprocessing PicklingError: Can't pickle &lt;type 'function'&gt;" work either.

What's the best way to parallelize this?


So it is possible to get pathos to work by re-importing inside generate_function:

def generate_function(a):
    import numpy as np
    def func(x):
        '''a complex function that uses several modules'''
        return x + np.sqrt(a)
    return func

but I may have several imports across multiple generate_functions and multiple layers of nesting; keeping track of all that quickly gets cumbersome, so I would like to avoid this mess:

def generate_function1(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def func(x):
        ...
    return func

def generate_function2(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def func(x):
        ...
    return func

def generate_generator_function(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def generate_function(a):
        import module1, module2, module3
        from module4 import a, b
        from module5 import c as d
        from module6 import e as f
        def func(x):
            ...
        return func
    return generate_function
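For what it's worth, one common way to sidestep the nesting entirely is a module-level function whose parameters are bound with functools.partial: the partial object pickles its target by reference along with the bound arguments, so no re-imports are needed. Whether this fits the real (more complex) functions here is an assumption; the sketch is:

```python
import functools
import pickle
import numpy as np

def func(a, x):
    '''Module-level, so pickle stores it by qualified name.'''
    return x + np.sqrt(a)

# Behaves like generate_function(0.5): a one-argument callable with a fixed.
f = functools.partial(func, 0.5)

# Unlike a nested closure, this serializes without trouble.
pickle.dumps(f)
```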
goweon
  • 1,111
  • 10
  • 19
  • Have you considered using classes? – kpie Mar 19 '22 at 19:54
  • @kpie I'm not sure how that would work when the `func`s must be generated dynamically – goweon Mar 19 '22 at 20:12
  • Do you have to use `np.arange` or can you use a simple `range`? – Schottky Mar 19 '22 at 20:51
  • Try to explicitly import np inside the def-s executed inside the remote pathos-processes. Pathos, as Mike McKearns noted here, can use pass-by-SER/DES but where-feasible can pass-by-SOURCE ( instead of pickle-dill SER/DES ) and there we are, your code can work fine in main, not so in remote, pathos-spawned, independent process, whereas the def-ed source-code can import ALAP the numpy module therein J.I.T. ( give it a try ... ) – user3666197 Mar 19 '22 at 22:01

2 Answers


This won't solve your pickle problems, but here's my thinking on using classes to manage your imports.

>>> class funcFactory:
...     import numpy as np
...     def __init__(self):
...             pass
...     def makef(self,a):
...             def func(x):
...                     return a+funcFactory.np.sqrt(x)
...             return func
...
>>> ff = funcFactory()
>>> f = ff.makef(1)
>>> f(4)
3.0
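As stated, this doesn't touch the pickle issue: the returned func is still a local object regardless of the class-level import. A quick check (a renamed copy of the factory, used here only for illustration):

```python
import pickle

class FuncFactory:  # same idea as funcFactory above, renamed for this check
    import numpy as np  # class-level import, reachable as FuncFactory.np
    def makef(self, a):
        def func(x):
            return a + FuncFactory.np.sqrt(x)
        return func

f = FuncFactory().makef(1)

# The closure works in-process, but it still cannot be pickled.
try:
    pickle.dumps(f)
except AttributeError as e:
    print(e)  # Can't pickle local object 'FuncFactory.makef.<locals>.func'
```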

Incorporating @Schottky's suggestion to use concurrent.futures, you end up with code that looks like this:

import concurrent.futures
import numpy as np

class funcFactory:
    import numpy as np  # class-level import, reachable as funcFactory.np
    def makef(self, a):
        def func(x):
            return a + funcFactory.np.sqrt(x)
        return func

f = funcFactory().makef(0.5)
with concurrent.futures.ThreadPoolExecutor() as ex:
    y = ex.map(f, np.arange(0, 100))
print(list(y))
kpie
  • Copy of idea? What if the idea was brutally wrong? Python Threads, since ever were GIL-lock ( a MUTEX blocking trick ), so principally re-[SERIAL]-ising the flow of processing ... so never making even a trivial "just"-[CONCURRENT] flow of processing ever happen here ( an only fair use cases were in network & similar End-2-End transactions' latency-masking, just using the O/S resources to do it ), the less a True-[PARALLEL] ... ouch! ) If Python has ( without telling Guido von ROSSUM ) escaped from GIL-lock, that should have made a giant wave of celebrations in media - did I miss that? – user3666197 Mar 19 '22 at 21:57
  • you missed nothing. But this is still the answer to the question. – kpie Mar 20 '22 at 02:53

You may use concurrent.futures:

import concurrent.futures

f = generate_function(0.5)
x = np.arange(0, 100)
with concurrent.futures.ThreadPoolExecutor() as ex:
    y = ex.map(f, x)
Schottky
  • That's a good look. – kpie Mar 19 '22 at 21:11
  • Doesn't `ThreadPoolExecutor` use threads? I do actually need multiprocess because of the GIL, and replacing it with `ProcessPoolExecutor` still creates the same issues. Besides, for some reason threads don't play well with my actual program because while using threads work in the test example, it crashes my actual program with 0xC0000005 – goweon Mar 19 '22 at 21:24
  • Did I miss some Python news here? Python Threads, since ever were GIL-lock ( MUTEX blocking, so principally re-[SERIAL]-ising the flow of processing ... so never making even a trivial "just"-[CONCURRENT] flow of processing ever happen here, the less a True-[PARALLEL] ... ouch! ) If Python has ( without telling Guido von ROSSUM ) escaped from GIL-lock, that should have made a giant wave of celebrations in media - did I miss some kind of that? – user3666197 Mar 19 '22 at 21:51