Say I have a thread pool executor with max. 10 threads, and I submit a task to it which itself creates another task and in turn waits for it to complete, recursively until I reach a depth of 11.

Example code in Python:

import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # submit the next level to the same pool and block until it finishes
        f = e.submit(task, depth + 1)
        concurrent.futures.wait([f])
        return f.result()


f = e.submit(task, 0)
print(f.result())

The above code outputs:

started depth 0
started depth 1
started depth 2
started depth 3
started depth 4
started depth 5
started depth 6
started depth 7
started depth 8
started depth 9

and deadlocks.

Is there any way to solve this problem without creating additional threads and executors?

In other words, a way for the worker threads to work on other tasks while waiting?

saarraz1
  • http://stackoverflow.com/questions/1239035/asynchronous-method-call-in-python – Tymoteusz Paul May 24 '15 at 18:59
  • Or better yet go with high level solution like Celery http://www.celeryproject.org/ – Tymoteusz Paul May 24 '15 at 19:06
  • Doesn't it deadlock because it keeps starting new tasks, but the maximum number of threads in the pool is only 10? None of the threads ever finish their task. – Alexander May 24 '15 at 19:18
  • @Alexander - that's right, but none of them are working either, they're all sleeping when there's work to be done. I'm looking for a way to make them quit waiting and go do other tasks so that the operation may complete. – saarraz1 May 24 '15 at 19:22
  • Short answer - no. At least not using an executor. However, you might want to think about refactoring your code to use coroutines instead. https://docs.python.org/3/library/asyncio-task.html – Dunes May 24 '15 at 19:27
  • @Dunes That's it! coroutines is the exact kind of facility I was looking for! Please post this as an answer so I can accept it. – saarraz1 May 24 '15 at 19:38

3 Answers

Using coroutines your code could be rewritten as:

import asyncio

async def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # create new task (it is scheduled on the running event loop)
        t = asyncio.ensure_future(task(depth + 1))
        # wait for task to complete
        await t
        # get the result of the task
        return t.result()

# asyncio.run creates the event loop, runs the coroutine and closes the loop
result = asyncio.run(task(1))
print(result)

However, I'm struggling to see why you need all this extra code. In your example code you always wait directly for the result of the task, so your code would run no differently without the executor. For example, the following would produce the same result:

def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # plain recursive call: no executor, no waiting on a future
        return task(depth + 1)
I think this example from the documentation better shows how coroutines let tasks run concurrently. It creates 3 tasks, each of which computes a different factorial. Notice how, whenever a task yields to another coroutine (in this case asyncio.sleep), another task is allowed to continue its execution.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        # suspend this coroutine for a second so the others can run
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

async def main():
    # schedule the three coroutines as tasks and wait for all of them
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4))

asyncio.run(main())

Output:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
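
Applied to the nested-wait pattern from the question, the same idea can be sketched as follows. This is only an illustration under assumptions: the names nested, max_depth and thread_ids are made up for the example, and the set of thread ids is there purely to show that every level runs on the same thread, however deep the nesting goes.

```python
import asyncio
import threading

async def nested(depth, max_depth, thread_ids):
    """Await a child task at each level, recording which thread runs it."""
    thread_ids.add(threading.get_ident())
    if depth > max_depth:
        return depth
    # Awaiting suspends this coroutine instead of blocking a thread,
    # so the event loop is free to run the child task.
    child = asyncio.ensure_future(nested(depth + 1, max_depth, thread_ids))
    return await child

thread_ids = set()  # hypothetical bookkeeping, only for the demonstration
deepest = asyncio.run(nested(0, 100, thread_ids))
print(deepest, len(thread_ids))
```

Nesting 100 levels deep finishes without a deadlock because a waiting coroutine does not hold on to a thread the way a blocked pool worker does.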
Dunes

No, if you want to avoid a deadlock you can't wait on a future from the same executor in a task.
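
A small sketch of why this is so, assuming a deliberately tiny pool and a timeout on the wait so that the demonstration terminates instead of hanging. POOL_SIZE, nested and the "done"/"starved" return values are hypothetical names chosen for the example; they are not part of concurrent.futures.

```python
import concurrent.futures

POOL_SIZE = 2  # hypothetical small pool so the effect shows up quickly
pool = concurrent.futures.ThreadPoolExecutor(max_workers=POOL_SIZE)

def nested(depth, max_depth, timeout):
    """Submit a child to the same pool and wait for it, giving up after `timeout`."""
    if depth >= max_depth:
        return "done"
    # The child gets half the timeout so that the innermost wait gives up first.
    child = pool.submit(nested, depth + 1, max_depth, timeout / 2)
    try:
        # Without the timeout this call would block forever once every
        # worker is busy waiting and the child is still sitting in the queue.
        return child.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        child.cancel()  # only succeeds if the child never started running
        return "starved"

# A chain of POOL_SIZE tasks fits: the last task still finds a free worker.
shallow = pool.submit(nested, 0, POOL_SIZE - 1, 0.4).result()
# One more level and the innermost task never gets a worker.
deep = pool.submit(nested, 0, POOL_SIZE, 0.4).result()
pool.shutdown()
print(shallow, deep)
```

The timeout only makes the starvation visible. It does not solve the problem, because the innermost task still never runs.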

The only thing you could do in this example is to return the future and then recursively process the results:

import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # hand the child future back instead of waiting for it in the worker
        f = e.submit(task, depth + 1)
        return f


f = e.submit(task, 0)
# unwrap the chain of futures in the calling thread
while isinstance(f.result(), concurrent.futures.Future):
    f = f.result()

print(f.result())

However, it would be best to avoid such a recursive execution in the first place.
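
One way to avoid the recursion, sketched under the assumption that each level only needs to know the depth it works on: let the calling thread drive the chain and submit one level at a time. The helpers step and run_chain are hypothetical names introduced for this illustration.

```python
import concurrent.futures

def step(depth):
    """Hypothetical unit of work for one level.

    Returns the next depth to process, or None once the chain is finished.
    """
    print('started depth %d' % (depth, ))
    return depth + 1 if depth <= 10 else None

def run_chain(executor, start):
    """Submit one level at a time from the caller, so no worker ever blocks."""
    depth = start
    while True:
        next_depth = executor.submit(step, depth).result()
        if next_depth is None:
            return depth
        depth = next_depth

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as e:
    final_depth = run_chain(e, 0)
print(final_depth)
```

Because only the calling thread waits, the chain can be longer than the pool is large; the same loop would also complete with max_workers=1.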

mata

What you're experiencing here is what you already rightly called a deadlock. Each task keeps its worker thread occupied while it waits for the task it submitted, so once all 10 workers are waiting, the next task stays in the queue with no free thread to run it, and none of the waits can ever finish. I'd suggest that you start your own threads in the tasks instead of using the pool, something like:

import concurrent.futures
import threading


class TaskWrapper(threading.Thread):
    """Runs task(depth) in its own thread and keeps the return value."""

    def __init__(self, depth, *args, **kwargs):
        self._depth = depth
        self._result = None
        super(TaskWrapper, self).__init__(*args, **kwargs)

    def run(self):
        self._result = task(self._depth)

    def get(self):
        # block until the thread has finished, then hand back its result
        self.join()
        return self._result

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)


def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # a dedicated thread per nested call, so no pool worker is needed
        t = TaskWrapper(depth + 1)
        t.start()
        return t.get()

f = e.submit(task, 0)
print(f.result())
sirfz