6
import threading

threads = []
for n in range(0, 60000):
    t = threading.Thread(target=function,args=(x, n))
    t.start()
    threads.append(t)
for t in threads:
    t.join()

It is working well for range up to 800 on my laptop, but if I increase range to more than 800 I get the error can't create new thread.

How can I control number to threads to get created or any other way to make it work like timeout? I tried using threading.BoundedSemaphore function but that doesn't seem to work properly.

Hooked
  • 84,485
  • 43
  • 192
  • 261
Lady
  • 73
  • 1
  • 9
  • 6
    Don't create so many threads at once, of course. There is a relatively finite limit after which more threads will actually hurt performance, and 60k is well above that limit: remember that each thread consumes resources. Thankfully the system prevents this (either via soft or hard limit). Search for "thread pool" - e.g. http://stackoverflow.com/questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool , http://docs.python.org/3/library/concurrent.futures.html - for one approach to solve this problem. – user2246674 Sep 05 '13 at 22:55
  • 6
    This smells very strongly of an [XY problem](http://meta.stackexchange.com/questions/66377/what-is-the-xy-problem). Why do you want (or think you want) 60000 threads? What are you trying to do? The right answer may be "use a threadpool of 8 threads with 60000 tasks", or "use 8 threads that each do a numpy vector operation on 7500 values", or "use greenlets/coroutines/explicit cooperative threads", or "there is no benefit to parallelizing this code in the first place", but it's almost certainly not going to be a direct answer to your question. – abarnert Sep 05 '13 at 23:15
  • Also, in the future, don't just describe the error; post the actual traceback. In this case, people can guess what happened, but in general, it's better if you don't make us guess. Similarly, don't just say "I tried using threading.BoundedSemaphore function but that doesn't seem to work properly"; show the code you tried, and what it did wrong. – abarnert Sep 06 '13 at 00:52
  • @abarnert but thats decreasing speed damn a lot – Lady Sep 06 '13 at 08:00
  • @Lady: I'm not sure what "that" you're referring to as decreasing speed a lot. But running more threads than you have cores definitely decreases speed far more than anything else you might consider. There's scheduler overhead (which is super-linear on most platforms), context switching, lost cache/paging/DMA benefits, etc., all of which waste time, and there's no compensating benefit at all. – abarnert Sep 06 '13 at 17:57
  • @Lady: Well, for I/O-bound code, on many platforms, there are ways to cooperate nicely with the scheduler. For example, on Windows, if you have, say, 8 completion ports and 8 threads waiting on each, the kernel will always wake an appropriate thread to handle a request as it comes in, and there's no way you could do that without scheduler help. But you're not going to be able to take advantage of features like that in cross-platform Python code (except maybe by using server framework libraries that handle all the details for you). – abarnert Sep 06 '13 at 18:00
  • well, i have i5 with 8 gb ram windows ultimate. but i want to make it something like that it should work on other platform as ell. as i will add it in github for others use too. – Lady Sep 06 '13 at 19:02
  • @Lady: An i5 has 2 to 4 physical cores; a handful of the models are hyperthreaded, so you may have up to 8 virtual cores. So, running with up to 8 threads or processes may speed things up, but anything beyond that will not. And "Windows Ultimate" tells me nothing about whether it's XP, Vista, 7, or 8, or whether it's 32- or 64-bit, so that isn't very useful information. And the amount of physical RAM only matters if you're running into swap thrashing, which you probably aren't—if you're using 32-bit Windows or 32-bit Python, the 2GB virtual memory space will hit you long before that. – abarnert Sep 06 '13 at 20:59

1 Answers1

18

The problem is that no major platform (as of mid-2013) will let you create anywhere near this number of threads. There are a wide variety of different limitations you could run into, and without knowing your platform, its configuration, and the exact error you got, it's impossible to know which one you ran into. But here are two examples:

  • On 32-bit Windows, the default thread stack is 1MB, and all of your thread stacks have to fit into the same 2GB of virtual memory space as everything else in your program, so you will run out long before 60000.
  • On 64-bit linux, you will likely exhaust one of your session's soft ulimit values before you get anywhere near running out of page space. (Linux has a variety of different limits beyond the ones required by POSIX.)

So, how can i control number to threads to get created or any other way to make it work like timeout or whatever?

Using as many threads as possible is very unlikely to be what you actually want to do. Running 800 threads on an 8-core machine means that you're spending a whole lot of time context-switching between the threads, and the cache keeps getting flushed before it ever gets primed, and so on.

Most likely, what you really want is one of the following:

  • One thread per CPU, serving a pool of 60000 tasks.
    • Maybe processes instead of threads (if the primary work is in Python, or in C code that doesn't explicitly release the GIL).
    • Maybe a fixed number of threads (e.g., a web browsers may do, say, 12 concurrent requests at a time, whether you have 1 core or 64).
    • Maybe a pool of, say, 600 batches of 100 tasks apiece, instead of 60000 single tasks.
  • 60000 cooperatively-scheduled fibers/greenlets/microthreads all sharing one real thread.
    • Maybe explicit coroutines instead of a scheduler.
    • Or "magic" cooperative greenlets via, e.g. gevent.
    • Maybe one thread per CPU, each running 1/Nth of the fibers.

But it's certainly possible.

Once you've hit whichever limit you're hitting, it's very likely that trying again will fail until a thread has finished its job and been joined, and it's pretty likely that trying again will succeed after that happens. So, given that you're apparently getting an exception, you could handle this the same way as anything else in Python: with a try/except block. For example, something like this:

threads = []
for n in range(0, 60000):
    while True:
        t = threading.Thread(target=function,args=(x, n))
        try:
            t.start()
            threads.append(t)
        except WhateverTheExceptionIs as e:
            if threads:
                threads[0].join()
                del threads[0]
            else:
                raise
        else:
            break
for t in threads:
    t.join()

Of course this assumes that the first task launched is likely to be the one of the first tasks finished. If this is not true, you'll need some way to explicitly signal doneness (condition, semaphore, queue, etc.), or you'll need to use some lower-level (platform-specific) library that gives you a way to wait on a whole list until at least one thread is finished.

Also, note that on some platforms (e.g., Windows XP), you can get bizarre behavior just getting near the limits.


On top of being a lot better, doing the right thing will probably be a lot simpler as well. For example, here's a process-per-CPU pool:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

… and a fixed-thread-count pool:

with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

… and a balancing-CPU-parallelism-with-numpy-vectorization batching pool:

with concurrent.futures.ThreadPoolExecutor() as executor:
    batchsize = 60000 // os.cpu_count()
    fs = [executor.submit(np.vector_function, x, 
                          np.arange(n, min(n+batchsize, 60000)))
          for n in range(0, 60000, batchsize)]
    concurrent.futures.wait(fs)

In the examples above, I used a list comprehension to submit all of the jobs and gather their futures, because we're not doing anything else inside the loop. But from your comments, it sounds like you do have other stuff you want to do inside the loop. So, let's convert it back into an explicit for statement:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = []
    for n in range(60000):
        fs.append(executor.submit(function, x, n))
    concurrent.futures.wait(fs)

And now, whatever you want to add inside that loop, you can.


However, I don't think you actually want to add anything inside that loop. The loop just submits all the jobs as fast as possible; it's the wait function that sits around waiting for them all to finish, and it's probably there that you want to exit early.

To do this, you can use wait with the FIRST_COMPLETED flag, but it's much simpler to use as_completed.

Also, I'm assuming error is some kind of value that gets set by the tasks. In that case, you will need to put a Lock around it, as with any other mutable value shared between threads. (This is one place where there's slightly more than a one-line difference between a ProcessPoolExecutor and a ThreadPoolExecutor—if you use processes, you need multiprocessing.Lock instead of threading.Lock.)

So:

error_lock = threading.Lock
error = []

def function(x, n):
    # blah blah
    try:
        # blah blah
    except Exception as e:
        with error_lock:
            error.append(e)
    # blah blah

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    for f in concurrent.futures.as_completed(fs):
        do_something_with(f.result())
        with error_lock:
            if len(error) > 1: exit()

However, you might want to consider a different design. In general, if you can avoid sharing between threads, your life gets a lot easier. And futures are designed to make that easy, by letting you return a value or raise an exception, just like a regular function call. That f.result() will give you the returned value or raise the raised exception. So, you can rewrite that code as:

def function(x, n):
    # blah blah
    # don't bother to catch exceptions here, let them propagate out

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    error = []
    for f in concurrent.futures.as_completed(fs):
        try:
            result = f.result()
        except Exception as e:
            error.append(e)
            if len(error) > 1: exit()
        else:
            do_something_with(result)

Notice how similar this looks to the ThreadPoolExecutor Example in the docs. This simple pattern is enough to handle almost anything without locks, as long as the tasks don't need to interact with each other.

abarnert
  • 354,177
  • 51
  • 601
  • 671
  • Your first code solution, in which you used error handling to handle error. That work - i had done that before too. But problem is that after reaching maximum number of threads that can get created by process, speed get a lot slow. like same threads takes 0.02 second to complete task of range(0, 1000) and but for range(0, 60000) it takes almost 80 seconds to complete the task which is a lot more if i compare it with speed of range(0, 1000).while it should not take more than 1.5 sec to complete task at previous speed. Also, output simply shows it too that speed gets a lot slow after 1000 – Lady Sep 06 '13 at 07:52
  • process pool executer is not working for me. After executing program, there is no output at all. It just keeps waiting and waiting. For thread pool executer - yes that worked but again same problem as with first solution.Its taking damn a lot of time - which it shouldn't, even after giving 800 to ThreadPoolExecutor. – Lady Sep 06 '13 at 07:54
  • for 4th solution what is np there? also, a parenthesis is missing in third line. np is raising NameError i.e, its not defined. Does it require to declare it as variable before somewhere or what? Making program compatible with python 2.7 not 3.x. Also, for concurrent i got it for python 2.7 ! – Lady Sep 06 '13 at 07:58
  • 1
    @Lady: For the try/except solution, I explained why that will be very slow, and other reasons it's a bad idea. The details of how it slows down depend heavily on your platform. For example, with 32-bit Windows XPSP1, the first 1000 or so threads can be created very quickly, but then they get slower and slower, and around 1024 you finally get a failure that can take 15 seconds. With 64-bit OS X 10.8, the stack space is pegged, so on a 2GB machine, 1000 threads could easily be enough to cause swap thrashing. And so on. – abarnert Sep 06 '13 at 18:03
  • @Lady: For the executor solutions, I can't debug your code unless I can see it. But if you read [the docs](http://docs.python.org/dev/library/concurrent.futures.html), there are some simple examples that should help. (Meanwhile, it looks like you figured this out yourself, but for future readers, to use Python 2.7, you need to install [the backport](https://pypi.python.org/pypi/futures) from PyPI.) Also, you do not want to give 800 to `ThreadPoolExecutor`; again, you almost certainly want either the default, or a smallish static number like 12. – abarnert Sep 06 '13 at 18:05
  • @Lady: For the 4th solution: Most people who use [numpy](http://numpy.org) start off with `import numpy as np`. So, you need to install and import it. If the tasks you're trying to do are numerical or mathematical, you definitely want to look into numpy (and scipy and its other cousins); a numpy vector expression like `a *= 2` is typically about 10x as fast as a pure-Python `a = [x*2 for x in a]`. But if it's not appropriate for your use case, don't worry about it. – abarnert Sep 06 '13 at 18:09
  • i am gonna use ThreadPoolExecutor function as i find that is kinda good for my work. But i want a condition to work b/w that. `with concurrent.futures.ThreadPoolExecutor(12) as executor: fs = [executor.submit(function, x, n) for n in range(60000)] concurrent.futures.wait(fs)` but don't now how to put that exactly in this. i want a condition to work just after for loop in above code that you had given. to check like : `for n in range (0, 6000): if len(error) > 1: exit() #exit the program` how to put this condition in that thread code. – Lady Sep 07 '13 at 10:37
  • @Lady: A list comprehension is just shorthand for a loop; I'll edit the answer to show how to break it out so you can add more stuff into the middle of it. – abarnert Sep 09 '13 at 17:40
  • @Lady: … But after reading what you want more carefully, I don't think what you ask for is actually what you want. See the edited answer for details. – abarnert Sep 09 '13 at 18:02