
I have the following class in a FastAPI application:

import asyncio
import logging
from multiprocessing import Lock, Process

from .production_status import Job as ProductionStatusJob


class JobScheduler:
    loop = None
    logger = logging.getLogger("job_scheduler")
    process_lock = Lock()
    JOBS = [ProductionStatusJob]

    @classmethod
    def start(cls) -> None:
        cls.logger.info("Starting Up (1/2)")
        Process(target=cls._loop).start()
    
    @classmethod
    def _loop(cls) -> None:
        cls.loop = asyncio.get_event_loop()
        cls.loop.create_task(cls._run())
        cls.logger.info("Startup Complete (2/2)")
        cls.loop.run_forever()
        cls.loop.close()

    @classmethod
    async def _run(cls) -> None:
        while True:
            ...

    @classmethod
    async def stop(cls) -> None:
        cls.logger.info("Shutting Down (1/2)")
        with cls.process_lock:
            cls.loop.stop()                          # <= This Line
            cls.loop.close()
        cls.logger.info("Shutdown Complete (2/2)")
        cls.loop = None

JobScheduler.start() and JobScheduler.stop() are called from the FastAPI application's startup and shutdown event handlers, respectively.

The start method works smoothly, but in stop I get an error:

File "/backend/app/main.py", line 146, in stop_job_scheduler
2023-08-16 11:46:27     await job_scheduler.stop()
2023-08-16 11:46:27   File "/backend/app/jobs/__init__.py", line 59, in stop
2023-08-16 11:46:27     cls.loop.stop()
2023-08-16 11:46:27 AttributeError: 'NoneType' object has no attribute 'stop'

But cls.loop is set in the _loop method (the target of the Process launched by start), so why does cls.loop still have its initial None value when the stop method is called?

Are there any better approaches to clean up the background processes when the FastAPI application calls shutdown?

  • You are not setting `cls.loop` in `start()`. You're setting it in `_loop()`. Are you one hundred thousand percent sure that `_loop()` is being called *before* `stop()` is executed? – Silvio Mayolo Aug 16 '23 at 02:58
  • I guess the problem is multiprocessing will fork and create a separate copy of `cls`, and it will set the `loop` attribute for the copy, not for the one you called `.stop` from. – Selcuk Aug 16 '23 at 03:03
  • @SilvioMayolo Oops. It was a typing error, I'll fix it right away. Yes, I'm positive that the `_loop()` is called before `stop()` because I can verify that both `cls.logger.info("Startup Complete (2/2)")` and the processes in `_run()` are completed before I stop the application (hence calling the `stop()`) – Andrew Sharifikia Aug 16 '23 at 03:03
  • @Selcuk but that's stupid. Why would I want to use a class variable unless I want it to stay the same every time it's accessed? I guess I'll try to use a `multiprocessing.Manager` object and see how it goes! Thanks for the tip! – Andrew Sharifikia Aug 16 '23 at 03:05
  • Class variables are singletons, but only for the current process. Think of multiprocessing as if you are spinning up a new `python` interpreter every time you fork/spawn (it is smarter than this, but you get the idea). Consider using multithreading since you need to share memory in this case. – Selcuk Aug 16 '23 at 03:07
  • Oops! I didn't see the `Process` part. Selcuk is right, you get a new copy for each process. – Silvio Mayolo Aug 16 '23 at 03:08
  • Does [multiprocessing: sharing a large read-only object between processes?](https://stackoverflow.com/q/659865) answer your question? – Karl Knechtel Aug 16 '23 at 03:26

2 Answers


multiprocessing in Python is funny. It's more powerful than multithreading but also comes with some caveats. The first of those is that you're actually running a different Python interpreter entirely. That means that global variables and the like are going to get a new copy for each process you run.

Depending on your operating system and choice of start method, your processes may be forked or spawned. A spawned process starts anew, as though a fresh Python program had just been launched. A forked process gets all of the current values of variables from the source process, but it still copies all of those variables. After the fork, changes in either process will not affect the other unless you explicitly share state through one of the multiprocessing helpers.
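
As a minimal sketch of those copy semantics (the `State` class and `child` function are invented for illustration; they are not from the question):

import multiprocessing


class State:
    value = None  # class attribute: every process gets its own copy


def child() -> None:
    State.value = 42                    # mutates only the child's copy
    print("child sees:", State.value)   # -> child sees: 42


if __name__ == "__main__":
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
    print("parent sees:", State.value)  # -> parent sees: None

This is exactly what happens to `cls.loop` in the question: `_loop()` assigns it inside the child process, so the parent's copy stays `None`.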

You can use a Manager to synchronize data between processes explicitly. This acts sort of like a local server that both processes connect to. For explicit producer-consumer style communication, you can also use a Queue to pass information from one process to another.
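
As a rough sketch of both helpers (the `worker` function and the `"status"` key are made up for the example):

import multiprocessing


def worker(shared, queue) -> None:
    shared["status"] = "running"  # visible to the parent through the Manager proxy
    queue.put("work finished")    # explicit message passing back to the parent


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared = manager.dict()   # proxy dict served by the Manager's server process
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(shared, queue))
        p.start()
        p.join()
        print(shared["status"])   # -> running
        print(queue.get())        # -> work finished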

Silvio Mayolo

Thanks to Silvio and Selcuk, the root cause of the issue was found. For anyone who's wondering how I solved this in practice, here it is:

I store the Process object itself (instead of an event loop) before forking and kill it in stop():


import asyncio
from multiprocessing import Lock, Process

from .production_status import Job as ProductionStatusJob


class JobScheduler:
    manager = None
    loop = None
    logger = _logger
    process_lock = Lock()
    JOBS = [ProductionStatusJob]

    @classmethod
    def start(cls) -> None:
        cls.logger.info("Starting Up (1/2)")
        cls.loop = Process(target=cls._loop)  # NB: cls.loop now holds the Process handle, not an event loop
        cls.loop.start()
    
    @classmethod
    def _loop(cls) -> None:
        loop = asyncio.get_event_loop()
        loop.create_task(cls._run())
        cls.logger.info("Startup Complete (2/2)")
        loop.run_forever()
        loop.close() # <= is probably never called
...

    @classmethod
    async def stop(cls) -> None:
        cls.logger.info("Shutting Down (1/2)")
        with cls.process_lock:
            cls.loop.kill()
        cls.logger.info("Shutdown Complete (2/2)")
        cls.loop = None
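
If you want something gentler than kill() (which sends SIGKILL on Unix), a variant of the same stop() could try terminate() first and only escalate if the child hangs. This is a sketch of that idea, not part of my actual fix:

    @classmethod
    async def stop(cls) -> None:
        cls.logger.info("Shutting Down (1/2)")
        with cls.process_lock:
            if cls.loop is not None:
                cls.loop.terminate()      # SIGTERM: gives the child a chance to exit
                cls.loop.join(timeout=5)  # reap the child so it doesn't linger as a zombie
                if cls.loop.is_alive():   # escalate only if it didn't stop in time
                    cls.loop.kill()
                    cls.loop.join()
        cls.logger.info("Shutdown Complete (2/2)")
        cls.loop = None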
  • The only word of warning with *forking* behavior is that it's Unix-specific. You can't fork a process on Windows, unfortunately. Other than that, it's *insanely* convenient. – Silvio Mayolo Aug 16 '23 at 03:28