
I have a Django app saving objects to the database and a celery task that periodically does some processing on some of those objects. The problem is that the user can delete an object after it has been selected by the celery task for processing, but before the celery task has actually finished processing and saving it. So when the celery task does call .save(), the object re-appears in the database even though the user deleted it. Which is really spooky for users, of course.

So here's some code showing the problem:

def my_delete_view(request, pk):
    thing = Thing.objects.get(pk=pk)
    thing.delete()
    return HttpResponseRedirect('yay')

@app.task
def my_periodic_task():
    things = get_things_for_processing()
    # if the delete happens anywhere between here and the .save(), we're hosed
    for thing in things:
        process_thing(thing) # could take a LONG time
        thing.save()

I thought about fixing it by wrapping the save in a transaction that first checks whether the object still exists:

@app.task
def my_periodic_task():
    things = Thing.objects.filter(...some criteria...)
    for thing in things:
        process_thing(thing) # could take a LONG time
        try:
            with transaction.atomic():
                # just see if it still exists:
                unused = Thing.objects.select_for_update().get(pk=thing.pk)
                # no exception means it exists. go ahead and save the
                # processed version that has all of our updates.
                thing.save()
        except Thing.DoesNotExist:
            logger.warning("Processed thing vanished")

Is this the correct pattern for this? I mean, I'll find out whether it works within a few days of running it in production, but it would be nice to know if there are other well-accepted patterns for accomplishing this sort of thing.

What I really want is to be able to update an object if it still exists in the database. I'm ok with the race between user edits and edits from the process_thing, and I can always throw in a refresh_from_db just before the process_thing to minimize the time during which user edits would be lost. But I definitely can't have objects re-appearing after the user has deleted them.

mgalgs

2 Answers


If you keep a transaction open for the duration of the celery task, you should avoid such problems:

@app.task
@transaction.atomic
def my_periodic_task():
    things = get_things_for_processing()
    # if the delete happens anywhere between here and the .save(), we're hosed
    for thing in things:
        process_thing(thing) # could take a LONG time
        thing.save()

Sometimes you may want to report to the frontend that you are working on the data. In that case you can add `select_for_update()` to your queryset (most likely in `get_things_for_processing`); then, in the code responsible for deletion, you need to handle the error the database reports when a specific record is locked.

Jerzyk
  • Hmm, I'd rather leave it locked for the shortest amount of time possible (as shown in my question). Is there any advantage to leaving it locked the whole time over the approach I proposed? – mgalgs Jun 16 '16 at 04:01
  • yes - if you are using data that is stored within this record and you *must* be sure that from the moment you start processing till end - nothing and nobody will change this record – Jerzyk Jun 16 '16 at 04:26
  • But that's not what I need :). As stated in the question, I'm ok with the race between user edits and edits from the `process_thing`. I just don't want a deleted object re-appearing. That's the only thing I'm trying to protect against. – mgalgs Jun 16 '16 at 05:32

For now, it seems like the pattern of "select again atomically, then save" is sufficient:

@app.task
def my_periodic_task():
    things = Thing.objects.filter(...some criteria...)
    for thing in things:
        process_thing(thing) # could take a LONG time
        try:
            with transaction.atomic():
                # just see if it still exists:
                unused = Thing.objects.select_for_update().get(pk=thing.pk)
                # no exception means it exists. go ahead and save the
                # processed version that has all of our updates.
                thing.save()
        except Thing.DoesNotExist:
            logger.warning("Processed thing vanished")

(this is the same code as in my original question).

mgalgs