Pardon my ignorance as I am still learning how to use Celery for my purposes.

Suppose I have two tasks: `create_ticket` and `add_message_to_ticket`. Usually the `create_ticket` task is created and completed before `add_message_to_ticket` tasks are created, possibly multiple times.
```python
import random
import time

# `app` is the configured Celery application instance.

@app.task
def create_ticket(ticket_id):
    time.sleep(random.uniform(1.0, 4.0))  # replace with code that processes ticket creation
    return f"Successfully processed ticket creation: {ticket_id}"

@app.task
def add_message_to_ticket(ticket_id, who, when, message_contents):
    # TODO: check whether the create_ticket task for ticket_id has already completed
    time.sleep(random.uniform(1.0, 4.0))  # replace with code that handles the added message
    return f"Successfully processed message for ticket {ticket_id} by {who} at {when}"
```
Now suppose that these tasks are created out of order because the Python server receives the events from an external web service out of order. For example, `add_message_to_ticket.delay(82, "auroranil", 1599039427, "This issue also occurs on Microsoft Edge on Windows 10.")` gets called a few seconds before `create_ticket.delay(82)` gets called. How would I solve the following problems?
- How would I fetch the result of the Celery task `create_ticket` by specifying `ticket_id`, from within the `add_message_to_ticket` task? All I can think of is to maintain a database that stores ticket state and check whether a particular ticket has been created, but I want to know whether I can use Celery's result backend for this somehow.
- If I receive an `add_message_to_ticket` task for a ticket id whose corresponding `create_ticket` task has not completed, do I reject that task and put it back in the queue? (A retry-based sketch follows this list.)
- Do I need to ensure that the tasks are idempotent? I know that is good practice, but is it a requirement for this to work?
- Is there a better approach to solving this problem? I am aware of the Celery Canvas workflow with primitives such as chain, but I am not sure how to ensure that these events are processed in order, or how to put a task in a pending state while it waits for the tasks it depends on, based on an argument I want Celery to check, which in this case is `ticket_id`.
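Here is a rough sketch of the retry idea from the second bullet, building on the snippet above. `ticket_exists` is a placeholder for whatever check ends up holding ticket state (database, cache, or result backend), and the retry limits are arbitrary:

```python
# Sketch only: put the task back on the queue instead of blocking a worker.
# ticket_exists() is a hypothetical helper; implement it against whatever
# store (database, cache, or result backend) records completed tickets.

@app.task(bind=True, max_retries=10, default_retry_delay=5)
def add_message_to_ticket(self, ticket_id, who, when, message_contents):
    if not ticket_exists(ticket_id):  # hypothetical dependency check
        # create_ticket has not completed yet; retry later rather than failing.
        raise self.retry(countdown=5)
    time.sleep(random.uniform(1.0, 4.0))  # replace with code that handles the added message
    return f"Successfully processed message for ticket {ticket_id} by {who} at {when}"
```

Once `max_retries` is exhausted, Celery raises `MaxRetriesExceededError`, so messages for tickets that never get created eventually fail instead of circulating forever.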
I am not particularly worried if I receive multiple user messages for a particular ticket with timestamps out of order, as that is not as important as knowing that a ticket has been created before messages are added to it. The point I am making is that I am coding up several tasks where some events crucially depend on others, whereas the ordering of other events does not matter as much for the Python server to function.
Edit:
Partial solutions:
- Use `task_id` to identify Celery tasks, with a formatted string containing the argument values that identify the task. For example, `task_id="create_ticket(\"TICKET000001\")"`. (See the sketch after this list.)
- Retry tasks that do not meet their dependency requirements. Blocking while waiting for subtasks to complete is bad, as a subtask may never complete, and the wait hogs a process on one of the worker machines.
- Store the arguments as part of the result of a completed task, so that later tasks can use information that would not otherwise be available to them.
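A rough sketch of how the first two partial solutions could combine with Celery's result backend, building on the snippet above; the `task_id` format follows the one proposed above, and the helper names are placeholders:

```python
from celery.result import AsyncResult

def create_ticket_task_id(ticket_id):
    # Deterministic task_id, so any later task can locate the result by ticket_id.
    return f'create_ticket("{ticket_id}")'

def dispatch_create_ticket(ticket_id):
    # Producer side: pin the task_id when sending create_ticket.
    return create_ticket.apply_async(args=[ticket_id],
                                     task_id=create_ticket_task_id(ticket_id))

def create_ticket_completed(ticket_id):
    # Could serve as the ticket_exists() check in the retry sketch above.
    # An unknown task_id is reported as PENDING, so this also covers the case
    # where create_ticket has not been dispatched at all yet.
    return AsyncResult(create_ticket_task_id(ticket_id), app=app).status == "SUCCESS"
```

One caveat I am aware of: result backends expire entries after `result_expires` (one day by default), so long-lived ticket state probably still belongs in a database.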
Relevant links:
- Where do you set the task_id of a celery task?
- Retrieve result from 'task_id' in Celery from unknown task
- Find out whether celery task exists
More questions:
- How do I ensure that I send a task only once per `task_id`? For instance, I want the `create_ticket` task to be applied asynchronously only once. This would be an alternative to making all tasks idempotent. (A deduplication sketch follows this list.)
- How do I use `AsyncResult` in `add_message_to_ticket` to check the status of the `create_ticket` task? Is it possible to specify a chain somehow even though the first task may have already been completed?
- How do I fetch all results of tasks given the task name derived from the name of the function definition?
- Most importantly, should I use the Celery result backend to abstract stored data away from dealing with a database? Or should I scratch this idea and just go ahead with designing a database schema instead?
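As far as I can tell, Celery itself does not deduplicate on `task_id`, so "send once per `task_id`" would have to be enforced on the producer side. A rough sketch of what I have in mind, assuming a Redis instance is available for deduplication (the key prefix, TTL, and connection details are arbitrary placeholders):

```python
import redis

# Assumes a Redis instance is available; connection details are placeholders.
dedup_store = redis.Redis(host="localhost", port=6379, db=1)

def send_once(task, task_id, *args, **kwargs):
    # SET NX is atomic, so only the first caller for a given task_id dispatches
    # the task; create_ticket is then applied asynchronously at most once per
    # ticket (within the TTL window).
    if dedup_store.set(f"sent:{task_id}", 1, nx=True, ex=86400):
        return task.apply_async(args=args, kwargs=kwargs, task_id=task_id)
    return None  # already sent; callers can use AsyncResult(task_id) instead

# Example:
# send_once(create_ticket, 'create_ticket("TICKET000001")', "TICKET000001")
```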