
I am unable to start up an arq worker from FastAPI. I was able to create a Redis pool and submit a task, but without an active worker the task never gets executed.
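For context, this is roughly how I create the pool and enqueue the job from FastAPI (the Redis host/port and the job arguments here are placeholders, not my exact code):

from arq import create_pool
from arq.connections import RedisSettings

async def enqueue_sms(data, data_header, user_info):
    # create a connection pool to Redis (host/port are placeholders)
    redis = await create_pool(RedisSettings(host="redis", port=6379))
    # enqueue the job by name; a running worker is still needed to execute it
    await redis.enqueue_job("startSendingSms", data, data_header, user_info)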

I instantiated the Worker class and called its run method, but got this traceback:

File "/usr/local/lib/python3.10/site-packages/arq/worker.py", line 281, in run
    self.loop.run_until_complete(self.close())
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

I called the ARQ Worker class as follows:

w = Worker(
    functions=[SMS.startSendingSms],
    redis_settings=redis_conn,
    on_startup=startup,
    max_jobs=1000,
    keep_result_forever=True,
    job_timeout=86000,
    max_tries=1000
)
w.run()

This was supposed to start the worker, but it threw the "this event loop is already running" error instead.
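As far as I can tell, Worker.run() calls loop.run_until_complete() internally, which is not allowed from inside FastAPI's already-running event loop. If the worker really has to live inside the app process, awaiting arq's async entry point from a startup hook should avoid this specific error (just a sketch, assuming Worker.async_run() and that redis_conn is a RedisSettings instance; I have not verified this end to end):

import asyncio
from arq.worker import Worker

@app.on_event("startup")
async def start_worker():
    w = Worker(
        functions=[SMS.startSendingSms],
        redis_settings=redis_conn,  # assumed to be a RedisSettings instance
        max_jobs=1000,
    )
    # async_run() awaits the worker inside the existing loop instead of
    # calling run_until_complete(); run it as a background task so startup
    # does not block
    asyncio.create_task(w.async_run())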

So I decided to run it outside of the main event loop, in a Docker container.

Function to execute

class SMS:
  
  async def startSendingSms(ctx, data, dataHeader, userInfo):
    ........

Worker class used to start the worker

@dataclass
class WorkerSettings:

    async def worker():
        redis_conn = await Redis.redis_settings()

        w = Worker(
            functions=[SMS.startSendingSms],
            redis_pool=redis_conn,
            # on_startup=startup,
            max_jobs=1000,
            keep_result_forever=True,
            job_timeout=86000,
            max_tries=1000
        )
        w.run()

Docker Command

command: arq app.services.worker.worker.WorkerSettings

Error Response

assert len(self.functions) > 0, 'at least one function or cron_job must be registered'
AssertionError: at least one function or cron_job must be registered

The function startSendingSms is being registered, so I am at a loss as to why this error comes up.
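Looking at the arq docs again, the CLI builds the worker from class-level attributes on the settings class (functions, redis_settings, and so on); it does not call a coroutine defined on it, which would explain why my functions list never reaches the worker. The conventional shape is roughly this (the RedisSettings values are placeholders):

from arq.connections import RedisSettings

class WorkerSettings:
    # the arq CLI reads these class attributes and constructs the worker itself
    functions = [SMS.startSendingSms]
    redis_settings = RedisSettings(host="redis", port=6379)
    max_jobs = 1000
    keep_result_forever = True
    job_timeout = 86000
    max_tries = 1000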

Problem Resolved -- Solution

I ran the ARQ worker in its own Docker container and kept the task function in the FastAPI codebase.

Docker Compose service to start the ARQ worker

mxs-arq:
    container_name: mxs-arq
    build:
      context: ./backend
    volumes:
      - ./backend:/appINFO
    command: arq app.services.worker.worker.WorkerSettings
    restart: always
    depends_on:
      - redis

Python Class for ARQ Worker

@dataclass
class WorkerSettings:
    redis_pool = RedisSettings(
        host=settings.REDIS_DB,
        port=settings.REDIS_PORT,
        password=None
    )

    w = Worker(
        functions=[SendingSmsTest],
        queue_name='messaging',
        redis_settings=redis_pool,
        max_jobs=1000,
        keep_result_forever=True,
        job_timeout=86000,
        max_tries=1000
    )
    w.run()

Steps to Follow

  1. Start the Docker container (the ARQ worker starts with it).
  2. Run the create_pool command to get a Redis connection.
  3. Enqueue a job/task (see the sketch below).
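One thing worth noting: because the worker above listens on queue_name='messaging', the job has to be enqueued onto that same queue, otherwise it sits on the default queue and is never picked up. A rough sketch of steps 2 and 3 (the RedisSettings values are placeholders, and the job name must match how the worker registered the function):

from arq import create_pool
from arq.connections import RedisSettings

async def send_sms(data, data_header, user_info):
    # step 2: create the pool (host/port are placeholders)
    redis = await create_pool(RedisSettings(host="redis", port=6379))
    # step 3: enqueue onto the same queue the worker is listening on
    await redis.enqueue_job(
        "SendingSmsTest", data, data_header, user_info,
        _queue_name="messaging",
    )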
    Any reason why you want to run the worker inside of FastAPI and not in a process by itself where the workers usually reside? – MatsLindh Dec 01 '22 at 13:25
  • Also see https://stackoverflow.com/questions/46827007/runtimeerror-this-event-loop-is-already-running-in-python – MatsLindh Dec 01 '22 at 13:32
  • I created the redis pool and queued the task inside FastAPI successfully. Can you advise on the best way to run an arq worker? – Oky Dec 01 '22 at 13:39
  • @MatsLindh Please could you check my updated problem statement. – Oky Dec 01 '22 at 16:17
  • I was able to crack it. Updated the problem statement with the solution to help someone/people in the future. @MatsLindh, your comment got me thinking to eventually arrive at a solution. Thanks. – Oky Dec 01 '22 at 19:56
