I am trying to implement an MLOps app by running a Celery worker in Docker containers and sending task messages to it from my local environment. There are three containers:
- multi_label_text_classification_celery_server_1 (for training and inference)
- multi_label_text_classification_redis_1
- multi_label_text_classification_triton_server_1
I have tested the inference task, and it can be called remotely as below:

dataset = pd.read_json('data.json')
data = dataset.text.values.tolist()
j_string = json.dumps(data, ensure_ascii=False)

predict_task = predict.apply_async(
    args=(
        'audience_bert',
        1,
        100,
        j_string
    )
)
print(predict_task)
# task_id: 758d7455-af2d-494e-8ba9-f9e502a8727c
Yet when I try to check the status and get the result:

from celery.result import AsyncResult

task_id = '758d7455-af2d-494e-8ba9-f9e502a8727c'
result = AsyncResult(task_id)
print(result.state)
print(result.get())
it raises a DisabledBackend error, even though I have already set up the result backend in both the container app and the local Celery app:
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 478, in state
return self._get_task_meta()['status']
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\result.py", line 417, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "C:\Users\ychuang\AppData\Local\pypoetry\Cache\virtualenvs\celery-client-HqyYFMWr-py3.8\lib\site-packages\celery\backends\base.py", line 609, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
I have been looking for the probable cause of the DisabledBackend error, but most of the similar posts are about a missing backend configuration, which does not seem to be my case.
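For what it's worth, since the Redis port is published to the host, the result backend itself should be reachable from my machine; a minimal check with redis-py would look roughly like this (db 1 matches my backend URL, and celery-task-meta-* is the key prefix Celery's Redis backend uses):

import redis

# Connect to the result backend DB (db 1 in the backend URL).
backend = redis.Redis(host='localhost', port=6379, db=1)
print(backend.ping())                       # True if the backend is reachable
print(backend.keys('celery-task-meta-*'))   # task results stored by Celery, if any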
Please, could anyone help me out?
Below are my docker-compose file and celery code:
- docker-compose.yml
version: "3"

services:
  celery_server:
    env_file: .env
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - models:/model/torch_script
    environment:
      LEVEL: ${LEVEL}
    links:
      - redis
    depends_on:
      - redis

  redis:
    image: redis:latest
    hostname: redis
    ports:
      - 6379:6379

  triton_server:
    image: nvcr.io/nvidia/tritonserver:22.06-py3
    hostname: triton
    ports:
      - 8000:8000
      - 8001:8001
      - 8002:8002
    command: ["tritonserver", "--model-store=/models", "--model-control-mode=poll", "--repository-poll-secs=30"]
    volumes:
      - models:/models
    shm_size: 1g
    ulimits:
      memlock: -1
      stack: 67108864

volumes:
  models:
- container worker
import json
from typing import Dict

from celery import Celery

from config.settings import MODEL_CKPT, LogDir
from utils.inference_helper import chunks
from utils.log_helper import create_logger
from worker.inference.bert_triton_inference import BertInferenceWorker
from worker.train.chinese_bert_classification import ChineseBertClassification

app = Celery(
    name='bert_celery',
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}
app.conf.update(result_expires=1)
app.conf.update(task_track_started=True)


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate=2e-5,
        epochs=50,
        batch_size=32,
        max_len=30,
        is_multi_label=1,
        ckpt=MODEL_CKPT.get('chinese-bert-wwm')
):
    dataset = json.loads(dataset)
    label_col = json.loads(label_col)
    task_worker = ChineseBertClassification(
        max_len=max_len,
        ckpt=ckpt,
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size,
        dataset=dataset,
        label_col=label_col,
        model_name=model_name,
        model_version=version,
        is_multi_label=is_multi_label
    )
    task_worker.init_model()
    results: Dict[str, str] = task_worker.run()
    return results


@app.task(bind=True, queue='deep_model', name='predict')
def predict(self, model_name, version, max_len, dataset):
    logger = create_logger(LogDir.inference)
    data = json.loads(dataset)
    output = []
    for idx, chunk in enumerate(chunks(data, 32)):
        logger.info(f" ==== batch: {idx} ==== ")
        infer_worker = BertInferenceWorker(
            dataset=chunk,
            model_name=model_name,
            model_version=version,
            url='triton:8000',
            backend='pytorch',
            max_len=max_len,
            chunk_size=len(chunk)
        )
        results = infer_worker.run()
        output.extend(results.tolist())
    assert len(output) == len(data)
    return json.dumps(output, ensure_ascii=False)
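For context, the training task expects the dataset and the label columns as JSON strings (they are json.loads-ed first thing in the task), so on the client side I would build the call roughly like this (the file name and the label column list here are placeholders, not my real data):

import json
import pandas as pd

dataset = pd.read_json('train.json')                                 # placeholder file
j_dataset = dataset.to_json(orient='records', force_ascii=False)     # dataset as a JSON string
j_labels = json.dumps(['label_a', 'label_b'], ensure_ascii=False)    # placeholder label columns

training_task = training.apply_async(
    args=('audience_bert', 1, j_dataset, j_labels)
)
print(training_task)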
- localhost celery
from celery import Celery

app = Celery(
    name='bert_celery',
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

app.conf.task_routes = {
    'app.*': {'queue': 'deep_model'},
}


@app.task(bind=True, queue='deep_model', name='training')
def training(
        self,
        model_name,
        version,
        dataset,
        label_col,
        learning_rate,
        epochs,
        batch_size,
        max_len,
        is_multi_label
):
    pass


@app.task(bind=True, queue='deep_model', name='predict')
def predict(
        self,
        model_name,
        version,
        max_len,
        dataset
):
    pass
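Putting the client pieces together, the flow that triggers the error is roughly the following (celery_client is just how I refer to the local module above; the real file name may differ):

import json

import pandas as pd
from celery.result import AsyncResult

from celery_client import predict   # task stub registered on the local app above

dataset = pd.read_json('data.json')
j_string = json.dumps(dataset.text.values.tolist(), ensure_ascii=False)

predict_task = predict.apply_async(args=('audience_bert', 1, 100, j_string))

result = AsyncResult(predict_task.id)   # accessing result.state / result.get() raises the DisabledBackend error
print(result.state)
print(result.get())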