
I have a few processes that are meant to run in a while loop. Some of them collect data, and before they stop I want them to save that data to a CSV or JSON file. What I have right now overrides the join method of the multiprocessing.Process class and uses super to call the original.

import multiprocessing

class Processor(multiprocessing.Process):
    def __init__(self, arguments):
        multiprocessing.Process.__init__(self)
        self.arguments = arguments

    def run(self):
        self.main_function()

    def main_function(self):
        while True:
            pass  # do things to incoming data

    def function_on_join(self):
        pass  # do one last thing before the process ends

    def join(self, timeout=None):
        self.function_on_join()
        super(Processor, self).join(timeout=timeout)

Is there a better way/correct way/more pythonic way to do this?

bschmach15

1 Answer


I suggest you take a look at the concurrent.futures module, if you can describe your job as a list of tasks to be done by a pool of workers.

Task-Based Multiprocessing

When you have a sequence of jobs (e.g. a list of filenames) and you want them to be processed in parallel, you can do so as follows:

from concurrent.futures import ProcessPoolExecutor    
import requests

def get_url(url):
    resp = requests.get(url)
    print(f'{url} - {resp.status_code}')
    return url

jobs = ['http://google.com', 'http://python.org', 'http://facebook.com']

# create process pool of 3 workers
with ProcessPoolExecutor(max_workers=3) as pool:
    # run in parallel each job and gather the returned values
    return_values = list(pool.map(get_url, jobs))

print(return_values)

outputs:

http://google.com - 200
http://python.org - 200
http://facebook.com - 200
['http://google.com', 'http://python.org', 'http://facebook.com']
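If the goal is to save what the workers produce, one option is to let each task return its data and write the file once in the parent. The following is only a sketch under assumptions: `measure`, `save_rows`, `run_jobs` and the `results.csv` filename are hypothetical names, and the task does no network access so it can run anywhere.

```python
import csv
from concurrent.futures import ProcessPoolExecutor, as_completed


def measure(text):
    """Hypothetical task: return one row of data for a single job."""
    return {'job': text, 'length': len(text)}


def save_rows(rows, path):
    """Write the collected rows to a CSV file (called from the parent)."""
    with open(path, 'w', newline='') as fh:
        writer = csv.DictWriter(fh, fieldnames=['job', 'length'])
        writer.writeheader()
        writer.writerows(rows)


def run_jobs(jobs, max_workers=3):
    """Run every job in a process pool and gather the returned rows."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(measure, job) for job in jobs]
        # as_completed yields futures in completion order, not submission order
        return [future.result() for future in as_completed(futures)]


if __name__ == '__main__':
    rows = run_jobs(['http://google.com', 'http://python.org', 'http://facebook.com'])
    save_rows(rows, 'results.csv')  # hypothetical output file
```

Because the rows come back in completion order, sort them before saving if the order matters.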

Not Task-Based Multiprocessing

When you just want to run multiple subprocesses that do not consume jobs as in the first case, you may want to use multiprocessing.Process.

You can use it similarly to threading.Thread, in a procedural fashion as well as in an OOP fashion.

Example for procedural fashion (IMHO more pythonic):

import os
from multiprocessing import Process

def func():
    print(f'hello from: {os.getpid()}')

processes = [Process(target=func) for _ in range(4)]  # creates 4 processes

for process in processes:
    process.daemon = True  # daemonic: the child is terminated when the main program exits
    process.start()  # start the process

outputs:

hello from: 31821
hello from: 31822
hello from: 31823
hello from: 31824
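For comparison, the OOP fashion mentioned above could look roughly like this. It is a minimal sketch: the `Greeter` class, its `message` helper and the `worker-N` labels are hypothetical names chosen for illustration.

```python
import os
from multiprocessing import Process


class Greeter(Process):
    """Hypothetical Process subclass; run() executes in the child process."""

    def __init__(self, label):
        super().__init__()
        self.label = label

    def message(self):
        # Built in whichever process calls it, so the pid identifies that process
        return f'hello {self.label} from: {os.getpid()}'

    def run(self):
        print(self.message())


if __name__ == '__main__':
    workers = [Greeter(f'worker-{i}') for i in range(4)]  # creates 4 processes
    for worker in workers:
        worker.start()  # start the process; run() is invoked in the child
    for worker in workers:
        worker.join()  # wait for the process to finish
```

Only `run()` is executed in the child; `__init__` and any method called on the object from the main program still run in the parent.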

Waiting for Processes to Finish

If you want to wait using Process.join() (more info on process.join() & process.daemon in this SO answer), you can do it like this:

import os
import time
from multiprocessing import Process

def func():
    time.sleep(3)
    print(f'hello from: {os.getpid()}')

processes = [Process(target=func) for _ in range(4)]  # creates 4 processes

for process in processes:
    process.start()  # start the process

for process in processes:
    process.join()  # wait for the process to finish

print('all processes are done!')

this outputs:

hello from: 31980
hello from: 31983
hello from: 31981
hello from: 31982
all processes are done!
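For the use case in the question (save the data before the process stops), note that join() is executed by the process that calls it, which is normally the parent, so an overridden join() does not see data collected inside the child. A possible alternative, sketched here under assumptions, is to signal the worker with a multiprocessing.Event and save in a finally block inside the worker itself. The names `collect` and `main` and the output filename are hypothetical, and the loop body is a placeholder for the real work.

```python
import json
import multiprocessing
import os
import tempfile
import time


def collect(stop_event, out_path):
    """Collect data until stop_event is set, then save it from inside the worker."""
    data = []
    try:
        while not stop_event.is_set():
            data.append(len(data))  # placeholder for real work on incoming data
            time.sleep(0.01)
    finally:
        # Runs in the worker process itself, so it sees the collected data
        with open(out_path, 'w') as fh:
            json.dump(data, fh)


def main():
    out_path = os.path.join(tempfile.gettempdir(), 'collected_data.json')  # hypothetical path
    stop_event = multiprocessing.Event()
    worker = multiprocessing.Process(target=collect, args=(stop_event, out_path))
    worker.start()
    time.sleep(0.5)   # let the worker run for a while
    stop_event.set()  # ask the worker to leave its loop
    worker.join()     # wait until it has saved and exited
    print('worker finished, data saved to', out_path)


if __name__ == '__main__':
    main()
```

This relies on the worker exiting its loop normally. If the process is stopped with Process.terminate(), finally clauses are not executed and nothing is saved.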
ShmulikA