
In a Python script, I have a large dataset that I would like to apply multiple functions to. The functions are responsible for creating certain outputs that get saved to the hard drive.

A few things of note:

  1. the functions are independent
  2. none of the functions return anything
  3. the functions will take variable amounts of time
  4. some of the functions may fail, and that is fine

Can I multiprocess this in such a way that each function and the dataset are sent to a separate core and run there? That way I would not need the first function to finish before the second one can kick off, since there is no need for them to be sequentially dependent. Thanks!

KidMcC

2 Answers


Since your functions are independent and only read the data, they are also thread safe, as long as it is not an issue if the data gets modified while a function is executing.

Use a thread pool. You would have to create a task per function you want to run.

Note: in order for it to run on more than one core, you must use Python multiprocessing; otherwise all the threads will run on a single core. This happens because Python has a Global Interpreter Lock (GIL). For more information, see Python threads all executing on a single core.
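
As a rough sketch of that idea (not your actual code; `functions` and `dataset` here are just placeholders for your own objects), a multiprocessing pool with one task per function could look like this:

import multiprocessing as mp

def run_all(functions, dataset, workers=None):
    # one task per function, all receiving the same dataset;
    # workers=None lets the pool default to the number of CPUs
    with mp.Pool(processes=workers) as pool:
        tasks = [pool.apply_async(f, (dataset,)) for f in functions]
        for t in tasks:
            try:
                t.get()   # re-raises here if the function failed
            except Exception as exc:
                print("task failed:", exc)   # failures are acceptable in your case

Swapping mp.Pool for multiprocessing.pool.ThreadPool gives the thread-based variant; note that with processes the dataset is pickled and copied to every worker, which can be expensive for a large dataset.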

Alternatively, you could use Dask, which wraps and partitions the data in order to run operations on it with multithreading. While it adds some overhead, it might be quicker for your needs.
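
If you try that route, a minimal sketch with dask.delayed (again with placeholder `functions` and `dataset`) might look like:

import dask
from dask import delayed

# build one lazy task per function, then run them in parallel;
# scheduler="processes" sidesteps the GIL, "threads" is the default
tasks = [delayed(f)(dataset) for f in functions]
dask.compute(*tasks, scheduler="processes")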

Attersson

I was in a similar situation to yours and used Processes with the following function:

import multiprocessing as mp

def launch_proc(nproc, lst_functions, lst_args, lst_kwargs):
    n = len(lst_functions)
    # number of buckets: one extra if n is not a multiple of nproc
    r = 1 if n % nproc > 0 else 0
    for b in range(n // nproc + r):
        # build a bucket of at most nproc processes
        bucket = []
        for p in range(nproc):
            i = b * nproc + p
            if i == n:
                break
            proc = mp.Process(target=lst_functions[i], args=lst_args[i], kwargs=lst_kwargs[i])
            bucket.append(proc)
        # start every process in the bucket, then wait for all of them to finish
        for proc in bucket:
            proc.start()
        for proc in bucket:
            proc.join()

This has a major drawback: all Processes in a bucket have to finish before a new bucket can start. I tried to use a JoinableQueue to avoid this, but could not make it work (see the pool-based sketch after the example below).

Example:

def f(i):
    print(i)

if __name__ == "__main__":   # guard so the example also works where processes are spawned (e.g. Windows)
    nproc = 2
    n     = 11
    lst_f      = [f] * n
    lst_args   = [[i] for i in range(n)]
    lst_kwargs = [{}] * n
    launch_proc(nproc, lst_f, lst_args, lst_kwargs)
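
For completeness, a pool-based approach (just a rough sketch, not what I actually used) avoids the bucket barrier, because a worker picks up the next function as soon as it becomes free; it takes the same lst_functions / lst_args / lst_kwargs arguments as above:

from concurrent.futures import ProcessPoolExecutor, as_completed

def launch_proc_pool(nproc, lst_functions, lst_args, lst_kwargs):
    # submit one task per function; a worker starts the next task as soon
    # as it finishes its current one, so there is no bucket barrier
    with ProcessPoolExecutor(max_workers=nproc) as ex:
        futures = [ex.submit(f, *a, **kw)
                   for f, a, kw in zip(lst_functions, lst_args, lst_kwargs)]
        for fut in as_completed(futures):
            if fut.exception() is not None:   # a failed function is acceptable
                print("task failed:", fut.exception())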

Hope it can help.

Rémi.B
  • I will have a separate problem later where a JoinableQueue has been utilized in the past, so it's very useful to see the application here as you explain it. Thanks for this! – KidMcC May 14 '18 at 12:38