
I have a script that produces files from a large dataset, so I'm using multiprocessing to speed things up. The problem is that my script accepts several command line arguments (via the argparse library) which change the results, and I'm struggling to pass those arguments through to the function called by my multiprocessing pool.

I'm sure the solution to this is really simple, I'm just not seeing it. I figured I would make a global variable that gets updated to reflect the command line args, but the function called by the pool still sees the old value. I've tried to illustrate my problem below:

import argparse
import multiprocessing
import os

output_dir = 'default'

def do_task(item):
    print(output_dir) # Prints 'default'
    result = process_item(item)
    write_to_file(data=result, location=os.path.join(output_dir, item.name))

def do_multi_threaded_work(data_path):
    print(output_dir) # Prints command line argument
    data = read_from_file(data_path)
    pool = multiprocessing.Pool()
    for i, _ in enumerate(pool.imap_unordered(do_task, data)):
        print('Completed task %d/%d' % (i, len(data)))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output-dir')
    parser.add_argument('-i', '--input-file')
    args = parser.parse_args()
    output_dir = args.output_dir
    do_multi_threaded_work(args.input_file)

How can I ensure that I am saving my files to the correct directory according to the command line arguments?

Edit: It's been suggested that I do something like the code below. However, since my actual code has quite a lot of constants (I simplified it down to one for this example), this seems very messy and counter-intuitive. Is there really no better way to set a global constant that the do_task function can access, without hard-coding the value?

from itertools import repeat
...
def do_multi_threaded_work(data_path):
    ...
    for i, _ in enumerate(pool.imap_unordered(do_task, zip(data, repeat(output_dir)))):
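For clarity, this is roughly what that suggestion looks like as a self-contained toy, with made-up data and a simple path join standing in for my real process_item/write_to_file:

import multiprocessing
import os
from itertools import repeat

def do_task(task):
    # each task is an (item, output_dir) pair produced by zip/repeat
    item, output_dir = task
    return os.path.join(output_dir, '%s.txt' % item)

if __name__ == '__main__':
    data = ['a', 'b', 'c']
    output_dir = 'results'
    with multiprocessing.Pool() as pool:
        for path in pool.imap_unordered(do_task, zip(data, repeat(output_dir))):
            print(path)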
cainy393
  • Take a look at https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments - I think that might have what you're looking for. – Rusty Widebottom Apr 29 '20 at 18:56
  • @RustyWidebottom I've just had a read through the documentation [here](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) and I don't think that's what I want. I'm not looking to pass multiple items from the data variable, I have a single constant that I want to pass as well as a single data item. – cainy393 Apr 29 '20 at 19:00
  • Ah, I see in the other answers there is some suggestion on how to achieve what I want to do. Still seems very awkward though, will give it a go. Thanks. – cainy393 Apr 29 '20 at 19:03

2 Answers


If I understood your question correctly, you can do the following to send additional arguments to your function along with the main data:

# my toy example:

import multiprocessing as mp


def do_job(x) -> int:
    # x[0] is the real data; x[1], x[2] are parameters that tune the function
    return x[0]**2 + x[1] + x[2]


if __name__ == '__main__':
    jobs = [1, 2, 3, 4, 5, 6, 7, 8]  # the data you want to process (element 0 of each tuple)
    # tuning parameters applied to every job
    number_one_arg = 100
    number_two_arg = 2000

    # pair each data item with the tuning parameters
    x_for_do_job = [(i, number_one_arg, number_two_arg) for i in jobs]
    print(x_for_do_job) # show what we have now

    pool_ = mp.Pool(4)
    results = pool_.map(do_job, x_for_do_job)
    print(results)
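The same idea can also be written with ```Pool.starmap```, which unpacks each tuple into separate arguments so the indexing into ```x``` goes away - a roughly equivalent sketch with the same toy numbers:

import multiprocessing as mp


def do_job(x, one, two) -> int:
    # x - the real data item; one, two - the tuning parameters
    return x**2 + one + two


if __name__ == '__main__':
    jobs = [1, 2, 3, 4, 5, 6, 7, 8]
    number_one_arg = 100
    number_two_arg = 2000

    with mp.Pool(4) as pool_:
        results = pool_.starmap(do_job, [(i, number_one_arg, number_two_arg) for i in jobs])
    print(results)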
Artiom Kozyrev
  • Thanks! This looks like it would achieve what I wanted, yes. I ended up going for a different approach though because I'm dealing with such a large number of jobs. I figured it might be a bit of a RAM hog to repeat lots of parameters redundantly in a big list like that. – cainy393 Apr 29 '20 at 19:33
  • @cainy393 you can also check the ```Pool.starmap``` method, but it is almost the same approach to the problem: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap – Artiom Kozyrev Apr 29 '20 at 20:04

Found a solution that involved using the partial feature of the functools library in the end. This enabled me to specify any constant parameters by creating a partial function with those values already filled in. Then I pass that partial function, along with the iterable, to the pool.

import argparse
import multiprocessing
import os
from functools import partial

def do_task(output_dir, item):
    print(output_dir) # Prints command line argument
    result = process_item(item)
    write_to_file(data=result, location=os.path.join(output_dir, item.name))

def do_multi_threaded_work(data_path):
    print(output_dir) # Prints command line argument
    data = read_from_file(data_path)
    func = partial(do_task, output_dir)
    pool = multiprocessing.Pool()
    for i, _ in enumerate(pool.imap_unordered(func, data)):
        print('Completed task %d/%d' % (i, len(data)))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output-dir')
    parser.add_argument('-i', '--input-file')
    args = parser.parse_args()
    output_dir = args.output_dir
    do_multi_threaded_work(args.input_file)
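To illustrate what partial is actually doing here, a tiny standalone example with toy values (not my real code):

import os
from functools import partial

def do_task(output_dir, item):
    # the constant parameter comes first so partial can bind it positionally
    return os.path.join(output_dir, item)

func = partial(do_task, 'results')  # fixes output_dir='results'
print(func('item1.txt'))            # same as do_task('results', 'item1.txt')
print(func('item2.txt'))            # same as do_task('results', 'item2.txt')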
cainy393