
I would like to define a do_in_parallel function in Python that takes in functions with their arguments, creates a thread for each, and runs them in parallel. The function should work like so:

do_in_parallel(_sleep(3), _sleep(8), _sleep(3))

However, I am having a hard time defining do_in_parallel so that it takes multiple functions, each with its own arguments. Here's my attempt:

from time import sleep
import threading

def do_in_parallel(*kwargs):

    tasks = []

    for func in kwargs.keys():
        t = threading.Thread(target=func, args=(arg for arg in kwargs[func]))
        t.start()
        tasks.append(t)

    for task in tasks:        
        task.join()

def _sleep(n):
    sleep(n)
    print('slept', n)

Calling it like so, I get the following error:

do_in_parallel(_sleep=3, _sleep=8, _sleep=3)

>> do_in_parallel(_sleep=3, _sleep=8, _sleep=3)
                            ^
>> SyntaxError: keyword argument repeated

Can someone explain what I would need to change in my function so that it can take multiple function parameters like so:

do_in_parallel(_sleep(3), _sleep(8), maybe_do_something(else, and_else))

1 Answer


do_in_parallel(_sleep(3), _sleep(8), maybe_do_something(else, and_else))

This call structure wouldn't work anyway, since you are passing the results of your target functions to do_in_parallel (you are calling _sleep etc. yourself before do_in_parallel ever runs).
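
To illustrate: the argument expressions are evaluated first, one after the other, so with the _sleep from your question the calls run sequentially before do_in_parallel is even entered, and all it would receive are their return values (None):

# The arguments run first, blocking for 3 + 8 + 3 seconds in sequence,
# before do_in_parallel sees anything at all.
value = _sleep(3)   # sleeps, then prints 'slept 3'
print(value)        # None -- this is all do_in_parallel would get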

What you need to do instead is bundle up tasks and pass these tasks to your processing function. A task here is a tuple containing the target function to be called and an argument tuple: task = (_sleep, (n,)).
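
For example, sticking with plain threading.Thread as in your attempt, a minimal sketch of a helper that consumes such tasks could look like this:

import threading

def do_in_parallel(*tasks):
    # Each task is a (function, args) tuple; start one thread per task.
    threads = [threading.Thread(target=func, args=args) for func, args in tasks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish

# Usage: pass the function and its arguments separately instead of calling it.
do_in_parallel((_sleep, (3,)), (_sleep, (8,)), (_sleep, (3,)))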

I suggest you then use a ThreadPool and the apply_async method to process the separate tasks.

from time import sleep
from multiprocessing.dummy import Pool  # .dummy.Pool is a ThreadPool


def _sleep(n):
    sleep(n)
    result = f'slept {n}'
    print(result)
    return result


def _add(a, b):
    result = a + b
    print(result)
    return result


def do_threaded(tasks):
    # One worker thread per task; apply_async(*t) unpacks each (func, args) pair.
    with Pool(len(tasks)) as pool:
        results = [pool.apply_async(*t) for t in tasks]
        results = [res.get() for res in results]  # .get() blocks until the result is ready
    return results


if __name__ == '__main__':

    tasks = [(_sleep, (i,)) for i in [3, 8, 3]]
    # [(<function _sleep at 0x7f035f844ea0>, (3,)),
    #  (<function _sleep at 0x7f035f844ea0>, (8,)),
    #  (<function _sleep at 0x7f035f844ea0>, (3,))]
    tasks += [(_add, (a, b)) for a, b in zip(range(0, 3), range(10, 13))]

    print(do_threaded(tasks))

Output:

10
12
14
slept 3
slept 3
slept 8
['slept 3', 'slept 8', 'slept 3', 10, 12, 14]

Process finished with exit code 0
  • just making sure, "from multiprocessing.dummy import Pool" is multithreading even though it's from the multiprocessing package? – callmeGuy Oct 25 '18 at 15:07
  • @callmeGuy Exactly. It's "dummy" because it uses threads instead of processes, but otherwise it offers the same methods. It's a wrapper for `multiprocessing.pool.ThreadPool`; you could import the latter directly if you prefer. – Darkonaut Oct 25 '18 at 15:14
  • that's cool, thanks. Do you reckon I can add functions that read from and write to a pandas df? Like, will I need any locks? – callmeGuy Oct 25 '18 at 15:42
  • @callmeGuy Whether you need locks to keep data consistent depends on what exactly you are doing; that's not a question one could answer for the general case. If you think about this, keep in mind that a thread can be interrupted at any time while performing non-atomic operations, so you would need locks wherever a transaction must not be interrupted (a minimal sketch follows below). – Darkonaut Oct 25 '18 at 15:54
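
As a rough illustration of that last point (the word-counting function and shared dict are made-up examples, not from this thread), a threading.Lock can keep a read-modify-write step atomic across the pool's worker threads:

import threading
from multiprocessing.dummy import Pool

counts = {}                     # shared state touched by every worker thread
counts_lock = threading.Lock()

def count_word(word):
    # Without the lock, the read-modify-write below could interleave
    # between threads and updates could get lost.
    with counts_lock:
        counts[word] = counts.get(word, 0) + 1

with Pool(4) as pool:
    pool.map(count_word, ['a', 'b', 'a', 'c', 'a', 'b'])

print(counts)  # e.g. {'a': 3, 'b': 2, 'c': 1}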