4

Thanks for taking a look at this. I confess I have been dabbling with parallel processing in Python for all of one week now, so I apologize if there is an obvious solution I missed. I have a piece of code from which I would like to run several different instances of an mp.Pool(). The pools that were called from the main .py file worked fine, but when I tried to add them to functions in modules I get no output from them at all. The app just runs past them and continues. I am thinking it may have something to do with this post, but it didn't give any ideas on alternative methods to accomplish what I need. The code that works in a simple example is this:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) // 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()


def veggie():
    print('carrot')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

And the code that doesn't work is:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) // 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) // 10))

    def veggie():
        print('carrot')
        status = True
        return status

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

nested_stupid_fn()

Ultimately I would like the example that doesn't work to be one more step removed, by having it live in another function in a separate module, so that when I import the module packngo and call it as packngo.basic_packngo(inputs), which has the contents of the nested function somewhere within it, they would run. Any help would be greatly appreciated. :D I am a very simple man, so if you could explain it as you would to a child, maybe then it will sink into my head!
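To show concretely what happens, here is a condensed, runnable sketch of the failing layout (my reconstruction, not my real code; thread_count and the function names are placeholders). The pool swallows the pickling error for the nested worker, so the callback never fires and the result list comes back empty:

```python
# Condensed sketch of the failing layout: the worker is nested, so
# multiprocessing.Pool cannot pickle it, and apply_async fails silently
# (the error is captured in the ignored AsyncResult objects).
import multiprocessing as mp

def basic_packngo(thread_count=2):
    def veggie():                      # nested: cannot be pickled
        return 'carrot'

    results = []
    pool = mp.Pool(thread_count)
    for _ in range(10):
        pool.apply_async(veggie, callback=results.append)
    pool.close()
    pool.join()
    return results                     # no task ever ran, no callback fired

print(basic_packngo())  # []
```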

ShadowRanger
matrimcauthon
  • I would point out that some of the code is gutted like multiproc_log_result which really isn't doing much of anything at the moment. It was just a quick/dirty way to simplify and test to isolate where my problem was occurring. – matrimcauthon Mar 01 '18 at 00:02

2 Answers

3

The other question you linked has the solution, it's just not spelled out: You cannot use nested functions as the func argument for the apply*/*map* family of methods on multiprocessing.Pool. They work for multiprocessing.dummy.Pool, because multiprocessing.dummy is backed by threads which can directly pass around function references, but multiprocessing.Pool must pickle the functions, and only functions with importable names can be pickled. If you check the name of a nested function, it's something like modulename.outerfuncname.<locals>.innerfuncname, and that <locals> component makes it impossible to import (which is usually a good thing; nested functions that make use of being nested usually have critical state in closure scope, which mere importing would lose).
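To see this concretely, here is a quick sketch (my own illustration, not from the question's code) that inspects a nested function's qualified name and then tries to pickle it:

```python
# A nested function's __qualname__ contains a "<locals>" component, so it
# cannot be looked up by import, and therefore cannot be pickled by reference.
import pickle

def outer():
    def inner():
        return 42
    return inner

f = outer()
print(f.__qualname__)  # outer.<locals>.inner

try:
    pickle.dumps(f)
    print('pickled OK')
except Exception as e:   # AttributeError on older 3.x, PicklingError on 3.12+
    print('cannot pickle:', e)
```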

It's perfectly fine for the callback functions to be defined in a nested fashion, as they're executed in the parent process, they aren't sent to the workers. In your case, only the callback is relying on closure scope, so it's perfectly fine to move the func (veggie) out to global scope, defining your packngo module as:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def veggie():
    print('carrot')
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) // 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

nested_stupid_fn()

Yes, it means veggie becomes a public member of the module in question. You can prefix it with an underscore (_veggie) if you want to indicate it should be considered an implementation detail, but it must necessarily be global to use it with multiprocessing.Pool.
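As a sketch of the "callbacks run in the parent process" point (my own illustration, not part of the original code): record the pid seen inside the callback and compare it with the parent's pid and with the pids the workers report back.

```python
# Callbacks execute in the parent process (in the pool's result-handler
# thread), while the func itself executes in a worker process.
import multiprocessing as mp
import os

def work(_):
    return os.getpid()            # pid of the worker process that ran this

def demo(process_count=2):
    parent = os.getpid()
    seen = []                     # (pid inside callback, pid inside worker)
    pool = mp.Pool(process_count)
    for i in range(4):
        pool.apply_async(work, (i,), callback=lambda r: seen.append((os.getpid(), r)))
    pool.close()
    pool.join()
    return (len(seen),
            all(cb == parent for cb, _ in seen),   # callbacks ran in parent
            all(w != parent for _, w in seen))     # work ran in workers

print(demo())
```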

ShadowRanger
0

Well, I think the issue is that inside the scope of multiproc_log_result the variable results doesn't exist. So what you should do is append the result of your async call directly to results. You won't be able to track the progress, though (there is no way to directly share a global variable with a callback function outside a class, I guess).

from multiprocessing.pool import ThreadPool

thread_count = 4  # number of worker threads

def nested_stupid_fn():
    def veggie():
        print('carrot')
        status = True
        return status

    results = []
    pool = ThreadPool(thread_count)
    for x in range(10):
        results.append(pool.apply_async(veggie))

    pool.close()
    pool.join()

    results = [result.get() for result in results]  # get value from async result
    # ...then do stuff with results
Gabriel Samain
  • Thanks for the idea, but I don't think that is causing the issue. I can completely remove the callback for status tracking and it still never actually does anything (in this case, print 'carrot' ten times, but in the real example use shutil to move a file). – matrimcauthon Mar 01 '18 at 18:37
  • Well, did you try? I also changed Pool to ThreadPool; it does work on my machine – Gabriel Samain Mar 02 '18 at 09:08
  • Anyway, it won't print 'carrot', since your separate thread won't be able to print to the console – Gabriel Samain Mar 02 '18 at 09:09
  • 1
    @GabrielSamain: The `ThreadPool` makes it work only because sending a function to a thread worker doesn't require pickling, while sending it to a process worker does. Nested functions don't have importable names, and only functions with globally importable names can be pickled. – ShadowRanger Jun 28 '19 at 16:23
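    To illustrate that last comment (my own sketch, not from either answer): the same nested worker that silently fails with multiprocessing.Pool runs fine with a thread-backed pool, because threads share the parent process and nothing needs to be pickled.

    ```python
    # ThreadPool workers are threads in the same process, so a nested
    # (unpicklable) function can be passed to apply_async without issue.
    from multiprocessing.pool import ThreadPool

    def run_nested(thread_count=4):
        def veggie():                # nested: unpicklable, but threads don't care
            return 'carrot'

        pool = ThreadPool(thread_count)
        async_results = [pool.apply_async(veggie) for _ in range(10)]
        pool.close()
        pool.join()
        return [r.get() for r in async_results]

    print(run_nested())  # prints a list of ten 'carrot' strings
    ```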