I have a Django application backed by a MySQL database. I recently moved a section of code out of the request flow and into a separate process. The code uses select_for_update() to lock the affected rows in the DB, but now I occasionally see the process updating a record while it should be locked by the main thread. If I switch my executor from a ProcessPoolExecutor to a ThreadPoolExecutor, the locking works as expected. I thought that select_for_update() operated at the database level, so it shouldn't make any difference whether the code runs in threads, processes, or even on another machine. What am I missing?
I've boiled my code down to a sample that exhibits the same behaviour:
from concurrent import futures
import logging
from time import sleep

from django.db import transaction

from myapp.main.models import CompoundBase

logger = logging.getLogger()

executor = futures.ProcessPoolExecutor()
# executor = futures.ThreadPoolExecutor()


def test() -> None:
    pk = setup()
    f1 = executor.submit(select_and_sleep, pk)
    f2 = executor.submit(sleep_and_update, pk)
    futures.wait([f1, f2])


def setup() -> int:
    cb = CompoundBase.objects.first()
    cb.corporate_id = 'foo'
    cb.save()
    return cb.pk


def select_and_sleep(pk: int) -> None:
    try:
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            print('Locking')
            sleep(5)
            cb.corporate_id = 'baz'
            cb.save()
        print('Updated after sleep')
    except Exception:
        logger.exception('select_and_sleep')


def sleep_and_update(pk: int) -> None:
    try:
        sleep(2)
        print('Updating')
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            cb.corporate_id = 'bar'
            cb.save()
        print('Updated without sleep')
    except Exception:
        logger.exception('sleep_and_update')


test()
When run as shown I get:
Locking
Updating
Updated without sleep
Updated after sleep
But if I change to the ThreadPoolExecutor I get:
Locking
Updating
Updated after sleep
Updated without sleep
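One possible explanation, which the sample alone does not confirm, is that the two worker processes are not using separate database sessions at all. Django keeps one connection per thread, so each pool thread opens its own session and the second SELECT ... FOR UPDATE has to wait. A worker process created by fork(), however, inherits whatever the parent already had open, and setup() touches the database in the parent before any work is submitted. If both workers talk to MySQL over that single inherited connection, the server sees one session, and a session never blocks on its own row locks. The sketch below illustrates only the underlying mechanism, using the standard library: a socket pair stands in for the database connection, the "fork" start method is assumed (POSIX only), and every function name is hypothetical.

```python
import multiprocessing
import socket
import threading
from concurrent import futures

# Hypothetical stand-in for a per-thread connection registry.
_local = threading.local()


def thread_session(barrier: threading.Barrier) -> str:
    """Open a stand-in session on first use in this thread and return its name."""
    if not hasattr(_local, "session"):
        _local.session = f"session-{threading.get_ident()}"
    # Keep both tasks running at once so the pool has to use two threads.
    barrier.wait()
    return _local.session


def sessions_seen_by_threads() -> set:
    """Run two tasks in a thread pool and collect the sessions they used."""
    barrier = threading.Barrier(2, timeout=5)
    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        tasks = [pool.submit(thread_session, barrier) for _ in range(2)]
        return {task.result() for task in tasks}


def child_send(conn: socket.socket, message: bytes) -> None:
    """Write to a socket this process never opened; it was inherited via fork()."""
    conn.sendall(message)


def bytes_on_inherited_connection() -> bytes:
    """Fork two workers and read what they wrote to the parent's connection."""
    # 'client' plays the connection opened in the parent before the fork,
    # 'server' plays the database end of that same connection.
    client, server = socket.socketpair()
    server.settimeout(5)
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX fork start method
    workers = [
        ctx.Process(target=child_send, args=(client, message))
        for message in (b"A", b"B")
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    received = b""
    while len(received) < 2:
        received += server.recv(16)
    client.close()
    server.close()
    return received


if __name__ == "__main__":
    # Two threads, two distinct sessions.
    print(sessions_seen_by_threads())
    # Two processes, but both bytes arrive on the one inherited connection.
    print(bytes_on_inherited_connection())
```

If this is what is happening in the sample, making each worker drop the inherited connection before it touches the ORM (for example by calling django.db.connections.close_all() at the start of the task) should give each process its own session, after which the row lock would be expected to block as it does with threads.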