
I am currently working on a Flask application (Python 3.6) and want to integrate Celery because I have multiple long-running background tasks.

Edit: Celery 4.1

The integration was no problem and the celery tasks are executed correctly, but I cannot access the current state of a running task.

Celery, Flask setup:

from celery import Celery
from flask import Flask


def make_celery(app):
    celery = Celery(app.import_name,
                    backend=app.config["result_backend"],
                    broker=app.config["broker_url"])

    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"

celery_app = make_celery(app)

Celery task:

import time


@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
    print(exchange, currency_a, currency_b)
    time.sleep(50)

Call the task and store the value in a dictionary:

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id

Get state of a task:

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id

This is where the error is raised. When I print only the result object, it simply prints the task_id. When I try to retrieve the current state, it raises the following exception:

TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
Martin Pichler
  • what about `print(result.status)` – akhilsp Mar 06 '18 at 09:26
  • raises the same error – Martin Pichler Mar 06 '18 at 09:29
  • Have a look into this question https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery Hope it helps – akhilsp Mar 06 '18 at 09:41
  • I already read that question. My problem is that I can't even access the properties of the AsyncResult because the error is raised. I can't call result.ready(), result.get() or any other method. When I check the result type with type(result), it says it is a "celery.result.AsyncResult". The error also says "AsyncResult found", so the exception doesn't make sense to me. – Martin Pichler Mar 06 '18 at 09:45

1 Answer


EXPLANATION :

When you call your task :

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)

your variable task_id is an instance of AsyncResult, not a string.

So, your variable TASK_STATES[market.__id__()] is also an instance of AsyncResult whereas it should be a string.

You then try to instantiate an AsyncResult object with it:

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])

So you are instantiating an AsyncResult object with another AsyncResult object whereas it should be instantiated with a string.

Your confusion may come from print(task_id), which shows you a string. Under the hood, print calls the __str__ method of the AsyncResult object, which is defined in the source code as:

def __str__(self):
    """`str(self) -> self.id`."""
    return str(self.id)

It just returns the id attribute of your task_id object, and that is what gets printed.
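A minimal sketch of why print is misleading here. FakeAsyncResult is a hypothetical stand-in, not Celery's class; it only copies the __str__ shown above. The bytes join at the end is an assumption about how the result backend builds its storage key, included because it reproduces the shape of the reported TypeError.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult (illustration only)."""

    def __init__(self, task_id):
        self.id = task_id

    def __str__(self):
        # Same behaviour as the __str__ quoted above: render only the id.
        return str(self.id)


task_id = FakeAsyncResult("abc-123")   # plays the role of what delay() returns

printed = str(task_id)                 # what print(task_id) displays
is_string = isinstance(task_id, str)   # the object itself is not a str

# Assumption: the backend joins byte strings to build a key for the task.
try:
    b"".join([b"celery-task-meta-", task_id])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(printed)
print(is_string)
print(error_message)
```

The object prints like a string, but anything that needs a real str or bytes value rejects it.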

SOLUTION :

You can fix it either by storing the id string, TASK_STATES[id] = task_id.id, or by converting on lookup:

result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))
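The first option could look roughly like the sketch below. It runs without Celery: the SimpleNamespace object is a stand-in for what delay() returns, remember_task is a hypothetical helper name, and the exchange and currency values are made up for illustration.

```python
from types import SimpleNamespace

TASK_STATES = {}


def remember_task(key, async_result):
    """Store only the task id string, never the result object itself."""
    TASK_STATES[key] = async_result.id


# Stand-in for the object returned by update_trading_pair.delay(...).
fake_result = SimpleNamespace(id="abc-123")

key = "_".join(["binance", "BTC", "ETH"])  # example values, not from the question
remember_task(key, fake_result)

stored = TASK_STATES[key]
print(type(stored).__name__, stored)

# With Celery available, the lookup would then be:
#   result = update_trading_pair.AsyncResult(stored)
#   print(result.state)
```

Storing the plain id also keeps TASK_STATES easy to serialize if it ever needs to live outside the process.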
arthur
  • Thank you! That was the exact problem. I thought that somewhere I read that delay() returns only the id. A look into the documentation would have solved the problem. http://docs.celeryproject.org/en/latest/reference/celery.app.task.html – Martin Pichler Mar 06 '18 at 11:29