
Hi, I'm referencing the following question because it's similar to what I'm trying to achieve. However, I'm getting an error that I can't seem to figure out, so I'm looking for some help:

Combining multithreading and multiprocessing with concurrent.futures

Here's my test code:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)
  
def test(x):
    x**2
             
def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
Process SpawnProcess-31:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>

So I didn't specify the number of threads (I don't see a reason to for the ThreadPoolExecutor). I'm having trouble understanding what the error actually means and how I can fix it. Any help would be appreciated.

chicagobeast12
    Your function multithread() takes two arguments. You're only passing one – DarkKnight Sep 13 '22 at 16:59
    You *must* guard the call to `multiprocesser` with an `if __name__ == '__main__':` guard (any use of multiprocessing on a non-`fork`-based system, including Windows and macOS, needs to protect any code that might launch processes that way). – ShadowRanger Sep 13 '22 at 17:51
  • ShadowRanger, hi - I did guard the call, I must've accidentally replaced it; if you look at Vlad's answer below, this is essentially what I'm running – however I'm running into the above error – chicagobeast12 Sep 13 '22 at 17:53

2 Answers


The error probably stems from the fact that multithread() is being called incorrectly.

Try this:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)

def test(x):
    x**2
               
def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
DarkKnight
  • Thanks for the help Vlad... but still getting the same error message – chicagobeast12 Sep 13 '22 at 17:07
  • @chicagobeast12 The code in my answer runs perfectly in my Python 3.10.7 environment on macOS 12.6. Did you copy'n'paste the code I presented? – DarkKnight Sep 13 '22 at 17:11
  • I did copy and paste the exact code. I'm running python 3.10 on Windows 10 with conda – chicagobeast12 Sep 13 '22 at 17:47
  • @chicagobeast12: Did you make sure to include the `if __name__ == '__main__':` guard to protect the invocation of `multiprocesser(num_list, test)`? If you don't (like you failed to do in the code in your question), terrible things happen on any system using the `'spawn'` or `'forkserver'` launch methods for multiprocessing (`'spawn'` is the default on all versions for Windows, and on recent Python versions for macOS). – ShadowRanger Sep 13 '22 at 17:53
  • @chicagobeast12 Please edit your question to show how you adapted this answer and also show the exact error message(s) – DarkKnight Sep 13 '22 at 17:57
  • Vlad, I made the update - I ran it in a Jupyter notebook in a different environment and it ran fine. So it's either an issue with my current env or with running it in Spyder (which I'm currently doing). Would you have any idea on what to check? – chicagobeast12 Sep 13 '22 at 18:02

Missing if __name__ == '__main__'

if __name__ == '__main__':
    multiprocesser(num_list, test)

Unintended recursion

If you don't guard the call to multiprocesser(), you get recursion: each subprocess re-runs the Python script when it starts, which launches more subprocesses in turn.
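The re-import is easy to see in a small standalone script (hypothetical file name `spawn_demo.py`; the top-level print is only there to make the re-import visible):

```python
# spawn_demo.py (hypothetical name): under the 'spawn' start method every
# worker process re-imports this module from the top, so the print below
# runs once in the parent and once per worker.
import multiprocessing as mp
import os

print(f'module imported in pid {os.getpid()}')

def work(x):
    return x * x

if __name__ == '__main__':
    # Only the original interpreter reaches this block; re-importing
    # workers skip it, which is what breaks the recursion.
    mp.set_start_method('spawn', force=True)
    with mp.Pool(2) as pool:
        print(pool.map(work, [1, 2, 3]))  # -> [1, 4, 9]
```

If the `Pool` creation sat at module level instead, every spawned worker would try to create its own pool on import, which is exactly the failure mode the docs excerpt below describes.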

Safe importing of main module

The following is an example of the same type of problem from the multiprocessing docs:

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocess#the-spawn-and-forkserver-start-methods

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

For example, using the spawn or forkserver start method running the following module would fail with a RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Instead one should protect the “entry point” of the program by using if __name__ == '__main__': as follows:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()
SargeATM