1

I'm pretty new to multiprocessing, so I'm not sure if the following program is correct. The program iterates through an infinite sequence of integers and does calculations with each integer. The problem I'm facing is that the program cannot be terminated with Ctrl + C.

from multiprocessing import Pool


def process_number(number: int):
    # processes the number
    pass


if __name__ == '__main__':
    control = 1
    list_size = 100000

    while True:
        value_n_list = [n for n in range(control, control + list_size)]

        p = Pool()
        result = p.map(process_number, value_n_list)
        control += list_size 
        p.close()

Each list fed into the multiprocessing pool contains 100000 integers, and the starting value advances by 100000 with every iteration.

I'd like to know if there is any method of terminating the program properly while the program is still running. Please let me know if this program can be improved in any way.

Haru Frost

2 Answers

4

There are several answers on SO that address your question, but they do not seem to work with the map function where the main process is blocked waiting for all the submitted tasks to complete. This may not be an ideal solution, but it does work:

  1. Issue a call to signal.signal(signal.SIGINT, signal.SIG_IGN) in each process in your process pool to ignore the interrupt entirely and leave the handling to the main process.
  2. Use method Pool.imap (or Pool.imap_unordered) instead of Pool.map; it lazily evaluates your iterable argument for submitting tasks and processing results. In this way (a) it does not block waiting for all the results and (b) you save memory by not having to create an actual list for value_n_list and can instead use a generator expression.
  3. Have the main process issue print statements periodically and frequently, for example reporting on the progress of the submitted tasks being completed. This is required for the keyboard interrupt to be perceived. In the code below a tqdm progress bar is used, but you can simply print a completion count every N task completions, where N is chosen so that you do not have to wait too long for the interrupt to take effect after Ctrl-c has been entered:

from multiprocessing import Pool
import signal
import tqdm

def init_pool():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_number(number: int):
    import time
    # processes the number
    time.sleep(.001)

if __name__ == '__main__':
    control = 1
    list_size = 100000

    # No reason to create the pool over and over again:
    with Pool(initializer=init_pool) as p:
        try:
            with tqdm.trange(list_size) as progress_bar:
                while True:
                    #value_n_list = (n for n in range(control, control + list_size))
                    value_n_list = range(control, control + list_size)
                    progress_bar.reset()
                    result = []
                    # The iterable returned by `imap` must be iterated.
                    # If you don't care about the value, don't store it away and use `imap_unordered` instead:
                    for return_value in p.imap(process_number, value_n_list):
                        progress_bar.update(1)
                        result.append(return_value)
                    control += list_size
        except KeyboardInterrupt:
            print('Ctrl-c entered.')
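
If tqdm is not available, the "print a completion count every N task completions" variant mentioned in step 3 might look roughly like the sketch below. The helper should_report, the parameters max_batches and report_every, and the squaring body of process_number are all hypothetical names and stand-ins introduced for illustration; they are not part of the original code. max_batches exists only so that the loop can be bounded for a quick trial run.

```python
from multiprocessing import Pool
import signal


def init_pool():
    # Each worker ignores SIGINT so that only the main process reacts to Ctrl-c.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def process_number(number: int) -> int:
    # Stand-in for the real work (hypothetical).
    return number * number


def should_report(completed: int, report_every: int) -> bool:
    # True on every report_every-th completed task (hypothetical helper).
    return completed % report_every == 0


def run(max_batches=None, list_size=100_000, report_every=10_000):
    # max_batches=None loops until Ctrl-c; a number bounds the loop for a trial run.
    control = 1
    completed = 0
    batches = 0
    with Pool(initializer=init_pool) as p:
        try:
            while max_batches is None or batches < max_batches:
                values = range(control, control + list_size)
                # Results are consumed one by one, so the main process
                # regularly gets a chance to print.
                for _ in p.imap_unordered(process_number, values):
                    completed += 1
                    if should_report(completed, report_every):
                        print(f'{completed} tasks completed', flush=True)
                control += list_size
                batches += 1
        except KeyboardInterrupt:
            print('Ctrl-c entered.')
    return completed


if __name__ == '__main__':
    # Small, bounded demo; call run() with no arguments for the endless loop.
    run(max_batches=2, list_size=1_000, report_every=250)
```

A smaller report_every shortens the wait before Ctrl-c takes effect, at the cost of more output.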

Update

You did not specify which platform you were running under (you should always tag your question with the platform when you tag a question with multiprocessing), but I assumed it was Windows. If, however, you are running under Linux, the following simpler solution should work:

from multiprocessing import Pool
import signal

def init_pool():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_number(number: int):
    import time
    # processes the number
    time.sleep(.001)

if __name__ == '__main__':
    control = 1
    list_size = 100000

    # No reason to create the pool over and over again:
    with Pool(initializer=init_pool) as p:
        try:
            while True:
                value_n_list = [n for n in range(control, control + list_size)]
                result = p.map(process_number, value_n_list)
                control += list_size
        except KeyboardInterrupt:
            print('Ctrl-c entered.')

See Keyboard Interrupts with python's multiprocessing Pool

Update

If that is all your "worker" function, process_number, is doing (squaring a number), your performance will suffer from using multiprocessing. The overhead comes from (1) creating and destroying the process pools (and thus the processes) and (2) writing and reading arguments and return values from one address space to another (using queues). The following benchmarks this:

  1. Function non_multiprocessing performs 10 iterations (rather than an infinite loop, for obvious reasons) of looping 100,000 times calling process_number and saving all the return values in result.

  2. Function multiprocessing_1 uses multiprocessing to perform the above but only creates the pool once (8 logical cores, 4 physical cores).

  3. Function multiprocessing_2 re-creates the pool for each of the 10 iterations.

  4. Function multiprocessing_3 is included as a "sanity check" and is identical to multiprocessing_1 except it has the Linux Ctrl-c checking code.

The timing of each is printed out.

from multiprocessing import Pool
import time
import signal

def init_pool():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_number(number: int):
    # processes the number
    return number * number

N_TRIALS = 10
list_size = 100_000

def non_multiprocessing():
    t = time.time()
    control = 1
    for _ in range(N_TRIALS):
        result = [process_number(n) for n in range(control, control + list_size)]
        print(control, result[0], result[-1])
        control += list_size
    return time.time() - t

def multiprocessing_1():
    t = time.time()
    # No reason to create the pool over and over again:
    with Pool() as p:
        control = 1
        for _ in range(N_TRIALS):
            value_n_list = [n for n in range(control, control + list_size)]
            result = p.map(process_number, value_n_list)
            print(control, result[0], result[-1])
            control += list_size
    return time.time() - t

def multiprocessing_2():
    t = time.time()
    control = 1
    for _ in range(N_TRIALS):
        # Create the pool over and over again:
        with Pool() as p:
            value_n_list = [n for n in range(control, control + list_size)]
            result = p.map(process_number, value_n_list)
            print(control, result[0], result[-1])
            control += list_size
    return time.time() - t


def multiprocessing_3():
    t = time.time()
    # No reason to create the pool over and over again:
    with Pool(initializer=init_pool) as p:
        try:
            control = 1
            for _ in range(N_TRIALS):
                value_n_list = [n for n in range(control, control + list_size)]
                result = p.map(process_number, value_n_list)
                print(control, result[0], result[-1])
                control += list_size
        except KeyboardInterrupt:
            print('Ctrl-c entered.')
    return time.time() - t


if __name__ == '__main__':
    print('non_multiprocessing:', non_multiprocessing(), end='\n\n')
    print('multiprocessing_1:', multiprocessing_1(), end='\n\n')
    print('multiprocessing_2:', multiprocessing_2(), end='\n\n')
    print('multiprocessing_3:', multiprocessing_3(), end='\n\n')

Prints:

1 1 10000000000
100001 10000200001 40000000000
200001 40000400001 90000000000
300001 90000600001 160000000000
400001 160000800001 250000000000
500001 250001000001 360000000000
600001 360001200001 490000000000
700001 490001400001 640000000000
800001 640001600001 810000000000
900001 810001800001 1000000000000
non_multiprocessing: 0.11899852752685547

1 1 10000000000
100001 10000200001 40000000000
200001 40000400001 90000000000
300001 90000600001 160000000000
400001 160000800001 250000000000
500001 250001000001 360000000000
600001 360001200001 490000000000
700001 490001400001 640000000000
800001 640001600001 810000000000
900001 810001800001 1000000000000
multiprocessing_1: 0.48778581619262695

1 1 10000000000
100001 10000200001 40000000000
200001 40000400001 90000000000
300001 90000600001 160000000000
400001 160000800001 250000000000
500001 250001000001 360000000000
600001 360001200001 490000000000
700001 490001400001 640000000000
800001 640001600001 810000000000
900001 810001800001 1000000000000
multiprocessing_2: 2.4370007514953613

1 1 10000000000
100001 10000200001 40000000000
200001 40000400001 90000000000
300001 90000600001 160000000000
400001 160000800001 250000000000
500001 250001000001 360000000000
600001 360001200001 490000000000
700001 490001400001 640000000000
800001 640001600001 810000000000
900001 810001800001 1000000000000
multiprocessing_3: 0.4850032329559326

Even with creating the pool once, multiprocessing took approximately 4 times longer than a straight non-multiprocessing implementation. But it runs approximately 5 times faster than the version that re-creates the pool for each of the 10 iterations. As expected, the running time of multiprocessing_3 is essentially identical to the running time for multiprocessing_1, i.e. the Ctrl-c code has no effect on the running behavior.
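
To get a rough feel for overhead item (2), the sketch below compares the time spent squaring a number with the time spent pickling and unpickling it, which is the serialization step multiprocessing applies to arguments and return values that cross process boundaries. The helpers pickle_round_trip and compare are hypothetical names introduced here, and the measurement leaves out the cost of moving the bytes through the queues, so it understates the real overhead.

```python
import pickle
import timeit


def process_number(number: int) -> int:
    # Same trivial worker as in the benchmark.
    return number * number


def pickle_round_trip(value):
    # Serialize and deserialize a value, as happens to every argument
    # and return value sent between the main process and a worker.
    return pickle.loads(pickle.dumps(value))


def compare(n_calls: int = 100_000):
    # Time n_calls squarings against n_calls pickle round trips.
    compute = timeit.timeit(lambda: process_number(12345), number=n_calls)
    transfer = timeit.timeit(lambda: pickle_round_trip(12345), number=n_calls)
    return compute, transfer


if __name__ == '__main__':
    compute, transfer = compare()
    print(f'compute: {compute:.4f}s, pickle round trip: {transfer:.4f}s')
```

The exact numbers depend on the machine; the point is only to compare the two figures for a worker this small.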

Conclusions

  1. The Linux Ctrl-c code should have no significant effect on the running behavior of the program.
  2. Moving the pool-creation code outside the loop should greatly reduce the running time of the program. As to what effect it should have on CPU utilization, however, I cannot venture a guess.
  3. As it stands, your problem is not a suitable candidate for multiprocessing.
Booboo
  • Thanks for the help. I have a quick follow-up question. I'm currently running the second program on a Linux server and it seems that each of the CPUs is only getting around 1% load. Is there a way of maximizing each CPU's load to 100% to speed up the iteration process, or is the program designed this way so that terminating the program is possible? – Haru Frost Aug 07 '21 at 23:23
  • For this you need to open a new question and publish what the actual `process_number` function is doing. But I don't believe that there is anything in the above code, especially the Linux version, that should significantly reduce CPU utilization because of code added for Ctrl-c handling. Are you telling me that you were getting 100% CPU utilization with the code you posted? – Booboo Aug 08 '21 at 04:52
  • I see. Yes, the code that I've posted above does get 100% CPU utilization for every core somehow. The `process_number` function can simply be `return number*number` and the CPU utilization would still be what I described above. One thing that I've noticed is that my program doesn't wait until the previous 100000 iterations are finished before creating a new pool, and since it's in a while loop the pool creation process doesn't stop, thus utilizing 100% of the CPU. (again, I'm pretty new to multiprocessing so I could be completely off) – Haru Frost Aug 08 '21 at 18:24
  • See **Update**. No, your `p.map` call blocks until *all* the results have been computed and returned, unlike my Windows solution for the Ctrl-c issue that uses `p.imap`, which does not block and you have to iterate the *iterator* that is returned by `imap` to get the results one by one as they complete. That is how the progress bar works. Strictly speaking, *in general*, if you want the progress bar to serve as a true measure of progress, you should use the `imap_unordered` method: See [this](https://stackoverflow.com/questions/51601756/use-tqdm-with-concurrent-futures/63834834#63834834). – Booboo Aug 08 '21 at 19:28
  • My worker function is much more complex than squaring an integer. That was simply an example that I've given. Anyhow, I do really appreciate the clarifications. My apologies for all the follow-up questions, but if I create the pool with `Pool(processes=4, ...)`, then it should in theory utilize all 4 physical cores, correct? – Haru Frost Aug 09 '21 at 02:28
  • Yes, I would think so. – Booboo Aug 09 '21 at 10:35
0

As a general principle, you must close any resource that you have opened.

For multiprocessing, a pool solves this problem by acting as a container that keeps references to the created worker processes and resources, which it releases through its close and terminate methods.

Read the docs

In your code you are not handling the closing properly: when an error is raised, the call to close is never reached, which leaves the worker processes and their resources behind (what is known as a resource leak).

To fix this, either use a with statement to create your pool, or put your code in a try-except-finally block where you always call pool.close in the finally block.

Also, move your pool creation outside the while loop; you are currently creating a new pool instance on every iteration.
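
A minimal sketch of the try/finally approach, with the pool created once outside the loop, might look like the following. It is a variation rather than a literal transcription of the advice above: it calls close only on normal completion and terminate when an exception (including Ctrl-c) occurs, since joining a pool that still has outstanding tasks could otherwise block. The helper next_batch, the parameter max_batches, and the squaring body of process_number are hypothetical and exist only for illustration.

```python
from multiprocessing import Pool


def process_number(number: int) -> int:
    # Stand-in for the real work (hypothetical).
    return number * number


def next_batch(control: int, list_size: int) -> range:
    # The integers handled in one iteration (hypothetical helper).
    return range(control, control + list_size)


def run(max_batches: int = 3, list_size: int = 1_000) -> int:
    control = 1
    pool = Pool()  # created once, outside the loop
    try:
        for _ in range(max_batches):  # the question uses `while True`
            result = pool.map(process_number, next_batch(control, list_size))
            control += list_size
        pool.close()  # normal completion: no more tasks will be submitted
    except BaseException:
        pool.terminate()  # error or Ctrl-c: stop the workers immediately
        raise
    finally:
        pool.join()  # wait for the worker processes to exit
    return control


if __name__ == '__main__':
    print(run())
```

Using `with Pool() as pool:` is shorter; note that leaving the with block calls terminate rather than close.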

SaleemKhair
  • It's nice to disseminate "general" information to the potentially ignorant. Better would be providing information that actually addresses the problem that the OP is having. – Booboo Aug 07 '21 at 19:33
  • Read last 2 paragraphs starting with "To fix.." they clearly address the fix... writing the code encourage copy/pasta – SaleemKhair Aug 07 '21 at 22:01