I have some code in which I attempt to create 4 processes within a Pool.

As soon as I get any exception (e.g. the database it is trying to connect to is down), I want to kill the pool, sleep for 10 seconds, and then create a new pool with 4 processes.

However, it seems that the pool is never killed, because the process names keep incrementing on each iteration. Does the pool have a cache where it keeps a name count?

import multiprocessing
import time
from multiprocessing import Pool

def connect_db():
    pass


while True:
    p = Pool(4)
    for process in multiprocessing.active_children():
        print(process.name)  # why is the name incremented by 1 each time the while loop iterates?
    try:
        r = p.map(connect_db, ())
    except Exception as e:
        p.close()
        p.join()
        time.sleep(10)

The first four processes are SpawnPoolWorker-1 to 4 and the next four are SpawnPoolWorker-5 to 8. How does it know that I have already created 4 processes before? Am I not creating a new instance of Pool each time, or am I doing something wrong?

Roy2012
InfoLearner
  • You should really use a context manager to handle the pool. Also, using `except Exception` like that is bad practice, see https://stackoverflow.com/questions/54948548/what-is-wrong-with-using-a-bare-except?noredirect=1&lq=1. – AMC Jun 30 '20 at 02:03
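
A rough sketch of what the context-manager suggestion in the comment above could look like. It assumes a hypothetical helper `run_with_retries` and a hypothetical `connect_db` that always fails, and it uses `multiprocessing.pool.ThreadPool` as a stand-in so the snippet runs without spawning processes. It also catches a specific exception type rather than `Exception`.

```python
import time
from multiprocessing.pool import ThreadPool

def connect_db(i):
    # Hypothetical task: always fails, like a database that is down.
    raise ConnectionError(f"Failed to connect {i}")

def run_with_retries(make_pool, task, items, attempts=3, delay=0.01):
    """Run task over items in a fresh pool per attempt; return (result, failures)."""
    failures = 0
    for _ in range(attempts):
        # Leaving the with-block calls pool.terminate(), so every attempt
        # starts from a brand-new set of workers.
        with make_pool(4) as pool:
            try:
                return pool.map(task, items), failures
            except ConnectionError:
                failures += 1
        time.sleep(delay)
    return None, failures

result, failures = run_with_retries(ThreadPool, connect_db, range(4))
print(result, failures)
```

With real processes, `multiprocessing.Pool` can be passed as `make_pool` instead; in that case the call should sit under an `if __name__ == "__main__":` guard, which the spawn start method requires.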

1 Answer

The main reason you were not seeing what you expect is the following line of code:

r = p.map(connect_db, ())

You're calling Pool.map with an empty iterable, so connect_db is never called, no exception is raised, and the except block (the only place where the pool is closed) is never reached.
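
A minimal sketch of that behaviour, using `multiprocessing.pool.ThreadPool` as a stand-in because it exposes the same `map` API as `Pool` without spawning processes. The stand-in, the failing `connect_db`, and the helper `try_map` are assumptions for illustration. With an empty iterable no task is submitted, so nothing can raise; with a non-empty one, a worker's exception is re-raised in the caller.

```python
from multiprocessing.pool import ThreadPool

def connect_db(i):
    # Hypothetical stand-in for a connection attempt that always fails.
    raise ConnectionError(f"Failed to connect {i}")

def try_map(items):
    """Return ('ok', result) or ('error', message) for one map() call."""
    with ThreadPool(4) as pool:
        try:
            return ("ok", pool.map(connect_db, items))
        except ConnectionError as exc:
            return ("error", str(exc))

empty_outcome = try_map(())           # no tasks submitted, connect_db never runs
failing_outcome = try_map(range(4))   # a worker's failure is re-raised in the caller
print(empty_outcome)
print(failing_outcome[0])
```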

Here's a skeleton that works, with a bunch of print statements for debugging. The output is attached below; as you can see, there are exactly four child processes in every round.

import multiprocessing
import time 
import random

def connect_db(i):
    print(f"Trying to connect {i}")
    time.sleep(random.random() * 2)
    raise Exception("Failed to connect")

while True: 
    p = multiprocessing.Pool(4)
    print("active children are:")
    for idx, process in enumerate(multiprocessing.active_children()):
        print(f"Child number {idx} is {process.name}")
    try:
        print("About to create a pool")
        r = p.map(connect_db, range(4))
        print("Created a pool")
    except Exception as e:
        print(e)
        print("terminating threads")

        p.terminate()
    p.close()
    p.join()
    time.sleep(5)

Output:

active children are:
Child number 0 is ForkPoolWorker-2
Child number 1 is ForkPoolWorker-1
Child number 2 is ForkPoolWorker-4
Child number 3 is ForkPoolWorker-3
About to create a pool
Trying to connect 0
Trying to connect 1
Trying to connect 2
Trying to connect 3
Failed to connect
terminating threads
active children are:
Child number 0 is ForkPoolWorker-5
Child number 1 is ForkPoolWorker-6
Child number 2 is ForkPoolWorker-8
Child number 3 is ForkPoolWorker-7
About to create a pool
Trying to connect 0
Trying to connect 1
...
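
As for why the second round starts at ForkPoolWorker-5: in CPython, as far as the `multiprocessing/process.py` source shows, every `Process` object created in the parent takes its default name from a module-level counter that lives as long as the parent interpreter, and `Pool` only swaps `Process` for `PoolWorker` in that name. Terminating a pool therefore does not reset the numbering. A small sketch (the helper `default_name_suffix` is hypothetical) showing the counter advancing even for processes that are never started:

```python
import multiprocessing

def default_name_suffix(process):
    # Default names look like "Process-3" (or "Process-1:3" inside a child);
    # return the trailing number.
    tail = process.name.rsplit("-", 1)[1]
    return int(tail.split(":")[-1])

# Creating Process objects is enough to advance the counter; they are
# never started here, so no child processes are spawned.
first = multiprocessing.Process(target=print)
second = multiprocessing.Process(target=print)

print(first.name, second.name)
suffix_gap = default_name_suffix(second) - default_name_suffix(first)
print(suffix_gap)
```

Because the counter belongs to the parent process, a new `Pool` instance simply continues from wherever the previous one stopped.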

One last note: if the use case is indeed database connections, there are ready-made connection pools, and you should probably use one of them. Also, I'm not sure database connections can be shared across processes.

Controlling process names in a pool

If, for some reason, you'd like to control the process names in a pool, you can do that by creating your own pool context:

import multiprocessing
from multiprocessing import context
import time 
import random

process_counter = 0

class MyForkProcess(multiprocessing.context.ForkProcess):
    def __init__(self, *args, **kwargs):
        global process_counter
        name = f"MyForkProcess-{process_counter}"
        process_counter += 1
        super(MyForkProcess, self).__init__(*args, name = name, **kwargs)

class MyContext(multiprocessing.context.ForkContext):
    _name = 'MyForkContext'
    Process = MyForkProcess 

def connect_db(i):
    print(f"Trying to connect {i}")
    cp = multiprocessing.current_process()
    print(f"The name of the child process is {cp.name}")
    time.sleep(random.random() * 2)
    raise Exception("Failed to connect")

context = MyContext()
while True: 
    p = context.Pool(4)
    print("active children are:")
    for idx, process in enumerate(multiprocessing.active_children()):
        print(f"Child number {idx} is {process.name}")
    try:
        print("About to create a pool")
        r = p.map(connect_db, range(4))
        print("Created a pool")
    except Exception as e:
        print(e)
        print("terminating threads")

        p.terminate()
        process_counter = 0

    p.close()
    p.join()
    time.sleep(5)

The output now is:

active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-3
Child number 3 is MyForkPoolWorker-1
About to create a pool
Trying to connect 0
The name of the child process is MyForkPoolWorker-0
Trying to connect 1
The name of the child process is MyForkPoolWorker-1
Trying to connect 2
The name of the child process is MyForkPoolWorker-2
Trying to connect 3
The name of the child process is MyForkPoolWorker-3
Failed to connect
terminating threads
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-1
Child number 3 is MyForkPoolWorker-3
About to create a pool
...
Roy2012
  • My question is different. My snippet is a simplification of the problem. Why are your pool workers named ForkPoolWorker-5 the second time? How can you make them ForkPoolWorker-1 to 4 each time? – InfoLearner Jun 30 '20 at 08:34
  • I believe the names are automatically generated, Why are the names significant? – Roy2012 Jun 30 '20 at 08:37
  • And in any case, you can just change the name of the process using process.name = "my_new_name". – Roy2012 Jun 30 '20 at 08:48
  • I use the names to perform operations: each process used to get a number from 1 to 4, and within connect_db I could take out the numerical part and send messages to the appropriate database. Now the processes get 5-8, which breaks that. How does it get these names? I can set the names, but how do I ensure that each time the pool gets a new set of processes named 1-4? – InfoLearner Jun 30 '20 at 13:14
  • In my code example, I map range(4) to the four processes. This means that each process would get its own identifier (the argument 'i') in connect_db. You can use that number as the process identifier. – Roy2012 Jun 30 '20 at 13:54
  • Still... The question for me is how it gets the names... And how I can ensure it starts fresh each time? I have those solutions in mind but my fundamental question is about where that information is stored in memory – InfoLearner Jun 30 '20 at 17:20
  • The sub-processes and the parent process don't share memory. When you 'terminate' the child processes, they all die. The process name, as far as I could see, is only visible to the parent process - not to the child ones. So - if you want to assign IDs to the processes, you should do that by passing an argument (like 'i' in my code). You can then maintain a dict or change the names of the processes at the parent level to know which child process matches which ID. – Roy2012 Jun 30 '20 at 17:23
  • I don't think you understand the question. I know the workaround and I am not after how to set process names. My question is: where is it setting the names? Anyway, it requires someone to read the multiprocessing pool.py file and not make wild guesses. Instantiating a new instance of Pool should recreate the pool, but it seems that the pool is an OS-level object and – InfoLearner Jun 30 '20 at 18:32
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/216973/discussion-between-roy2012-and-infolearner). – Roy2012 Jun 30 '20 at 18:44