
I've looked at this question and have talked with ChatGPT/Bing (lol) and still can't figure this one out.

I am trying to execute ~7 million requests to an API on a machine with 32 CPUs, loading the data into Postgres. My code is essentially set up like this:

from aiomultiprocess import Pool

CPUS = 30  # 32 CPUs available
batch_size = 5000

# (runs inside an async function; simplified)
for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])

-------- EDIT: Adding a redaction of fetch_api_results, per the request in the comments. It is basically a set of functions that construct the API URL and then make aiohttp requests recursively until there are no more next_url tokens in the API results.

Here it is.

from aiohttp import request

async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        # upload-to-db function call here

    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        # pass the payload along; query_all takes two arguments
                        await self.query_all(next_url, payload)
                else:
                    response.raise_for_status()
        except Exception:  # (except block omitted in this redaction)
            ...

    async def fetch(self):
        await self.query_data()

END OF EDIT --------------

It will run for an hour or two (I expect the full job to take a day or two) and then freeze. No errors are thrown. When I keyboard-interrupt it, I see the OSError: [Errno 24] Too many open files error.

I've put the traceback below.

From my understanding, the issue of too many open file handles has to do with the spawning of new worker processes in the pool. What confuses me is that the docs say that when the maxtasksperchild limit is reached, the old worker process should be killed and a new one spawned. This is to prevent memory leaks and, I would assume, to keep this problem from happening.

However, changing the maxtasksperchild parameter has yielded no change.

Furthermore, I implemented the batch processing to effectively kill the pool and start a new one after every 5000 tasks, also to prevent an accumulation of file handles. The async with Pool(...) block should effectively tear down everything to do with that pool once the block closes. But this has also failed: there was no change after implementing the batching method.

It has all left me pretty confused. It clearly has something to do with the pipes of newly spawned processes, but I'm not sure what to do. Any feedback would be welcome. (A quick way to watch the open-descriptor count while the job runs is sketched below.)
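For diagnosis, a minimal sketch of counting this process's open file descriptors on Linux, assuming /proc is available (the helper name is hypothetical; log is the same logger used above):

import os

def open_fd_count() -> int:
    # On Linux, /proc/self/fd contains one entry per open file descriptor.
    return len(os.listdir("/proc/self/fd"))

log.info(f"open file descriptors: {open_fd_count()}")

Logging this between batches should show whether descriptors accumulate steadily or spike when the pool respawns workers.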

A short-term fix that would just extend the amount of time before the script fails would be to increase the maximum number of open files (per the linked answer, using ulimit -n). But I fear this limit would be exceeded too, since this is going to be a pretty long-running job.
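For reference, a minimal sketch of raising that limit from within Python itself, assuming the process is allowed to raise its soft limit up to the existing hard limit:

import resource

# Raise this process's soft limit on open file descriptors (RLIMIT_NOFILE)
# to the hard limit; child processes spawned afterwards inherit it.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))

As noted, though, this only delays the failure if something is genuinely leaking descriptors.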

Any help is very appreciated!

Here is the full traceback:

File "<path-to-file>.py", line 127, in import_data
    await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
    return await self.pool.results(self.task_ids)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
    await asyncio.sleep(0.005)
  File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    return await future

File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/<path-to-file>/main.py", line 39, in add_all_data
    await import_data(args)
  File "/<path-to-file>/orchestrator.py", line 120, in import_data
    async with Pool(
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
    await self.join()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
    await self._loop
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
    process.start()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
    errpipe_read, errpipe_write = os.pipe()
                                  ^^^^^^^^^
OSError: [Errno 24] Too many open files

1 Answer


Move the recursive call to fetch the next URL outside the async with request... block.

As it is, any resources used in the base request are not freed until the entire recursive chain returns:

async def query_all(self, url, payload):
    next_url = None
    try:
        async with request(method="GET", url=url, params=payload) as response:
            log.info(f"status code: {response.status}")
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
            else:
                response.raise_for_status()
    except Exception:  # handle as in the original
        ...
    if next_url:
        await self.query_all(next_url, payload)

If the problem persists, then change this nested call to query_all into its own task, but use a FIFO queue in the instance so that the call in query_data awaits all sub-calls to query_all; a sketch of that idea follows.
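For illustration, a minimal sketch of that queue-based approach, assuming the APIPaginator shape from the question (the results list, next_url field, and "<str from arg>" placeholder are carried over; query_one and _pending are hypothetical names):

import asyncio
from aiohttp import request

class APIPaginator:
    def __init__(self, api_base):
        self.api_base = api_base
        self.results = []
        self._pending = asyncio.Queue()  # FIFO queue of pages left to fetch

    async def fetch(self):
        await self._pending.put(self.api_base + "<str from arg>")
        # Drain the queue iteratively: each page is fully fetched, and its
        # connection released, before the next page is requested.
        while not self._pending.empty():
            url = await self._pending.get()
            await self.query_one(url, {"limit": 1000})

    async def query_one(self, url, payload):
        async with request(method="GET", url=url, params=payload) as response:
            response.raise_for_status()
            results = await response.json()
        # The response is closed here; only now is the next page queued.
        self.results.append(results)
        next_url = results.get("next_url")
        if next_url:
            await self._pending.put(next_url)

The point is that no socket stays open across the wait for a follow-up page, so a deep pagination chain can no longer pin a stack of descriptors.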

That should improve things, though I can't be sure without a reproducible example.

• I see how this could make the requests more efficient, thanks. But I'm curious how this will help with the base issue of the OSError. For context, the recursion goes at most two levels deep, and that is rare. Most results don't have a next_url, so I think the impact is minimal. – Jhirschibar Apr 06 '23 at 15:08
• Without a full working example it is hard to tell, but something is leaking resources in there. Each GET will likely open a client-side socket, and that counts as a "file" on Unix. If some of those, say 1%, get stuck in a recursive call that never returns, that might be the cause. Otherwise it's a matter of scrutinizing the complete code to find where the leak is. Maybe adding an `asyncio.timeout` to cancel stalled requests in `query_data` could help as well (see the sketch after these comments). – jsbueno Apr 06 '23 at 20:40
• I updated the function to move next_url outside of the nested block @jsbueno; unfortunately the issue still persists. The script starts to hang after around 2 hours. – Jhirschibar Apr 10 '23 at 17:15
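For illustration, a minimal sketch of the timeout idea from the comment above, assuming Python 3.11's asyncio.timeout (the 30-second value and the query_one name are hypothetical; log is the question's logger):

import asyncio
from aiohttp import request

async def query_one(url, payload, timeout_s=30):
    # Cancel the whole fetch if it stalls, so a hung request cannot hold
    # its socket (an open file descriptor) indefinitely.
    try:
        async with asyncio.timeout(timeout_s):
            async with request(method="GET", url=url, params=payload) as response:
                response.raise_for_status()
                return await response.json()
    except TimeoutError:
        log.warning(f"request to {url} timed out after {timeout_s}s")
        return None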