My Django app allows users to send messages to each other, and I pool a user's recent messages together and send them in a single email using Celery and Redis.
Every time a user sends a message, I add a Message row to the database and then trigger an async task to pool that user's messages from the last 60 seconds and send them as one email:
tasks.pushMessagePool.apply_async(args=(fromUser,), countdown=60)
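For context, the task is scheduled right after the message row is saved, roughly like this (simplified sketch; the view name, recipient/body handling, and import paths are placeholders, only the apply_async call is exactly what I use):

# views.py -- rough sketch of where the pooling task gets scheduled
import data.models
from data import tasks  # module containing pushMessagePool

def send_message(request, toUser, body):
    fromUser = request.user
    # Save the new message first, then schedule the pool-and-email
    # task for this sender to run 60 seconds from now.
    data.models.Messages.objects.create(fromUser=fromUser, toUser=toUser, body=body, locked=False)
    tasks.pushMessagePool.apply_async(args=(fromUser,), countdown=60)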
If the user sends 5 messages within the next 60 seconds, my expectation is that 5 tasks are created, but only the first task sends the email and the other 4 do nothing. To make sure each message is only picked up by a single task, I implemented a simple locking mechanism at the DB level:
from random import randint
from celery import shared_task
import data.models

@shared_task
def pushMessagePool(fromUser, ignore_result=True):
    # Stamp this user's unlocked messages with a random lockCode,
    # then re-fetch only the rows carrying that code and email them.
    lockCode = randint(0, 10**9)
    data.models.Messages.objects.filter(fromUser=fromUser, locked=False).update(locked=True, lockCode=lockCode)
    M = data.models.Messages.objects.filter(fromUser=fromUser, lockCode=lockCode)
    sendEmail(M, lockCode)
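For reference, the relevant fields on the Messages model look roughly like this (simplified; only the fields the locking logic touches are shown, the exact types are my shorthand here):

# data/models.py -- simplified: just the fields the task filters on
from django.conf import settings
from django.db import models

class Messages(models.Model):
    fromUser = models.ForeignKey(settings.AUTH_USER_MODEL, related_name='sent_messages', on_delete=models.CASCADE)
    locked = models.BooleanField(default=False)             # flipped to True once a task claims the row
    lockCode = models.IntegerField(null=True, blank=True)   # random code identifying which task claimed it
    # ... recipient, body, and timestamp fields omitted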
With this setup, I still get occasional (~10%) duplicate emails. The duplicates fire within 10 ms of each other and carry different lockCodes.
Why doesn't this locking mechanism work? Is Celery reading from an old DB snapshot? That wouldn't make any sense.