I am currently trying to setup celery to handle responses from a chatbot and forward those responses to a user.
The chatbot hits the /response endpoint of my server, that triggers the following function in my server.py module:
def handle_response(user_id, message):
"""Endpoint to handle the response from the chatbot."""
tasks.send_message_to_user.apply_async(args=[user_id, message])
return ('OK', 200,
{'Content-Type': 'application/json; charset=utf-8'})
In my tasks.py file, I import celery and create the send_message_to_user function:
from celery import Celery
celery_app = Celery('tasks', broker='redis://')
@celery_app.task(name='send_message_to_user')
def send_message_to_user(user_id, message):
"""Send the message to a user."""
# Here is the logic to send the message to a specific user
My problem is, my chatbot may answer multiple messages to a user, so the send_message_to_user task is properly put in the queue but then a race condition arises and sometimes the messages arrive to the user in the wrong order.
How could I make each send_message_to_user task wait for the previous task with the same name and with the same argument "user_id" before executing it ?
I have looked at this thread Running "unique" tasks with celery but a lock isn't my solution, as I don't want to implement ugly retries when the lock is released.
Does anyone have any idea how to solve that issue in a clean(-ish) way ?
Also, it's my first post here so I'm open to any suggestions to improve my request.
Thanks!