5

I have a list of approximately 60,000 items. I would like to send queries to the database to check whether each one exists and, if it does, return some computed results. I ran an ordinary query while iterating through the list one by one, and it has been running for the last 4 days. I thought I could use the threading module to improve on this. I did something like this:

if __name__ == '__main__':
  for ra, dec in candidates:
    t = threading.Thread(target=search_sl, args=(ra,dec, q))
    t.start()
  t.join()

I tested with only 10 items and it worked fine. When I submitted the whole list of 60k items, I ran into errors, i.e. "maximum number of sessions exceeded". What I want to do is create maybe 10 threads at a time; when the first bunch of threads has finished executing, I send another request, and so on.

user739807
  • I don't think threads are the solution to your problem. You should probably rather reduce the number of database queries. Could you post details on the individual queries that you currently perform? – Sven Marnach Apr 08 '12 at 14:04
  • @SvenMarnach, I run the query through Python; another class translates the queries to SQL. – user739807 Apr 08 '12 at 14:29

4 Answers

7

You could try using a process pool, which is available in the multiprocessing module. Here is the example from the Python docs:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

Try increasing the number of processes until you reach the maximum your system can support.
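As a rough, untested sketch of how this could be applied to the question's data (it assumes `search_sl`, `candidates`, and `q` from the question are defined at module level, that each worker can safely use its own database connection, and the `worker` wrapper is hypothetical):

from multiprocessing import Pool

def worker(candidate):
    # hypothetical wrapper: unpack one (ra, dec) pair and run the existing
    # search; search_sl, candidates and q come from the question's code
    ra, dec = candidate
    return search_sl(ra, dec, q)

if __name__ == '__main__':
    pool = Pool(processes=10)                # roughly 10 concurrent DB sessions
    results = pool.map(worker, candidates)   # blocks until every candidate is processed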

Mike Chamberlain
  • That's not a thread pool, but rather a process pool -- which of course does not make too much of a difference here. – Sven Marnach Apr 08 '12 at 14:05
  • You can of course do the same thing with threads as Mikey explained with multiprocessing. See: http://stackoverflow.com/questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool – enticedwanderer Apr 08 '12 at 14:09
4

Improve your queries before threading (premature optimization is the root of all evil!)

Your problem is having 60,000 different queries on a single database. Issuing a separate query for each item means a lot of overhead for opening connections and invoking DB cursor sessions.

Threading those queries can speed up your process, but it introduces another set of problems, such as DB overload and hitting the maximum number of allowed sessions.

First approach: Load many item IDs into every query

Instead, try to improve your queries. Can you write a query that sends a long list of items and returns the matches? Perhaps something like:

SELECT *
FROM   items
WHERE  item_id IN (id1, id2, id3, id4, id5, ...)

Python gives you convenient interfaces for this kind of query, so that the IN clause can be built from a Python list. This way you can break your long list of items into, say, 60 queries with 1,000 IDs each.
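As an untested sketch (assuming a DB-API connection `conn`, a table `items` with an `item_id` column, and `%s` placeholders; adjust the names and placeholder style to your driver and schema), the chunking could look like:

CHUNK = 1000

def fetch_in_chunks(conn, item_ids):
    cursor = conn.cursor()
    results = []
    for start in range(0, len(item_ids), CHUNK):
        chunk = item_ids[start:start + CHUNK]
        placeholders = ', '.join(['%s'] * len(chunk))   # use '?' for sqlite3
        sql = "SELECT * FROM items WHERE item_id IN (%s)" % placeholders
        cursor.execute(sql, chunk)                      # ids passed as parameters
        results.extend(cursor.fetchall())
    return results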

Second approach: Use a temporary table

Another interesting approach is creating a temporary table on the database with your item ids. Temporary tables last as long as the connection lives, so you won't have to worry about cleanups. Perhaps something like:

CREATE TEMPORARY TABLE
           item_ids_list (id INT PRIMARY KEY); -- Remember indexing!

Insert the ids using an appropriate Python library:

INSERT INTO item_ids_list   ...                -- Insert your 60,000 items here

Get your results:

SELECT * FROM items WHERE items.id IN (SELECT id FROM item_ids_list);
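From Python, an untested sketch of this approach (again assuming a DB-API connection `conn` and `%s` placeholders; the `fetch_via_temp_table` helper is hypothetical) might be:

def fetch_via_temp_table(conn, item_ids):
    cursor = conn.cursor()
    cursor.execute("CREATE TEMPORARY TABLE item_ids_list (id INT PRIMARY KEY)")
    # insert all the ids with executemany
    cursor.executemany("INSERT INTO item_ids_list (id) VALUES (%s)",
                       [(i,) for i in item_ids])
    cursor.execute("SELECT * FROM items "
                   "WHERE items.id IN (SELECT id FROM item_ids_list)")
    return cursor.fetchall()
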
Adam Matan
3

First of all, you join only the last thread, and there is no guarantee that it will be the last to finish. You could do it like this:

import threading
from time import sleep

delay = 0.5
tlist = [threading.Thread(target=search_sl, args=(ra, dec, q))
         for ra, dec in candidates]
map(lambda t: t.start(), tlist)                 # start every thread
while any(map(lambda t: t.isAlive(), tlist)):   # wait until all of them finish
    sleep(delay)

The second issue is that running 60K threads at once requires really huge hardware resources :-) It's better to queue your tasks and process them with worker threads, keeping the number of workers limited. Something like this (I haven't tested the code, but I hope the idea is clear):

from Queue import Queue, Empty
from threading import Thread
from time import sleep

tasks = Queue()
map(tasks.put, candidates)
maxthreads = 50
delay = 0.1

# start the first batch of workers
threads = []
while len(threads) < maxthreads and not tasks.empty():
    t = Thread(target=search_sl, args=tasks.get())
    t.start()
    threads.append(t)

# keep the pool filled until the queue is drained
while not tasks.empty():
    threads = filter(lambda t: t.isAlive(), threads)   # drop finished threads
    while len(threads) < maxthreads:
        try:
            t = Thread(target=search_sl, args=tasks.get_nowait())
            t.start()
            threads.append(t)
        except Empty:
            break
    sleep(delay)

# wait for the remaining workers to finish
while any(map(lambda t: t.isAlive(), threads)):
    sleep(delay)
Maksym Polshcha
  • Been trying to follow your example. Unfortunately, it's my first time working with threads; 'tlist' is not defined. Is it supposed to be tasks/threads? Tried both, still get errors with iterables. – user739807 Apr 08 '12 at 14:38
  • @user739807 I've just fixed some errors. I have no chance to run the code and test it now. – Maksym Polshcha Apr 08 '12 at 14:42
  • @user739807 I didn't take into account 'q' parameter --- deal with this by yourself. – Maksym Polshcha Apr 08 '12 at 14:44
0

Since this is an IO task, neither threads nor processes are a good fit for it; you use those when you need to parallelize computational tasks. So, be modern please ™, and use something like gevent for parallel IO-intensive tasks.

http://www.gevent.org/intro.html#example
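
A minimal, untested sketch with gevent's pool (it assumes gevent is installed, reuses `search_sl`, `candidates`, and `q` from the question, and only helps if the database driver does its IO through Python sockets that monkey-patching can make cooperative):

from gevent import monkey
monkey.patch_all()                  # make blocking socket IO cooperative
from gevent.pool import Pool

pool = Pool(10)                     # at most 10 concurrent DB sessions
for ra, dec in candidates:
    pool.spawn(search_sl, ra, dec, q)
pool.join()                         # wait for all greenlets to finish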

skrat