
I'm using the ProcessPoolExecutor context manager to run several Kafka consumers in parallel. I need to store the process IDs of the child processes so that I can cleanly terminate those processes later. I have code like this:

class MultiProcessConsumer:
    ...

    def run_in_parallel(self):
        parallelism_factor = 5

        with ProcessPoolExecutor() as executor:
            processes = [executor.submit(self.consume) for _ in range(parallelism_factor)]
            # It would be nice if I could write [process.pid for process in processes] to a file here.

    def consume(self):
        while True:
            for message in self.kafka_consumer:
                do_stuff(message)

I know I can use os.getpid() in the consume method to get the PIDs. But handling them properly (in case consumers are constantly shutting down or starting up) requires some extra work.

How would you propose that I get and store PIDs of the child processes in such a context?

Amir Afianian

2 Answers


os.getpid() seems to be the way to go. Just pass the PIDs back through a Queue or Pipe, together with a random UUID that you hand to each task beforehand, so you can match every PID to the submission it belongs to.

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import os
import queue  # multiprocessing borrows the Empty exception from the queue module
import time
import uuid

# A plain multiprocessing.Queue can't be passed to pool workers;
# a Manager queue is proxied and can be shared. See:
# https://stackoverflow.com/questions/9908781/sharing-a-result-queue-among-several-processes
m = multiprocessing.Manager()
q = m.Queue()

def task(n, queue, uuid):
    my_pid = os.getpid()
    print("Executing our Task on Process {}".format(my_pid))
    queue.put((uuid, my_pid))  # report (uuid, PID) back to the parent
    time.sleep(n)
    return n * n


def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        some_dict = {}
        for i in range(10):
            print(i)

            u = uuid.uuid4()
            f = executor.submit(task, i, q, u)
            some_dict[u] = [f, None]  # PID not known here yet

            try:
                rcv_uuid, rcv_pid = q.get(block=True, timeout=1)
                some_dict[rcv_uuid][1] = rcv_pid  # store the PID
                print('I am', rcv_uuid, 'and my PID is', rcv_pid)
            except queue.Empty as e:
                print('handle me', e)


if __name__ == '__main__':
    main()
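
Once a PID has been recorded in some_dict, terminating that worker later is just a matter of sending it a signal. A minimal POSIX-only sketch (terminate_worker is a hypothetical helper, not part of the code above):

import os
import signal

def terminate_worker(some_dict, task_uuid):
    # Look up the (future, PID) pair recorded for this task and
    # signal the worker process that reported the PID.
    future, pid = some_dict[task_uuid]
    if pid is not None:
        os.kill(pid, signal.SIGTERM)

Keep in mind that killing a pool worker also kills whatever task it happens to be running at that moment, so this is really only suitable for shutdown.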
Joe

Although the field is private, you could use ProcessPoolExecutor's self._processes attribute. The code snippet below shows how to use this variable.

import os

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

nb_processes = 100

executor = ProcessPoolExecutor(nb_processes)

futures = [executor.submit(os.getpid) for _ in range(nb_processes)]
wait(futures)
backends = list(map(lambda x: x.result(), futures))

assert len(set(backends)) == nb_processes

In the case above, an AssertionError is raised. This is because a pool worker can be reused for more than one task, so collecting os.getpid() from the futures may yield fewer distinct PIDs than there are workers; you cannot learn all of the forked process IDs through the method you mentioned. Hence, you can do this instead:

import os

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

nb_processes = 100

executor = ProcessPoolExecutor(nb_processes)

futures = [executor.submit(os.getpid) for _ in range(nb_processes)]
wait(futures)
backends = list(map(lambda x: x.result(), futures))

assert len(set(executor._processes.keys())) == nb_processes
print('all of the PIDs are: %s.' % list(executor._processes.keys()))

If you don't want to break the encapsulation, you could inherit from ProcessPoolExecutor and expose a new property for this.
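
For example, a minimal sketch of such a subclass (the name PidAwareExecutor and the property worker_pids are illustrative; the property still reads the private _processes dict, it just keeps that detail in one place):

import os
from concurrent.futures import ProcessPoolExecutor, wait

class PidAwareExecutor(ProcessPoolExecutor):
    @property
    def worker_pids(self):
        # _processes maps each worker's PID to its Process object;
        # relying on it is a CPython implementation detail.
        return list(self._processes.keys())

if __name__ == '__main__':
    with PidAwareExecutor(max_workers=4) as executor:
        futures = [executor.submit(os.getpid) for _ in range(8)]
        wait(futures)
        print('worker PIDs:', executor.worker_pids)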

Wotchin