mp.set_start_method('spawn')
total_count = Counter(0)
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)    

pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()

So I have a pool of worker processes that do some work. The job only needs to find one solution, so when one of the worker processes finds it, I want to stop everything.

One way I thought of was just calling sys.exit(). However, that doesn't seem to work properly, since the other processes keep running.

Another way was to check the return value of each call (the return value of the part_crack_helper function) and call terminate as soon as one of them succeeds. However, I don't know how to do that when using the map function.

How should I achieve this?

whiteSkar

2 Answers


You can use callbacks from Pool.apply_async.

Something like this should do the job for you.

from itertools import product
from multiprocessing import Pool


def part_crack_helper(args):
    # do_job() stands for whatever work checks one candidate
    solution = do_job(args)
    return bool(solution)


class Worker():
    def __init__(self, workers, initializer, initargs):
        self.pool = Pool(processes=workers,
                         initializer=initializer,
                         initargs=initargs)

    def callback(self, result):
        if result:
            print("Solution found! Yay!")
            self.pool.terminate()

    def do_job(self):
        for args in product(seed_str, repeat=4):
            # wrap args in a tuple: part_crack_helper takes the
            # whole product tuple as a single positional argument
            self.pool.apply_async(part_crack_helper,
                                  args=(args,),
                                  callback=self.callback)

        self.pool.close()
        self.pool.join()
        print("good bye")


# num_proc, init and total_count are from the code in the question
w = Worker(num_proc, init, [total_count])
w.do_job()
noxdafox

If you are OK with using another library, you could solve it the following way with Pebble. The advantage of this solution is that you can additionally specify a timeout: the program ends either when one worker succeeds or when it runs out of time:

from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time

pool = ProcessPool()

def my_function(args):
    print("running " + str(args))
    time.sleep((args + 1) * 30)
    print("process won:" + str(args))
    return True


start_time = time.time()

# the timeout applies to each item's computation, not the whole map
future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()

while True:
    try:
        result = next(iterator)
        if result:
            pool.stop()
            pool.join(timeout=0)
            break
    except StopIteration:
        break
    except TimeoutError as error:
        print("function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python's traceback of remote process

print("whole time: " + str(time.time() - start_time))
Felix