I am working on a Django web application that needs to query a PostgreSQL database. When I add concurrency using Python's threading module, I get DoesNotExist errors for the queried items. These errors do not occur when the queries run sequentially.
Here is a unit test I wrote to demonstrate the unexpected behavior:

    import threading

    from django.test import TestCase

    # City is the application's model; its import is omitted here.


    class ThreadingTest(TestCase):
        fixtures = ['demo_city']

        def test_sequential_requests(self):
            """
            A very simple request to the database, made sequentially.

            A fixture for the cities has been loaded above, so there should be
            six cities in the test database now. We make one request for each
            city, sequentially.
            """
            for number in range(1, 7):
                c = City.objects.get(pk=number)
                self.assertEqual(c.pk, number)

        def test_threaded_requests(self):
            """
            Now, to test the threaded behavior, we spawn one thread per city
            to retrieve it from the database.
            """
            threads = []
            cities = []

            def do_requests(number):
                cities.append(City.objects.get(pk=number))

            for n in range(1, 7):
                threads.append(threading.Thread(target=do_requests, args=(n,)))
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            self.assertNotEqual(cities, [])

As you can see, the first test performs the database requests sequentially, and they work without any problem. The second test performs exactly the same requests, but each one runs in its own thread. That test fails: every thread raises a DoesNotExist exception.
The output of running these unit tests looks like this:

    test_sequential_requests (cesta.core.tests.threadbase.ThreadingTest) ... ok
    test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ...
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.6/threading.py", line 484, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
        cities.append(City.objects.get(pk=number))
      File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
        return self.get_query_set().get(*args, **kwargs)
      File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
        % self.model._meta.object_name)
    DoesNotExist: City matching query does not exist.

    ... other threads returns a similar output ...

    Exception in thread Thread-6:
    Traceback (most recent call last):
      File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.6/threading.py", line 484, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests
        cities.append(City.objects.get(pk=number))
      File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get
        return self.get_query_set().get(*args, **kwargs)
      File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get
        % self.model._meta.object_name)
    DoesNotExist: City matching query does not exist.

    FAIL

    ======================================================================
    FAIL: test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 52, in test_threaded_requests
        self.assertNotEqual(cities, [])
    AssertionError: [] == []

    ----------------------------------------------------------------------
    Ran 2 tests in 0.278s

    FAILED (failures=1)
    Destroying test database for alias 'default' ('test_cesta')...

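In the output above, the exceptions raised inside the threads are only printed; the test itself fails later on the unrelated-looking `[] == []` assertion. A minimal sketch of one way to collect worker exceptions so the main thread can inspect them is shown here. The helper `run_in_threads` and the stand-in `fake_get` are hypothetical names introduced only for illustration; `fake_get` replaces `City.objects.get` so the sketch runs without Django.

```python
import threading


def run_in_threads(func, args_list):
    """Run func(*args) in one thread per entry of args_list.

    Returns (results, errors): results maps each args tuple to the value
    returned, errors maps each args tuple to the exception that was raised.
    """
    results = {}
    errors = {}
    lock = threading.Lock()

    def worker(args):
        try:
            value = func(*args)
        except Exception as exc:
            # Record the exception instead of only printing a traceback.
            with lock:
                errors[args] = exc
        else:
            with lock:
                results[args] = value

    threads = [threading.Thread(target=worker, args=(args,)) for args in args_list]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, errors


# Hypothetical stand-in for City.objects.get(pk=number): only pks 1-3 "exist".
def fake_get(pk):
    if pk > 3:
        raise LookupError("no row with pk=%d" % pk)
    return "city-%d" % pk


results, errors = run_in_threads(fake_get, [(n,) for n in range(1, 7)])
```

In the threaded test, the same idea would mean asserting that `errors` is empty, so that a failure reports the collected exceptions directly.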
Note that all of this is running against PostgreSQL, which is supposed to be thread safe, not SQLite or a similar engine. The tests were also run against PostgreSQL.
At this point I am totally lost as to what could be failing. Any ideas or suggestions?
Thanks!
EDIT: I wrote a small view to check whether the same code works outside the tests. Here is the code of the view:

    import Queue  # Python 2 module name
    import threading

    from django.shortcuts import render_to_response
    from django.template import RequestContext

    def get_cities(request):
        queue = Queue.Queue()

        def get_async_cities(q, n):
            city = City.objects.get(pk=n)
            q.put(city)

        threads = [threading.Thread(target=get_async_cities, args=(queue, number)) for number in range(1, 5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        cities = list()
        while not queue.empty():
            cities.append(queue.get())
        return render_to_response('async/cities.html', {'cities': cities},
                                  context_instance=RequestContext(request))

(Please ignore the folly of writing application logic inside the view code. This is only a proof of concept and would never end up in the real app.)
The result is that this code works fine: the requests complete successfully in their threads, and the view shows the cities when its URL is requested.
So I think that making queries from threads is only a problem when testing the code; in production it works without any problem.
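One possible explanation for the difference between the test run and the view, offered as an unverified assumption: Django's TestCase is documented to wrap each test in a transaction, and if the fixture rows are inserted inside that uncommitted transaction while each thread uses its own database connection, the threads would not see them. The sketch below only illustrates that general mechanism. It uses Python's built-in sqlite3 module purely so that it runs without a server; the table name `city`, the file name, and the helper functions are hypothetical, and nothing here touches Django or PostgreSQL.

```python
import os
import shutil
import sqlite3
import tempfile
import threading


def count_rows(db_path, out):
    """Open a separate connection, as a worker thread would, and count rows."""
    conn = sqlite3.connect(db_path)
    try:
        out.append(conn.execute("SELECT COUNT(*) FROM city").fetchall()[0][0])
    finally:
        conn.close()


def count_from_thread(db_path):
    """Run count_rows in its own thread and return the count it saw."""
    out = []
    t = threading.Thread(target=count_rows, args=(db_path, out))
    t.start()
    t.join()
    return out[0]


tmpdir = tempfile.mkdtemp()
db_path = os.path.join(tmpdir, "demo.sqlite3")

writer = sqlite3.connect(db_path)
writer.execute("CREATE TABLE city (id INTEGER PRIMARY KEY, name TEXT)")
writer.commit()

# The INSERT opens a transaction on the writer connection; it is not committed yet.
writer.execute("INSERT INTO city (id, name) VALUES (1, 'demo')")

# The writer sees its own uncommitted row; a second connection does not.
seen_by_writer = writer.execute("SELECT COUNT(*) FROM city").fetchall()[0][0]
seen_before_commit = count_from_thread(db_path)

# After the commit, a fresh connection in another thread sees the row.
writer.commit()
seen_after_commit = count_from_thread(db_path)

writer.close()
shutil.rmtree(tmpdir)
```

If that turns out to be the cause, `django.test.TransactionTestCase`, which does not wrap each test in a transaction, might be worth trying for the threaded test.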
Any useful suggestions for testing this kind of code successfully?