
I have a view that sends a message to a RabbitMQ queue.

message = {'origin': 'Bytes CSV',
           'data': {'csv_key': str(csv_entry.key),
                    'csv_fields': csv_fields,
                    'order_by': order_by,
                    'filters': filters}}

...

queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
                   routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))

On my consumer, I have a long process to generate a CSV.

def create(self, data):
    csv_obj = self._get_object(key=data['csv_key'])
    if csv_obj.status == CSVRequestStatus.CANCELED:
        self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
        return

    result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
    csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
    file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

    csv_obj.status = CSVRequestStatus.READY
    csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
    csv_obj.file_key = file_key
    csv_obj.ready_at = timezone.now()
    csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

    self.logger.info(f'CSV {csv_obj.name} created')

The long process happens inside self._generate_csv, because self.generate_result_data returns a queryset, which is lazy.

As you can see, if a user changes the status of the csv_request through an endpoint BEFORE the message starts to be consumed, the process will not run at all. My goal is to make cancellation work during the execution of self._generate_csv as well.

So far I have tried threading, but without success.

How can I achieve my goal?

Thanks a lot!

Murilo Sitonio
  • "My goal is to let this happen during the execution of self._generate_csv." what do you mean by that? – Lord Elrond May 11 '21 at 18:46
  • @LordElrond I want to stop the execution of this function (even if it already started) if the `csv_obj` status changes to `CSVRequestStatus.CANCELED`. – Murilo Sitonio May 11 '21 at 23:22

2 Answers


Why don't you check out the Celery library? Using Celery with Django and a RabbitMQ broker is much easier than working with RabbitMQ queues directly.

Celery has a built-in function, revoke, to terminate an ongoing task:

>>> from celery.task.control import revoke  # Celery < 5.0; in 5.x use app.control.revoke
>>> revoke(task_id, terminate=True)

For your use case, you probably want something like (code snippets):

## celery/tasks.py
from myproject.celery import app  # your project's Celery app instance (path is an assumption)

# an explicit name lets send_task("create_csv", ...) find the task
@app.task(name="create_csv", queue="my_queue")
def create_csv(message):
    # ...snip...
    pass

## main.py
from celery import uuid, current_app

def start_task(task_id, message):
    current_app.send_task(
        "create_csv",
        args=[message],
        task_id=task_id,
    )

def kill_task(task_id):
    current_app.control.revoke(task_id, terminate=True)

## signals.py

from django.db.models.signals import post_save
from django.dispatch import receiver

from .main import kill_task
from .models import CSVRequestStatus, MyModel  # import locations are assumptions

# choose the appropriate signal to listen for the DB change
@receiver(post_save, sender=MyModel)
def handler(sender, instance, **kwargs):
    # only revoke when the row was actually marked as canceled
    if instance.status == CSVRequestStatus.CANCELED:
        kill_task(instance.task_id)
  • Use `celery.uuid` to generate task IDs that can be stored in the DB or cache, then use the same task ID to control the task, i.e. to request termination (see the sketch below).
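A minimal sketch of that wiring, assuming a hypothetical `task_id` field on the CSV-request model to persist the ID:

## views.py (hypothetical wiring)
from celery import uuid

from .main import start_task

def request_csv(csv_obj, message):
    task_id = uuid()  # generate the ID up front so it can be stored
    csv_obj.task_id = task_id  # the `task_id` model field is an assumption
    csv_obj.save(update_fields=['task_id'])
    start_task(task_id=task_id, message=message)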
eshaan7
  • Although I agree celery would simplify things, I'm afraid that changing everything for just this function would be overkill... – Murilo Sitonio May 11 '21 at 23:20
  • Note that **`celery.task`** is deprecated, and doesn't work at all in **`v5.x`**. – Lord Elrond May 15 '21 at 20:22
  • @MuriloSitonio originally I agreed with you, in that using celery for this task alone would be overkill. However, after trying to find a "pure" solution, I came to this conclusion: the maintenance overhead of implementing this yourself will **far** outweigh the overhead that celery requires, since it will likely take hundreds of lines of code to implement (presumably using `pika`). **TLDR;** I would recommend going with celery! – Lord Elrond May 15 '21 at 20:34
  • @LordElrond thanks for putting some effort into finding a "pure" solution! I'm still trying to find one that won't outweigh the overhead that celery requires, but I'm afraid I'll end up agreeing with you. Anyway, if you want to share your train of thought on this "pure" solution I'd appreciate it a lot! – Murilo Sitonio May 17 '21 at 23:59

Since self._generate_csv is the slowest part, the obvious solution is to work on this function.

To do this, you can divide the creation of the CSV file into several pieces. After creating each piece, check the status to see whether you can continue creating the file. At the very end, glue all the pieces together into the finished file.
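For illustration, a rough sketch of that approach, reusing CSVRequestStatus from the question and assuming `result` is the queryset and `csv_fields` are model field names (the chunk size and method name are made up):

import csv
import io

CHUNK_SIZE = 1000  # rows per piece; tune to taste

def _generate_csv_in_pieces(self, result, csv_fields, csv_obj):
    pieces = []
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=csv_fields)
    writer.writeheader()
    for i, row in enumerate(result.values(*csv_fields).iterator(), start=1):
        writer.writerow(row)
        if i % CHUNK_SIZE == 0:
            # re-read the status from the DB and stop if the user canceled
            csv_obj.refresh_from_db(fields=['status'])
            if csv_obj.status == CSVRequestStatus.CANCELED:
                return None
            # close off this piece and start a new one
            pieces.append(buffer.getvalue())
            buffer = io.StringIO()
            writer = csv.DictWriter(buffer, fieldnames=csv_fields)
    pieces.append(buffer.getvalue())
    return ''.join(pieces)  # glue all the pieces into the finished file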

Here is a method for combining multiple files into one.

  • That's indeed a good idea, but `self._generate_csv` does not only generate the csv, it also evaluates the `queryset`. I could slice the `queryset` for each smaller csv that I'll eventually join, but then I'd hit the db multiple times and the data might change in between. Or maybe I could evaluate the whole `queryset` at once and slice it like a list. That might work indeed. I'm going to test it. Thanks! – Murilo Sitonio May 11 '21 at 13:53
  • You can generate the queryset once, hit the db only once, and then slice it or use an iterator (sketched below). PostgreSQL, for example, uses server-side cursors to stream results from the database without loading the entire result set into memory. Reference: https://docs.djangoproject.com/en/2.2/ref/models/querysets/#iterator – Trevor Cox May 16 '21 at 05:47
  • @TrevorCox wow, I've never heard of iterator! But I still have to hit the db N times for N slices, because at each slice I'll have to check whether the status of the `csv_obj` has changed... – Murilo Sitonio May 18 '21 at 00:02
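For reference, a minimal sketch of the iterator approach from the comment above (the queryset, fields, and `process` handler are placeholders):

# stream rows with a server-side cursor; only `chunk_size` rows
# are held in memory at a time
for row in queryset.values('id', 'name').iterator(chunk_size=2000):
    process(row)  # hypothetical per-row handler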