
I have the following code that is leveraging multiprocessing to iterate through a large list and find a match. How can I get all processes to stop once a match is found in any one process? I have seen examples, but none of them seem to fit what I am doing here.

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts

def do_job(first_bits):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        # CHECK FOR MATCH HERE
        print(''.join(x))
        # EXIT ALL PROCESSES IF MATCH FOUND

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = []

    for i in range(num_parts):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        pool.apply_async(do_job, (first_bit,))

    pool.close()
    pool.join()

Thanks for your time.

UPDATE 1:

I have implemented the changes suggested in the great approach by @ShadowRanger, and it is nearly working the way I want it to. I have added some logging to give an indication of progress and put a 'test' key in there to match. I want to be able to increase/decrease iNumberOfProcessors independently of num_parts. At this stage, when I have them both at 4, everything works as expected: 4 processes spin up (plus one extra for the console). When I change iNumberOfProcessors to 6, 6 processes spin up, but only four of them have any CPU usage, so it appears 2 are idle. Whereas with my previous solution above, I was able to set the number of cores higher without increasing num_parts, and all of the processes would get used.


I am not sure how to refactor this new approach to give me the same functionality. Can you have a look and give me some direction on the refactoring needed so that iNumberOfProcessors and num_parts can be set independently of each other, while still keeping all processes used?

Here is the updated code:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6

def do_job(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

if __name__ == '__main__':
    # with statement with Py3 multiprocessing.Pool terminates when block exits
    with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:

        # Don't need special case for final block; slices can 
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
            if gotmatch:
                break
        else:
            print("No matches found")

UPDATE 2:

OK, here is my attempt at @noxdafox's suggestion. I have put together the following based on the link he provided with his suggestion. Unfortunately, when I run it I get the error:

... line 322, in apply_async
    raise ValueError("Pool not running")
ValueError: Pool not running

Can anyone give me some direction on how to get this working?

Basically the issue is that my first attempt did multiprocessing but did not support canceling all processes once a match was found.

My second attempt (based on @ShadowRanger's suggestion) solved that problem, but broke the ability to scale the number of processes and the num_parts size independently, which is something my first attempt could do.

My third attempt (based on @noxdafox's suggestion) throws the error outlined above.

If anyone can give me some direction on how to keep the functionality of my first attempt (being able to scale the number of processes and the num_parts size independently), while adding the ability to cancel all processes once a match is found, it would be much appreciated.

Thank you for your time.

Here is the code from my third attempt, based on @noxdafox's suggestion:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4


def find_match(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

class Worker():

    def __init__(self, workers):
        self.workers = workers

    def callback(self, result):
        if result:
            self.pool.terminate()

    def do_job(self):
        print(self.workers)
        pool = multiprocessing.Pool(processes=self.workers)
        for part in grouper(alphabet, part_size):
            pool.apply_async(do_job, (part,), callback=self.callback)
        pool.close()
        pool.join()
        print("All Jobs Queued")

if __name__ == '__main__':
    w = Worker(4)
    w.do_job()
user2109254

2 Answers


You can check this question to see an implementation example solving your problem.

This also works with a concurrent.futures pool.

Just replace the map method with apply_async and iterate over your list from the caller.

Something like this.

for part in grouper(alphabet, part_size):
    pool.apply_async(do_job, (part,), callback=self.callback)
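
Folding in the fixes discussed in the comments below (the callback needs the pool stored on self so it can terminate it, and apply_async needs the actual worker function plus a one-element args tuple), a rough, untested sketch of the whole pattern could look like the following. It assumes the question's find_match, alphabet and part_size, plus the grouper recipe linked below.

import multiprocessing

class Worker:

    def __init__(self, workers):
        self.workers = workers
        self.pool = None

    def callback(self, result):
        # Runs in the main process for each finished task; a truthy result
        # means the key was found, so drop all pending and running work.
        if result:
            self.pool.terminate()

    def do_job(self):
        self.pool = multiprocessing.Pool(processes=self.workers)
        for part in grouper(alphabet, part_size):
            # grouper pads the last chunk with None; strip the padding
            chunk = [c for c in part if c is not None]
            self.pool.apply_async(find_match, (chunk,), callback=self.callback)
        self.pool.close()
        self.pool.join()

if __name__ == '__main__':
    Worker(4).do_job()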

grouper recipe
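
For reference, this is the standard grouper recipe from the itertools documentation (the question's third attempt already contains the same definition):

import itertools

def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks, padding the last chunk with fillvalue."
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)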

noxdafox
  • thanks for the response. First issue I have is grouper. I see there is an itertools._grouper but when I try and use that I get the error: TypeError: must be itertools.groupby, not str. What library does grouper belong to? – user2109254 Jun 09 '16 at 08:59
  • grouper is not a function in Python's standard library. You can find its recipe in the link I provided. – noxdafox Jun 09 '16 at 10:12
  • Right... don't know how I missed that... sorry... I have tried moving the initialization of the pool variable to the do_job function, but I still get the error saying: Pool not running. Any ideas on that? – user2109254 Jun 09 '16 at 10:22
  • Ok got grouper working, and updated the code in my last attempt with it. However still getting the 'Pool not running' error. Any ideas on what the issue is there? – user2109254 Jun 09 '16 at 11:15
  • Please paste the whole traceback of the exception. Also make sure that find_match returns False when it does not find the match, so the callback does not stop the pool at the wrong moment. – noxdafox Jun 09 '16 at 11:36
  • ... and the callback is trying to interact with an object that does not exist (self.pool). Move the initialisation of the Pool into the Worker class itself. – noxdafox Jun 09 '16 at 11:39
  • Full traceback is: Traceback (most recent call last): File "C:\Source\TestBruteForce\TestBruteForce\TestDynamicWithCancel3.py", line 55, in w.do_job() File "[filename and path removed].py", line 48, in do_job pool.apply_async(do_job, (part,), callback=self.callback) NameError: name 'do_job' is not defined – user2109254 Jun 09 '16 at 12:18
  • that got it... had to add self to references to pool. Thanks for your help!! – user2109254 Jun 09 '16 at 12:21
  • So the error is different from what you are reporting. You are passing the wrong function to apply_async. Check the `apply_async` documentation. – noxdafox Jun 09 '16 at 12:23

multiprocessing isn't really designed to cancel tasks, but you can simulate it for your particular case by using pool.imap_unordered and terminating the pool when you get a hit:

def do_job(first_bits):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        # CHECK FOR MATCH HERE
        print(''.join(x))
        if match:
            return True
    # If we exit loop without a match, function implicitly returns falsy None for us
# Factor out part getting to simplify imap_unordered use
def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

if __name__ == '__main__':
    # with statement with Py3 multiprocessing.Pool terminates when block exits
    with multiprocessing.Pool(processes=4) as pool:

        # Don't need special case for final block; slices can 
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
            if gotmatch:
                break
        else:
            print("No matches found")

This will run do_job for each part, returning results as fast as it can get them. When a worker returns True, the loop breaks, and the with statement for the Pool is exited, terminate-ing the Pool (dropping all work in progress).

Note that while this works, it's kind of abusing multiprocessing; it won't handle canceling individual tasks without terminating the whole Pool. If you need more fine grained task cancellation, you'll want to look at concurrent.futures, but even there, it can only cancel undispatched tasks; once they're running, they can't be cancelled without terminating the Executor or using a side-band means of termination (having the task poll some interprocess object intermittently to determine if it should continue running).
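
For completeness, here is a minimal, untested sketch of that side-band idea (it reuses the question's alphabet, num_parts and get_part; init_worker and stop_event are placeholder names of my own): each worker periodically polls a shared Event and returns early once any worker has set it, which is the "poll some interprocess object" approach described above.

import itertools
import multiprocessing

def init_worker(event):
    # Runs once in every worker process; stash the shared Event in a global.
    global stop_event
    stop_event = event

def find_match(first_bits):
    for i, x in enumerate(itertools.product(first_bits, *itertools.repeat(alphabet, num_parts - 1))):
        # Poll the shared flag only occasionally to keep the overhead low.
        if i % 10000 == 0 and stop_event.is_set():
            return False
        if ''.join(x) == 'test':
            stop_event.set()  # tell the other workers they can stop early
            return True
    return False

if __name__ == '__main__':
    event = multiprocessing.Event()
    with multiprocessing.Pool(processes=4, initializer=init_worker, initargs=(event,)) as pool:
        if any(pool.imap_unordered(find_match, map(get_part, range(num_parts)))):
            print("Match found")
        else:
            print("No matches found")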

ShadowRanger
  • thanks heaps for this great approach. I gave it a go and it nearly does what I need it to. I have updated the question with details of my implementation of your approach. Can you have a look and give me some direction with the refactoring needed to be able to set iNumberOfProcessors and num_parts independently from each other and still have all processes used? – user2109254 Jun 08 '16 at 08:25