
I researched first and couldn't find an answer to my question. I am trying to run multiple functions in parallel in Python.

I have something like this:

files.py

import time
import common  # common is a util class that handles all the IO stuff

dir1 = r'C:\folder1'   # raw strings so the backslashes aren't treated as escape sequences
dir2 = r'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
    c = common.Common()
    for i in range(len(addFiles)):
        c.createFiles(addFiles[i], filename, dir1)
        c.getFiles(dir1)
        time.sleep(10)
        c.removeFiles(addFiles[i], dir1)
        c.getFiles(dir1)

def func2():
    c = common.Common()
    for i in range(len(addFiles)):
        c.createFiles(addFiles[i], filename, dir2)
        c.getFiles(dir2)
        time.sleep(10)
        c.removeFiles(addFiles[i], dir2)
        c.getFiles(dir2)

I want to call func1 and func2 and have them run at the same time. The functions do not interact with each other or operate on the same object. Right now I have to wait for func1 to finish before func2 can start. How do I do something like the following:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

I want to be able to create both directories at close to the same time, because every minute I am counting how many files are being created. If a directory isn't there yet, it will throw off my timing.

– lmcadory

  • You might want to re-architect this; if you are counting the number of files/folders every minute, you are creating a race condition. What about having each function update a counter, or using a lockfile to ensure that the periodic process doesn't update the count until both functions have finished executing? – Aug 26 '11 at 17:02

8 Answers


You could use threading or multiprocessing.

Due to the Global Interpreter Lock (GIL) in CPython, threading is unlikely to achieve true parallelism for CPU-bound work. For this reason, multiprocessing is generally a better bet.

Here is a complete example:

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

The mechanics of starting/joining child processes can easily be encapsulated into a function along the lines of your runBothFunc:

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

if __name__ == "__main__":
    runInParallel(func1, func2)
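
If you also need each function's return value, which is a recurring question in the comments below, one option is a process pool; a minimal sketch, not part of the original answer (runInParallelWithResults is an illustrative name), assuming the functions are picklable:

from multiprocessing import Pool

def runInParallelWithResults(*fns):
    # Run each function in its own worker process and collect what it returns.
    with Pool(processes=len(fns)) as pool:
        async_results = [pool.apply_async(fn) for fn in fns]
        return [res.get() for res in async_results]

if __name__ == "__main__":
    print(runInParallelWithResults(func1, func2))  # [None, None] for the func1/func2 above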
– NPE
  • I used your code but the functions still didn't start at the same time. – lmcadory Aug 26 '11 at 16:26
  • @Lamar McAdory: Please explain what exactly you mean by "at the same time", perhaps giving a concrete example of what you did, what you were expecting to happen, and what actually happened. – NPE Aug 26 '11 at 16:30
  • @Lamar: You can never have any guarantee of "exactly the same time", and thinking you can is just plain wrong. How many CPUs you have, the load on the machine, and the timing of the many things happening on the computer all influence when the threads/processes start. Also, since the processes are started right after creation, the overhead of creating a process has to be factored into the time difference you see. – Martin Aug 26 '11 at 16:41
  • @Lamar McAdory: There is no way to ensure perfect synchronicity of the execution of two functions. Perhaps it is worth re-evaluating the overall approach to see if there's a better way to achieve what you're trying to do. – NPE Aug 26 '11 at 20:02
  • How do I stop one of the subprocesses? For example: func1 ended with exit code 0, but func2 is still running. What should I do to stop the second process? – falek.marcin Nov 11 '11 at 18:52
  • Is it possible to get a list of the results of each function? Let's say each function returns a different value; can the values be appended to some list that can be used later, maybe by appending each result to a global list? – pelos Jul 11 '17 at 16:07
  • @NPE, is there a way to monitor the processes to be sure that they are running? – user3437245 Mar 20 '18 at 08:28
  • @NPE, suppose that both functions return values. How can we retrieve them in `runInParallel(*fns)`? – LearnToGrow Oct 04 '18 at 19:02
  • What is xrange? In which library do I get it? @lmcadory – Trect Oct 29 '18 at 14:20
  • @NPE, what if the two functions return values? How do we get those returned values? – Om Prakash Jul 26 '19 at 06:44
  • @OmPrakash, see: https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce – Aaron Oct 23 '19 at 10:52
  • If my functions take parameters and I pass parameters while calling them from separate processes, they don't run simultaneously. Can you please help? – user2910372 Apr 16 '20 at 03:44
  • What if the function has parameters? – Tech Expert Wizard Aug 30 '21 at 23:29
  • Is there any way to use this solution by calling `start()` on a `Process` from a scheduled job in `apscheduler`? Suppose I have this code: `scheduler.add_job(foo, 'interval', seconds=60, id='1')`; how can I use a process in this example? – Guilherme Matheus Jan 15 '22 at 20:52
  • Note that multiprocessing works only if, as here, the function(s) executed in parallel are picklable. For more complex/external functions (e.g. subprocess), using Ray is a simple solution that works. – FlorianH Jun 15 '23 at 21:27

If your functions are mainly doing I/O work (and less CPU work) and you have Python 3.2+, you can use a ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

If your functions are mainly doing CPU work (and less I/O work) and you have Python 3.2+, you can use a ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

Alternatively if you only have Python 2.6+, you can use the multiprocessing module directly:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])
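
If you need to know which task produced which result, as some of the comments below ask, you can keep the Future objects returned by submit and map them back to their tasks; a minimal sketch, not part of the original answer (fetch is an illustrative stand-in for a real I/O task):

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(name):
    # Stand-in for real I/O work; returns something identifiable.
    return f'{name}: done'

if __name__ == '__main__':
    with ThreadPoolExecutor() as executor:
        # Map each Future back to the task name that produced it.
        future_to_name = {executor.submit(fetch, name): name for name in ['task_1', 'task_2']}
        for future in as_completed(future_to_name):
            print(future_to_name[future], '->', future.result())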
– David Foster
  • This is a good answer. For the I/O-bound tasks using concurrent.futures, how can I identify from the results which one completed? Basically, if we have normal functions instead of lambda functions, how do I tell which result maps to which called function? – Tragaknight Jul 15 '19 at 00:33
  • Never mind, I found a way: instead of `run_cpu_tasks_in_parallel([lambda: print('CPU task 1 running!'), lambda: print('CPU task 2 running!')])`, use `results = run_io_tasks_in_parallel([lambda: {'is_something1': func1()}, lambda: {'is_something2': func2()}])`. – Tragaknight Jul 15 '19 at 01:00
  • If the function gives outputs for different parameters, how do I save them? That is, what should be placed instead of `lambda: print('CPU task 1 running!'), lambda: print('CPU task 2 running!')` to append the results to variables `task1_output` and `task2_output`? – mArk Sep 11 '20 at 06:55
  • For the second example (doing CPU work) you can also use a ProcessPoolExecutor, which has a similar API to the first example: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor – David Foster Apr 25 '22 at 23:47

This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you'd need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...
    pass

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...
    pass

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

If you pass the same argument to both functions and the argument is large, a more efficient way to do this is to use ray.put(). This avoids serializing the large argument twice and creating two in-memory copies of it:

largeData_id = ray.put(largeData)

ray.get([func1.remote(largeData_id), func2.remote(largeData_id)])

Important - If func1() and func2() return results, you need to rewrite the code as follows:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.
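
If you want to react as soon as either task finishes rather than blocking on both, Ray also provides ray.wait; a minimal sketch, not part of the original answer:

ref1 = func1.remote(filename, addFiles, dir1)
ref2 = func2.remote(filename, addFiles, dir2)

remaining = [ref1, ref2]
while remaining:
    # ray.wait returns the refs that are ready and those still pending.
    ready, remaining = ray.wait(remaining, num_returns=1)
    print('finished:', ray.get(ready[0]))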

– Ion Stoica
  • I found this to be the superior option. One thing I'd add, especially if you use it in Docker, is that it is architecture-dependent: as of this moment it does not work on Alpine Linux (CentOS 7 worked for me), and you should run `ray.shutdown()` after running this, because you will run out of memory swiftly if what you are doing is complex in any way. – jimh Sep 22 '20 at 08:09
  • Just to avoid others getting confused as I did, the `ray.shutdown()` that @jimh mentions is NOT required at the end of a simple Ray-using script. From the `ray.shutdown()` docs: "This will automatically run at the end when a Python process that uses Ray exits." If you still require `ray.shutdown()`, make sure you wait for ALL sub-processes to have finished (not just the first one, as can happen in basic code). – FlorianH Jun 15 '23 at 20:29
  • I see what you mean. I am specifically referring to deployed instances of Python that may run indefinitely, to further support their point. – jimh Jun 16 '23 at 21:11

It seems like you have a single function that you need to call with two different parameters. This can be done elegantly using a combination of concurrent.futures and map with Python 3.2+.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
    time.sleep(seconds)
    print(f'{seconds} has been processed')
    return seconds  # return a value so the results can be inspected later

secs_list = [2, 4, 6, 8, 10, 12]

Now, if your operation is IO bound, then you can use the ThreadPoolExecutor as such:

with ThreadPoolExecutor() as executor:
    results = executor.map(sleep_secs, secs_list)

Note how map is used here to map your function to the list of arguments.

Now, if your function is CPU bound, you can use the ProcessPoolExecutor:

with ProcessPoolExecutor() as executor:
    results = executor.map(sleep_secs, secs_list)

If you are not sure, you can simply try both and see which one gives you better results.

Finally, if you are looking to print out your results, you can simply do this:

with ThreadPoolExecutor() as executor:
    results = executor.map(sleep_secs, secs_list)
    for result in results:
        print(result)
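
To connect this back to the original question: the same pattern applies if the create/get/sleep/remove loop is factored into a single function that takes the directory as a parameter; a minimal sketch, where process_directory is an illustrative name standing in for the body of func1/func2:

from concurrent.futures import ThreadPoolExecutor

dirs = [r'C:\folder1', r'C:\folder2']

def process_directory(directory):
    # The create/get/sleep/remove loop from the question would go here.
    print(f'processing {directory}')
    return directory

with ThreadPoolExecutor() as executor:
    for finished_dir in executor.map(process_directory, dirs):
        print(f'{finished_dir} done')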
– BICube

In 2021 the easiest way is to use asyncio:

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 4 seconds, since they run concurrently).
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

References:

[1] https://docs.python.org/3/library/asyncio-task.html
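
Note that the question's func1/func2 are blocking (time.sleep and file I/O), so to run them under asyncio you would normally hand them off to worker threads; a minimal sketch, not part of the original answer, assuming Python 3.9+ for asyncio.to_thread (blocking_func1/blocking_func2 are illustrative names):

import asyncio

def blocking_func1():
    ...  # the original func1 body (blocking calls are fine here)

def blocking_func2():
    ...  # the original func2 body

async def main():
    # Each blocking function runs in its own worker thread; gather waits for both.
    await asyncio.gather(
        asyncio.to_thread(blocking_func1),
        asyncio.to_thread(blocking_func2),
    )

asyncio.run(main())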

– Konstantin Burlachenko
  • Wrong answer. Asyncio is NOT for parallelism. – user3786340 May 04 '21 at 16:02
  • Wrong comment. It IS for parallelism across tasks. Whether a task is mapped onto a real thread is hard to say, because the specification (there is no standard for languages; by spec I mean the text from the link) does not say that. If in your version a task is not the same as a thread, create your own threads with threading.Thread. – Konstantin Burlachenko May 05 '21 at 00:12
  • If I am not wrong, this is not true parallelism. Asyncio uses the blocking time to run another task, so at any given time there is only one task executing. – hackwithharsha Jul 06 '21 at 10:58
  • @user3786340 is right; you can see the argument in this post: https://towardsdatascience.com/concurrency-and-parallelism-in-python-bbd7af8c6625 It says: "But the idea behind asyncio's tasks is different from the threads. In fact, tasks run on a single thread. However, each task allows the OS to run another task if the first one is waiting for its response instead of blocking it. That's the essence of asynchronous IO." – Jerbs Nov 19 '21 at 13:43
  • I also think so. I have recently been working on an algorithm where I calculate Fractional Gaussian Noise on 100 examples of data, each with dimension 1024, and if I make my code async there is NO visible performance gain. The reason is that the function which calculates the FGN takes equal time to run for all 100 cases and is not waiting for I/O. If your code is waiting for I/O then it is a good idea to use async/await; otherwise I am pretty confident it does not serve the purpose. – SRC Dec 31 '21 at 09:55
  • It is a wrong answer. Parallelism != asynchronous. – illuminato Feb 16 '22 at 21:19
  • "Asynchronous >= parallelism" if asynchronous is implemented as a thread pool with a fixed number of threads and the size of the thread pool is tunable. Actually, creating true parallelism in terms of userspace threads may be a bit tricky in the Python interpreter due to the GIL. – Konstantin Burlachenko Feb 16 '22 at 22:35
  • @SRC it depends on how tasks are implemented. Typically it's a thread pool, but even in the C++ standard library there is no way to control the size of a thread pool. Also, there is a problem with threading in Python due to the GIL: https://speakerdeck.com/dabeaz/embracing-the-global-interpreter-lock . In terms of real userspace threads, one thread should wait on some system call and only in that case is there true parallelism. If you launch 12 threads and all of them are CPU-bound, there is no way to run them in parallel, but conceptually you can still leverage tasks or "threads". – Konstantin Burlachenko Feb 16 '22 at 22:44
  • Notice the tasks in the question above: they involve a lot of I/O. Disk access isn't as slow as it used to be, but it still has wait states of many CPU cycles, which makes it well suited to asyncio. Yes, it wouldn't help with a compute-intensive task set, but it would actually succeed here. I'm curious, though, whether Python running on a multi-core processor will spread the work across the processors. – Lee Meador Mar 24 '22 at 17:25

If you are a Windows user and using Python 3, then this post will help you do parallel programming in Python. When you run the usual multiprocessing library's Pool programming, you will get an error regarding the main function in your program. This is because Windows has no fork() functionality. The post below gives a solution to this problem.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Since I was using Python 3, I changed the program a little, like this:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

After these helper functions, the problem code above is also changed a little, like this:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

And I got this output:

[1, 8, 27, 64, 125, 216]

I think this post may be useful for some Windows users.
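
For comparison, on current Python 3 the plain multiprocessing.Pool also works on Windows, provided the worker function is defined at module level and the pool is created under the __main__ guard; a minimal sketch, not from the original post:

from multiprocessing import Pool

def cube(x):
    return x ** 3

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(pool.map(cube, range(1, 7)))  # [1, 8, 27, 64, 125, 216]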

– Arun Sooraj

There's no way to guarantee that two functions will execute in sync with each other, which seems to be what you want to do.

The best you can do is to split up the function into several steps, then wait for both to finish at critical synchronization points using Process.join like @aix's answer mentions.

This is better than time.sleep(10) because you can't guarantee exact timings. With explicit waiting, you're saying that the functions must be done executing that step before moving on to the next, instead of assuming it will be done within 10 seconds, which isn't guaranteed based on what else is going on on the machine.
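
A minimal sketch of that idea, splitting the work into phases and joining between them (the step function here is purely illustrative):

from multiprocessing import Process

def step(name, phase):
    # Stand-in for one phase of the real work.
    print(f'{name}: phase {phase} done')

def run_phase(phase):
    # Start both workers for this phase, then wait for both before moving on.
    procs = [Process(target=step, args=(name, phase)) for name in ('func1', 'func2')]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # synchronization point: neither worker runs ahead of the other

if __name__ == '__main__':
    for phase in (1, 2):
        run_phase(phase)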

– Davy8

(About: how can I simultaneously run two (or more) functions in Python?)

With asyncio, sync and async tasks can be run concurrently:

import asyncio
import time

def function1():
    # performing blocking tasks
    while True:
        print("function 1: blocking task ...")
        time.sleep(1)

async def function2():
    # perform non-blocking tasks
    while True:
        print("function 2: non-blocking task ...")
        await asyncio.sleep(1)

async def main():
    loop = asyncio.get_running_loop()

    await asyncio.gather(
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
        loop.run_in_executor(None, function1),
        function2(),
    )

if __name__ == '__main__':
    asyncio.run(main())

– koyeung