
I am trying to implement an MLOps app by building a Celery worker in Docker containers and sending task messages from my local environment. There are three containers:

  • multi_label_text_classification_celery_server_1
    • for training and inference
  • multi_label_text_classification_redis_1
  • multi_label_text_classification_triton_server_1

I have tested the inference task, and it can be called remotely with the code below:

    import json

    import pandas as pd

    from app import predict  # the local Celery app stub defined below

    dataset = pd.read_json('data.json')
    data = dataset.text.values.tolist()
    j_string = json.dumps(data, ensure_ascii=False)

    predict_task = predict.apply_async(
        args=(
            'audience_bert',
            1,
            100,
            j_string
        )
    )
    print(predict_task)

task_id: 758d7455-af2d-494e-8ba9-f9e502a8727c


Yet when I tried to check the status and get the result:

from celery.result import AsyncResult

result = AsyncResult(task_id)
print(result.state)
print(result.get())

it raises a DisabledBackend error, even though I have already configured the result backend in both the container app and the local Celery app:

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 478, in state
    return self._get_task_meta()['status']
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 417, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\backends\base.py", line 609, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

I have looked into the probable causes of DisabledBackend, but most of the similar posts are about a missing backend configuration, which I have already set.
Could anyone help me out?

Below are my docker-compose file and celery code:

  • docker-compose.yml
version: "3"

services:
  celery_server:
    env_file: .env
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - models:/model/torch_script
    environment:
      LEVEL: ${LEVEL}
    links:
      - redis
    depends_on:
      - redis

  redis:
    image: redis:latest
    hostname: redis
    ports:
      - 6379:6379

  triton_server:
    image: nvcr.io/nvidia/tritonserver:22.06-py3
    hostname: triton
    ports:
      - 8000:8000
      - 8001:8001
      - 8002:8002
    command: ["tritonserver", "--model-store=/models", "--model-control-mode=poll", --repository-poll-secs=30]
    volumes:
      - models:/models
    shm_size: 1g
    ulimits:
      memlock: -1
      stack: 67108864

volumes:
  models:
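
For reference, the compose file doesn't show how the worker is started; presumably the Dockerfile command launches Celery on the deep_model queue. A minimal sketch of such a startup command, assuming the container's worker module is also named app:

    celery -A app worker -Q deep_model --loglevel=INFO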

  • container worker
import json
from typing import Dict

from celery import Celery

from config.settings import MODEL_CKPT, LogDir
from utils.inference_helper import chunks
from utils.log_helper import create_logger
from worker.inference.bert_triton_inference import BertInferenceWorker
from worker.train.chinese_bert_classification import ChineseBertClassification

app = Celery(
    'bert_celery',  # Celery's first positional argument is the main module name
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}

app.conf.update(result_expires=1)  # note: result_expires is in seconds, so results expire after 1 second
app.conf.update(task_track_started=True)


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate=2e-5,
        epochs=50,
        batch_size=32,
        max_len=30,
        is_multi_label=1,
        ckpt=MODEL_CKPT.get('chinese-bert-wwm')
):
    dataset = json.loads(dataset)
    label_col = json.loads(label_col)

    task_worker = ChineseBertClassification(
        max_len=max_len,
        ckpt=ckpt,
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size,
        dataset=dataset,
        label_col=label_col,
        model_name=model_name,
        model_version=version,
        is_multi_label=is_multi_label
    )

    task_worker.init_model()
    results: Dict[str, str] = task_worker.run()
    return results
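
For symmetry with the predict example above, a remote call to this task would look something like the following sketch; dataset_json and label_col_json are hypothetical placeholders for JSON strings produced with json.dumps:

    training_task = training.apply_async(
        args=(
            'audience_bert',  # model_name
            1,                # version
            dataset_json,     # serialized training data
            label_col_json    # serialized label columns
        )
    )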


@app.task(bind=True, queue='deep_model', name='predict')
def predict(self, model_name, version, max_len, dataset):
    logger = create_logger(LogDir.inference)
    data = json.loads(dataset)

    output = []
    for idx, chunk in enumerate(chunks(data, 32)):
        logger.info(f" ==== batch: {idx} ==== ")
        infer_worker = BertInferenceWorker(
            dataset=chunk,
            model_name=model_name,
            model_version=version,
            url='triton:8000',
            backend='pytorch',
            max_len=max_len,
            chunk_size=len(chunk)
        )
        results = infer_worker.run()
        # print(results)
        output.extend(results.tolist())

    assert len(output) == len(data)

    return json.dumps(output, ensure_ascii=False)
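
The chunks helper from utils.inference_helper is not shown in the question; a minimal sketch of its assumed behavior (yielding fixed-size batches from a list) would be:

    def chunks(data, batch_size):
        # yield successive slices of at most batch_size items (assumed behavior)
        for i in range(0, len(data), batch_size):
            yield data[i:i + batch_size]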

  • localhost celery
from celery import Celery


app = Celery(
    'bert_celery',
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate,
        epochs,
        batch_size,
        max_len,
        is_multi_label
):
    pass


@app.task(bind=True, queue='deep_model', name='predict')
def predict(
        self,
        model_name,
        version,
        max_len,
        dataset
):
    pass
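
As an aside, these stub definitions can be avoided entirely: Celery can send a message by task name with send_task, so a client sketch along these lines (reusing j_string from the snippet above) should also work:

    from celery import Celery

    app = Celery(
        'bert_celery',
        broker="redis://localhost:6379/0",
        backend="redis://localhost:6379/1"
    )

    # send by registered task name; no local stub needed
    predict_task = app.send_task(
        'predict',
        args=('audience_bert', 1, 100, j_string),
        queue='deep_model'
    )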

Weber Huang
  • https://stackoverflow.com/questions/30753040/retrieve-task-result-by-id-in-celery – Bdr May 23 '23 at 19:19

1 Answer


I figured out the solution to this problem.
First, I thought the error might come from Redis, so I checked the Docker logs, which showed:

WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.

So I reconfigured overcommit_memory, yet the problem still wasn't solved.

Then I imported the task I wanted to retrieve the info and results for (from app import predict) into the same script that calls AsyncResult, and IT WORKED!

from celery.result import AsyncResult
from app import predict # this is essential

results = AsyncResult(<task_id>)
print(results.state)  # without the import above, this raises the DisabledBackend error
print(results.get())  # same here
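
Alternatively, as a sketch using the standard Celery API rather than the import side effect, you can bind the result to your app explicitly:

    from app import app  # the Celery app instance defined above

    results = app.AsyncResult(<task_id>)
    # or equivalently: AsyncResult(<task_id>, app=app)
    print(results.state)
    print(results.get())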

It seems that when Celery tries to fetch the result information, you also have to import the related task (or at least the module that creates the Celery app) so that the client knows the backend setting; otherwise AsyncResult falls back to a default app that has no result backend configured, hence DisabledBackend. A rather simple mistake in hindsight...

Weber Huang