
My Django app allows users to send messages to each other, and I pool a user's recent messages together and send them in a single email using Celery and Redis.

Every time a user sends a message, I add a Message to the db and then trigger an async task to pool that user's messages from the last 60 seconds and send them as an email.

tasks.pushMessagePool.apply_async(args=(fromUser,), countdown=60)

If the user sends 5 messages in the next 60 seconds, then my assumption is that 5 tasks should be created, but only the first task sends the email and the other 4 tasks do nothing. I implemented a simple locking mechanism to make sure that each message is only considered once and to ensure locking at the database level.

from random import randint
from celery import shared_task
import data.models

@shared_task
def pushMessagePool(fromUser, ignore_result=True):
    # Claim this user's unlocked messages by stamping them with a random code.
    lockCode = randint(0, 10**9)
    data.models.Messages.objects.filter(fromUser=fromUser, locked=False).update(locked=True, lockCode=lockCode)
    # Fetch only the rows this task just claimed and email them.
    M = data.models.Messages.objects.filter(fromUser=fromUser, lockCode=lockCode)
    sendEmail(M, lockCode)
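
For illustration (a rough sketch, not the code in question), the same claim-then-send idea can also be expressed with an explicit transaction and SELECT ... FOR UPDATE row locks, assuming the same Messages model and sendEmail helper; note that this on its own does not rule out broker-level redelivery:

from celery import shared_task
from django.db import transaction
import data.models

@shared_task
def pushMessagePoolRowLocked(fromUser):
    with transaction.atomic():
        # Lock the user's unclaimed rows so a second, concurrent task blocks
        # here instead of claiming the same messages.
        ids = list(
            data.models.Messages.objects
            .select_for_update()
            .filter(fromUser=fromUser, locked=False)
            .values_list('id', flat=True)
        )
        if not ids:
            return  # another task already claimed this batch
        data.models.Messages.objects.filter(id__in=ids).update(locked=True)
    # Outside the transaction, hand the claimed rows to the email helper;
    # the second argument mirrors the sendEmail(M, lockCode) shape above.
    sendEmail(data.models.Messages.objects.filter(id__in=ids), None)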

With this setup (the original pushMessagePool task), I still get occasional (~10%) duplicate emails. The duplicates fire within 10 ms of each other, and they have different lockCodes.

Why doesn't this locking mechanism work? Does Celery refer to an old DB snapshot? That wouldn't make any sense.
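
A rough diagnostic sketch (assuming the task above, not code from the original post): binding the task exposes self.request.id, and logging it shows whether two near-simultaneous runs are the same task message delivered twice or two separately queued tasks:

import logging
from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def pushMessagePool(self, fromUser):
    # Each queued task message carries a unique id; a redelivered message
    # reuses it, while two separately queued tasks get different ids.
    logger.info("pushMessagePool %s for user %s", self.request.id, fromUser)
    # ... original locking and sendEmail logic from above ...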

djangojack

1 Answer

Djangojack, here is a similar issue, but for SQS. I'm not sure if it applies to Redis too:

When creating your SQS queue you need to set the Default Visibility timeout to some time that's greater than the max time you expect a task to run. This is the time SQS will make a message invisible to all other consumers after delivering to one consumer. I believe the default is 30 seconds. So, if a task takes more than 30 seconds, SQS will deliver the same message to another consumer because it assumes the first consumer died and did not complete the task.

From a comment by @gustavo-ambrozio on this answer.
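
Not from the original answer, but for reference: Celery's Redis transport has an analogous setting, visibility_timeout, under broker_transport_options. A rough sketch of where it would go (the app name, broker URL, and value here are illustrative):

from celery import Celery

app = Celery('myproject', broker='redis://localhost:6379/0')

# How long a delivered task stays invisible to other workers before the
# Redis transport assumes the worker died and redelivers the task.
app.conf.broker_transport_options = {'visibility_timeout': 3600}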

Greg
  • The default visibility timeout for Redis is 1 hour, and the task is definitely shorter than an hour. But even if it died towards the end of the task, why wouldn't setting locked=True prevent future tasks from processing the same records? – djangojack Mar 10 '16 at 20:22