34

I am using Python 2.7. I have some code that looks like this:

task1()
task2()
task3()
dependent1()

task4()
task5()
task6()
dependent2()

dependent3()

The only dependencies here are as follows: dependent1 needs to wait for tasks 1-3, dependent2 needs to wait for tasks 4-6, and dependent3 needs to wait for dependents 1-2. The following would be okay: first run all 6 tasks in parallel, then the first two dependents in parallel, then the final dependent.

I prefer to have as many tasks as possible running in parallel. I've googled for some modules, but I was hoping to avoid external libraries, and I'm not sure how the Queue/Thread technique can solve my problem (maybe someone can recommend a good resource?).

Mohamed Khamis
  • I recommend using the standard library's *Queue.task_done* and *Queue.join* methods to synchronize the threads. At the bottom of the page in the Queue docs, you'll find an example of how to wait for other threads to finish their tasks: http://docs.python.org/library/queue.html#Queue.Queue.join – Raymond Hettinger Nov 23 '11 at 12:30
  • If your code gets more complex, it's worth looking at external libraries, because there are already things to deal with running tasks in parallel while making sure dependencies run in order. – Thomas K Nov 23 '11 at 13:25
  • Because of the GIL, the threads will only run one at a time in standard python. Future versions of Pypy using STM may get round this though. – Stuart Axon Nov 30 '15 at 10:39

3 Answers

41

The built-in threading.Thread class offers all you need: start() to start a new thread and join() to wait for a thread to finish.

import threading

def task1():
    pass
def task2():
    pass
def task3():
    pass
def task4():
    pass
def task5():
    pass
def task6():
    pass

def dep1():
    # run tasks 1-3 in parallel, then block until all three have finished
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t3 = threading.Thread(target=task3)

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

def dep2():
    # run tasks 4-6 in parallel, then block until all three have finished
    t4 = threading.Thread(target=task4)
    t5 = threading.Thread(target=task5)
    t6 = threading.Thread(target=task6)

    t4.start()
    t5.start()
    t6.start()

    t4.join()
    t5.join()
    t6.join()

def dep3():
    # run dep1 and dep2 in parallel; each one waits on its own tasks,
    # so dep3 returns only after all six tasks have finished
    d1 = threading.Thread(target=dep1)
    d2 = threading.Thread(target=dep2)

    d1.start()
    d2.start()

    d1.join()
    d2.join()

d3 = threading.Thread(target=dep3)
d3.start()
d3.join()

As an alternative to join, you can use Queue.join to wait for the threads to finish.
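For illustration, here is a minimal sketch of that pattern (the module is named Queue in Python 2 and queue in Python 3; the work function here is a hypothetical stand-in for your tasks):

import threading
import Queue  # the module is named 'queue' in Python 3

def work():
    pass  # hypothetical stand-in for a real task

def worker(q):
    while True:
        func = q.get()   # blocks until an item is available
        func()
        q.task_done()    # tell the queue this item is finished

q = Queue.Queue()
for _ in range(3):
    t = threading.Thread(target=worker, args=(q,))
    t.daemon = True      # worker threads exit with the main thread
    t.start()

for _ in range(3):
    q.put(work)

q.join()  # returns once every queued item has been marked task_done()
# the dependent step is safe to run here

The worker/queue approach pays off when you have more tasks than threads; for the fixed dependency graph in the question, plain join as above is simpler.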

gecco
  • This is great! but my task functions return values that I use in the dep functions, how do I get the returned values from t1,t2,t3 etc.. ? – Mohamed Khamis Nov 23 '11 at 13:26
  • First hit in SO gives me http://stackoverflow.com/questions/1886090/return-value-from-thread – gecco Nov 23 '11 at 14:06
  • What if I want to pass some arguments into the function? – Lavish Apr 29 '15 at 15:17
  • You can pass args and kwargs to the threading.Thread. Please have a look at the documentation: https://docs.python.org/3.4/library/threading.html#threading.Thread – gecco Apr 30 '15 at 11:03
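As a hedged illustration of both comment threads above, here is a minimal sketch (the scale function, its parameters, and the shared results dict are all hypothetical) of passing arguments via args/kwargs and collecting a return value through a shared dictionary:

import threading

def scale(value, factor=2, results=None, key=None):
    # hypothetical task: "returns" its result through the shared dict
    results[key] = value * factor

results = {}
t = threading.Thread(target=scale,
                     args=(21,),                     # positional arguments
                     kwargs={'factor': 2,
                             'results': results,
                             'key': 't1'})           # keyword arguments
t.start()
t.join()
print results['t1']  # prints 42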
6

If you are willing to give external libraries a shot, you can express tasks and their dependencies elegantly with Ray. This works well on a single machine; the advantage is that parallelism and dependencies are often easier to express with Ray than with Python multiprocessing, and it doesn't have the GIL (global interpreter lock) problem that often prevents multithreading from working efficiently. In addition, it is very easy to scale the workload up on a cluster if you need to in the future.

The solution looks like this:

import ray

ray.init()

@ray.remote
def task1():
    pass

@ray.remote
def task2():
    pass

@ray.remote
def task3():
    pass

@ray.remote
def dependent1(x1, x2, x3):
    pass

@ray.remote
def task4():
    pass

@ray.remote
def task5():
    pass

@ray.remote
def task6():
    pass

@ray.remote
def dependent2(x1, x2, x3):
    pass

@ray.remote
def dependent3(x, y):
    pass

id1 = task1.remote()
id2 = task2.remote()
id3 = task3.remote()

dependent_id1 = dependent1.remote(id1, id2, id3)

id4 = task4.remote()
id5 = task5.remote()
id6 = task6.remote()

dependent_id2 = dependent2.remote(id4, id5, id6)

dependent_id3 = dependent3.remote(dependent_id1, dependent_id2)

ray.get(dependent_id3) # This is optional, you can get the results if the tasks return an object

You can also pass actual Python objects between the tasks: use the arguments inside the tasks and return the results (for example, return a value instead of the pass above), as sketched below.
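For instance, a minimal sketch of passing values through (the add function and the numbers are hypothetical):

@ray.remote
def add(x, y):
    return x + y

a = add.remote(1, 2)      # returns an object ID immediately
b = add.remote(3, 4)
total = add.remote(a, b)  # Ray resolves a and b before running add
print(ray.get(total))     # 10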

Using "pip install ray" the above code works out of the box on a single machine, and it is also easy to parallelize applications on a cluster, either in the cloud or your own custom cluster, see https://ray.readthedocs.io/en/latest/autoscaling.html and https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html). That might come in handy if your workload grows later on.

Disclaimer: I'm one of the developers of Ray.

Philipp Moritz
0

Look at Gevent.

Example Usage:

import gevent
from gevent import socket

def destination(jobs):
    # wait up to 2 seconds for all spawned greenlets, then print their results
    gevent.joinall(jobs, timeout=2)
    print [job.value for job in jobs]

def task1():
    # spawn schedules gethostbyname to run cooperatively and returns a greenlet
    return gevent.spawn(socket.gethostbyname, 'www.google.com')

def task2():
    return gevent.spawn(socket.gethostbyname, 'www.example.com')

def task3():
    return gevent.spawn(socket.gethostbyname, 'www.python.org')

jobs = []
jobs.append(task1())
jobs.append(task2())
jobs.append(task3())
destination(jobs)

Hope this is what you have been looking for.

meson10
  • Really? The OP asked for a multithreading solution using a Queue/Thread technique and wanted to avoid external libraries. But you point him to a nest of external dependencies and ignore basic solutions provided by the standard library. – Raymond Hettinger Nov 23 '11 at 12:10