I am unable to start an arq worker from FastAPI. I was able to create a Redis pool and submit a task, but without an active worker the task is never executed.
I instantiated the Worker class and called its run method, but got this response:
File "/usr/local/lib/python3.10/site-packages/arq/worker.py", line 281, in run
self.loop.run_until_complete(self.close())
File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
I called the ARQ Worker class as follows:
w = Worker(
    functions=[SMS.startSendingSms],
    redis_settings=redis_conn,
    on_startup=startup,
    max_jobs=1000,
    keep_result_forever=True,
    job_timeout=86000,
    max_tries=1000
)
w.run()
This was supposed to start the worker, but it instead raised the "event loop is already running" error.
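For what it's worth, if the worker really has to live inside the FastAPI process, one option is to schedule it on the already running loop with Worker.async_run() instead of the blocking run(), which internally calls loop.run_until_complete() and therefore fails under uvicorn/uvloop. This is only a minimal sketch, assuming SMS.startSendingSms is importable and that your arq version provides async_run(); it is not the approach from the original post.
import asyncio
from arq.worker import Worker
from arq.connections import RedisSettings
from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def start_worker():
    worker = Worker(
        functions=[SMS.startSendingSms],   # task function from this post
        redis_settings=RedisSettings(),    # adjust host/port as needed
        max_jobs=1000,
        keep_result_forever=True,
        job_timeout=86000,
        max_tries=1000,
    )
    # async_run() awaits the worker on the current event loop; run() would
    # call loop.run_until_complete(), which is what raises
    # "this event loop is already running".
    app.state.worker_task = asyncio.create_task(worker.async_run())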
I therefore decided to run it outside of the main event loop, in a Docker container.
Function to execute
class SMS:
    async def startSendingSms(ctx, data, dataHeader, userInfo):
        ........
Worker Class to Start Up
@dataclass
class WorkerSettings:
    async def worker():
        redis_conn = await Redis.redis_settings()
        w = Worker(
            functions=[SMS.startSendingSms],
            redis_pool=redis_conn,
            # on_startup=startup,
            max_jobs=1000,
            keep_result_forever=True,
            job_timeout=86000,
            max_tries=1000
        )
        w.run()
Docker Command
command: arq app.services.worker.worker.WorkerSettings
Error Response
assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
AssertionError: at least one function or cron_job must be registered
The function startSendingSms is being registered, and I am at a loss as to why this error is coming up.
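For context, the arq CLI reads configuration directly from the attributes of the settings class (functions, redis_settings, queue_name, and so on); it never calls a nested coroutine such as worker() above, so the functions list defined inside it is never seen. A minimal sketch of the conventional class-attribute layout, reusing the names from this post (SMS.startSendingSms, settings.REDIS_DB, settings.REDIS_PORT) as assumptions:
from arq.connections import RedisSettings

class WorkerSettings:
    # Attributes are read by the arq CLI; nothing here is called manually.
    functions = [SMS.startSendingSms]
    redis_settings = RedisSettings(
        host=settings.REDIS_DB,
        port=settings.REDIS_PORT,
        password=None,
    )
    max_jobs = 1000
    keep_result_forever = True
    job_timeout = 86000
    max_tries = 1000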
Problem Resolved -- Solution
I ran the ARQ worker in a Docker container, with the task function defined in a FastAPI class.
Docker Compose Service to Start the ARQ Worker
mxs-arq:
  container_name: mxs-arq
  build:
    context: ./backend
  volumes:
    - ./backend:/appINFO
  command: arq app.services.worker.worker.WorkerSettings
  restart: always
  depends_on:
    - redis
Python Class for ARQ Worker
@dataclass
class WorkerSettings:
    redis_pool = RedisSettings(
        host=settings.REDIS_DB,
        port=settings.REDIS_PORT,
        password=None
    )
    w = Worker(
        functions=[SendingSmsTest],
        queue_name='messaging',
        redis_settings=redis_pool,
        max_jobs=1000,
        keep_result_forever=True,
        job_timeout=86000,
        max_tries=1000
    )
    w.run()
Steps to Follow
- Start the Docker container.
- Run the create_pool command.
- Enqueue a job/task (see the sketch below).
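For completeness, a minimal sketch of the enqueue side from FastAPI, assuming the 'messaging' queue name and the SendingSmsTest function used above; the route, host/port values, and payload are illustrative placeholders, not taken from the original post.
from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI

app = FastAPI()

@app.post("/send-sms")
async def send_sms():
    # Create a pool (in practice you would likely do this once at startup).
    redis = await create_pool(RedisSettings(host="redis", port=6379))
    # The first argument is the registered function's name; the queue name
    # must match the one configured on the worker ('messaging' above).
    job = await redis.enqueue_job(
        "SendingSmsTest",
        {"message": "hello"},      # example payload
        _queue_name="messaging",
    )
    return {"job_id": job.job_id}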