I'm trying to pass command-line arguments to a Celery task. The best answer I found is a really old one, "Initializing a worker with arguments using Celery", but it does not seem to be valid for Celery 5.3.
The use case is extremely simple: I want to run my Celery workers against a specific database chosen when launching the worker from the CLI, so that the task's base class is instantiated and the DB connection is created only once.
Link to the tasks: https://github.com/Giovani-Merlin/wbdsm/blob/main/celery/links/extract_links_task.py
Link to the worker: https://github.com/Giovani-Merlin/wbdsm/blob/main/celery/links/extract_links_worker.py
Results so far:
I can successfully add user options and run the bootstep, BUT the task is initialized BEFORE the bootsteps. The order I can see (debug mode and prints) is:
- App initialization
- IndexLinks initialization (no access to the custom arguments)
- CustomArgs bootstep runs (access to the arguments)
If the CustomArgs bootstep ran BEFORE the IndexLinks initialization I could just inject the parameters into the class, but I couldn't get that order.
After adding requires* = {"celery.worker.consumer.tasks:Tasks"} to my bootstep I can successfully access the created task. I created a method, init_after_bootsteps, to be called from the bootstep's start method, but it seems to be useless: the already created tasks are not affected by the changes at that point.
*Without any "requires", the bootstep also runs after the IndexLinks initialization.
According to the old answer (2014), I was supposed to be able to pass the arguments language and mongo_uri in the @task(...) decorator and instantiate the base class with them. If I add any argument to the __init__ of the IndexLinks task I get the error "no value for args ..."
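A minimal plain-Python sketch of what I suspect is happening. It assumes (I have not verified this against the Celery source) that the decorator builds a subclass of the base class, attaches the extra keyword options as class attributes, and then instantiates it with no arguments. The helper make_task and both base classes are hypothetical and only stand in for that behaviour:

```python
# Plain-Python stand-in, NOT Celery code: it only models the assumed
# "subclass the base, attach options, instantiate without arguments" step.


class BaseWithArgs:
    """Base class whose __init__ requires arguments."""

    def __init__(self, language, mongo_uri):
        self.language = language
        self.mongo_uri = mongo_uri


class BaseWithoutArgs:
    """Base class that only relies on class-level defaults."""

    language = "de"
    mongo_uri = "mongodb://localhost:27017/"


def make_task(base, **options):
    """Hypothetical helper: subclass `base`, attach options, instantiate."""
    task_cls = type("GeneratedTask", (base,), dict(options))
    return task_cls()  # note: no arguments are passed to __init__


# Options become class attributes and override the base defaults.
task = make_task(BaseWithoutArgs, language="en")

# A base class with required __init__ parameters cannot be built this way,
# even when the same names are given as options.
try:
    make_task(BaseWithArgs, language="en", mongo_uri="mongodb://localhost:27017/")
    init_error = None
except TypeError as exc:
    init_error = exc
```

If that assumption holds, it would explain the error: the options never reach __init__, they only end up as attributes on the generated task class.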
The documentation only shows how to use bootsteps in the worker (for retries, changing queues) and how to receive arguments in them. I can't find how to change the tasks' default attributes...
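One pattern I am considering is to stop connecting in __init__ and connect on first use instead, so that it no longer matters that the task object is created before the options are known. The sketch below is plain Python, not Celery: FakeClient is a hypothetical stand-in for MongoClient and LazyIndexLinks is a hypothetical variant of my IndexLinks class. Whether this carries over to a real worker depends on the attributes being set in the same process that executes the task, which I have not verified:

```python
class FakeClient:
    """Hypothetical stand-in for pymongo.MongoClient that counts connections."""

    created = 0

    def __init__(self, uri):
        FakeClient.created += 1
        self.uri = uri


class LazyIndexLinks:
    # Class-level defaults, meant to be overwritten once the CLI is parsed.
    mongo_uri = "mongodb://localhost:27017/"
    language = "de"
    _client = None

    @property
    def client(self):
        # Connect on first access only, using the values current at that time.
        if self._client is None:
            self._client = FakeClient(self.mongo_uri)
        return self._client

    @property
    def db_name(self):
        return self.language + "wiki"


task = LazyIndexLinks()  # created before the options are known

# Later, something that has access to the parsed options updates the class.
LazyIndexLinks.mongo_uri = "mongodb://example-host:27017/"
LazyIndexLinks.language = "en"

first = task.client   # the connection is created here, with the new URI
second = task.client  # reuses the same client
```

The point of the design is that the instance reads the class attributes at first access rather than at construction time, and the client is still created only once per instance.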
Relevant code snippets:
import pymongo
from pymongo import MongoClient

# https://docs.celeryq.dev/en/latest/userguide/tasks.html#instantiation
class IndexLinks(Task):
    abstract = True
    mongo_uri = "mongodb://localhost:27017/"
    language = "de"

    def __init__(self) -> None:
        super().__init__()
        # just for DEBUG
        self.init_after_bootsteps(IndexLinks.language, IndexLinks.mongo_uri)

    def init_after_bootsteps(self, language, mongo_uri) -> None:
        # def before_start(self, task_id, args, kwargs):
        # super().before_start(task_id, args, kwargs)
        self.language = language
        self.mongo_uri = mongo_uri
        print(self.language)
        print(self.mongo_uri)
        db_name = self.language + "wiki"
        self.db_name = db_name
        # Connect once and keep handles to the two collections used by the task
        client = MongoClient(self.mongo_uri)
        self.links_collection = client[db_name]["links"]
        self.pages_collection = client[db_name]["pages"]
        self.links_collection.create_index([("links_to", pymongo.HASHED)])
        self.links_collection.create_index([("source_doc", pymongo.HASHED)])
        self.links_collection.create_index([("text", pymongo.HASHED)])
        print("loaded")
# https://stackoverflow.com/questions/27070485/initializing-a-worker-with-arguments-using-celery
# Make bootstep to add custom arguments
class CustomArgs(bootsteps.StartStopStep):
    requires = {"celery.worker.consumer.tasks:Tasks"}

    def __init__(self, parent, mongo_uri="blabla", language="de", **options):
        super().__init__(parent, **options)
        print("{0!r} is in init".format(parent))
        print("Storing language and mongo_uri")
        print("Language: ", language)
        print("Mongo URI: ", mongo_uri)
        self.language = language
        self.mongo_uri = mongo_uri

    def create(self, parent):
        return super().create(parent)

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print("{0!r} is starting".format(parent))
        # The key must match the name passed to @app.task(name=...)
        parent.app.tasks["extract_links_task"].init_after_bootsteps(self.language, self.mongo_uri)

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print("{0!r} is stopping".format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print("{0!r} is shutting down".format(parent))


# app.steps["worker"].add(CustomArgs)
app.steps["consumer"].add(CustomArgs)
# Could parse the article in different ways, like getting the text per paragraph, ignoring lists, depends on the objective. To match Zeshel's and mewsli's (uses wikiextractor) we will just append all the texts.
@app.task(
    name="extract_links_task",
    bind=True,
    queue="links_to_extract",
    base=IndexLinks,
    language=None,
    mongo_uri=None,
)
def extract_links_task(self, skip: str, limit: int, min_query_size: int = 50):
    """
    Celery task to extract links from wikipedia articles.
    The skip and limit parameters are used to paginate the query to the database.
    These parameters are controlled by the app in the extract_links_app.py

    Args:
        skip (str): Skip articles with id lower than this
        limit (int): Number of articles to parse
        min_query_size (int, optional): Min size of the query (in chars) to be considered. Defaults to 50. Values lower than this will be ignored.
    """
    # First
    logger.info("Getting articles from pages collection")
    # Get pages from pages collection
    pages = list(
        self.pages_collection.find({"isRedirect": False, "pageID": {"$gt": skip}}).sort("pageID", 1).limit(limit)
    )
    # Transform in dataclasses to make it easier to work with and encapsulate parsing/cleaning logic
    pages_obj = [Page.from_mongo(page, self.language) for page in pages]
    links = extract_links(pages_obj, pages_collection=self.pages_collection, min_query_size=min_query_size)
    return links
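To make the skip/limit pagination in the task above concrete, here is a small in-memory sketch of the same idea (filter out redirects, keep IDs greater than skip, sort by pageID, take limit). It does not touch MongoDB; fetch_page and the sample pages are hypothetical, and the zero-padded string IDs are an assumption chosen so that string comparison matches numeric order:

```python
def fetch_page(pages, skip, limit):
    """Return up to `limit` non-redirect pages whose pageID is greater than `skip`."""
    candidates = [p for p in pages if not p["isRedirect"] and p["pageID"] > skip]
    candidates.sort(key=lambda p: p["pageID"])
    return candidates[:limit]


# Hypothetical sample data; real documents come from the pages collection.
pages = [
    {"pageID": "0004", "isRedirect": False},
    {"pageID": "0001", "isRedirect": False},
    {"pageID": "0002", "isRedirect": True},
    {"pageID": "0005", "isRedirect": False},
    {"pageID": "0003", "isRedirect": False},
]

batches = []
skip = ""  # the empty string sorts before every non-empty ID
while True:
    batch = fetch_page(pages, skip, limit=2)
    if not batch:
        break
    batches.append([p["pageID"] for p in batch])
    # The next call resumes after the last ID seen in this batch.
    skip = batch[-1]["pageID"]
```

Each call only needs the last ID of the previous batch, which is why the calling app can hand out (skip, limit) pairs to independent task invocations.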
Args creation (working well):
import os
from datetime import timedelta

from celery import Celery, Task, bootsteps
from click import Option

app = Celery(
    "Extract mentions project",
)
default_config = "celeryconfig"
app.config_from_object(default_config)

# Add options to worker
app.user_options["worker"].add(
    Option(
        ("--mongo_uri",),
        default=os.environ.get("MONGO_URI", "mongodb://localhost:27017"),
        help="MongoDB URI",
    )
)
app.user_options["worker"].add(
    Option(
        ("--language",),
        default=os.environ.get("LANGUAGE", "en"),
        help="Language to use",
    )
)