I've looked at this question and have talked with ChatGPT/Bing (lol) and still can't figure this one out.
I am trying to execute ~7 million requests to an API on a machine with 32 CPUs, loading the data into Postgres. My code is essentially set up like this:
from aiomultiprocess import Pool

CPUS = 30  # 32 CPUs available
batch_size = 5000

for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
-------- EDIT:
Adding a redacted version of fetch_api_results, as requested in the comments. It is basically a set of functions that construct the API URL and then make aiohttp requests recursively until there are no more next_url tokens in the API results.
Here it is.
from aiohttp import request

async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        # upload to db function
    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url, payload)
                else:
                    response.raise_for_status()
        except:  # (except block omitted)
            pass

    async def fetch(self):
        await self.query_data()
END OF EDIT --------------
It will run for an hour or two (I expect the whole job to take a day or two) and then freeze. No errors are thrown. When I keyboard-interrupt it, I see an OSError: [Errno 24] Too many open files error.
I've put the traceback below.
From my understanding, the issue of too many open file handles has to do with the spawning of new worker processes in the pool. What confuses me is that the docs say that when the maxtasksperchild limit is reached, the old worker process should be killed and a new one spawned. This is to prevent memory leaks and, I would assume, to keep exactly this kind of problem from happening.
However, changing the maxtasksperchild parameter has made no difference.
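One check I plan to add (my own hypothetical diagnostic, not something aiomultiprocess provides) is logging the worker PID from inside fetch_api_results; if maxtasksperchild is actually recycling workers, fresh PIDs should keep appearing in the logs:

import os

# Hypothetical diagnostic: add at the top of fetch_api_results. With
# maxtasksperchild=100, a new PID should show up roughly every 100 tasks
# per worker slot if recycling is working as documented.
log.info(f"worker pid: {os.getpid()}")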
Furthermore, I implemented the batching above so that the pool is killed and a new one started after every 5000 tasks, precisely to prevent an accumulation of open file handles. The async with Pool(...) as pool: context manager should tear down everything to do with that pool once the block closes. But this has also failed: there was no change after implementing the batching.
It has all left me pretty confused. It's clear it has to do with the piping of newly spawned processes, but I'm not sure what to do. Any feedback would be welcome.
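For what it's worth, the quick check I have in mind for confirming where the descriptors pile up is to count the parent process's open file descriptors between batches (a minimal sketch, assuming Linux and its /proc filesystem). The traceback below points at os.pipe() failing in the parent while spawning a worker, so that is where I'd expect the count to climb:

import os

def open_fd_count() -> int:
    # Each entry in /proc/self/fd is one open file descriptor
    # (sockets and pipes included) in the current process.
    return len(os.listdir("/proc/self/fd"))

# e.g. call this between batches in the outer loop:
log.info(f"open fds in parent: {open_fd_count()}")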
A short-term fix that would just extend the time before the script fails would be to increase the maximum number of files allowed to be open (per the linked answer, using ulimit -n). But I fear this limit will eventually be exceeded too, since this is going to be a pretty long-running job.
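If I do go that route, I'd probably raise the soft limit from inside the script rather than relying on the shell (a sketch using the stdlib resource module, which I believe is the programmatic equivalent of ulimit -n; Unix only):

import resource

# Stopgap only: raise the soft RLIMIT_NOFILE up to the hard limit.
# This buys time but does not fix the underlying descriptor leak.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))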
Any help is very appreciated!
Here is the full traceback:
File "<path-to-file>.py", line 127, in import_data
await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
return await self.pool.results(self.task_ids)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
await asyncio.sleep(0.005)
File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
return await future
File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/<path-to-file>/main.py", line 39, in add_all_data
await import_data(args)
File "/<path-to-file>/orchestrator.py", line 120, in import_data
async with Pool(
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
await self.join()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
await self._loop
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
self.processes[self.create_worker(qid)] = qid
^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
process.start()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
return self.aio_process.start()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
self.pid = util.spawnv_passfds(spawn.get_executable(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
errpipe_read, errpipe_write = os.pipe()
^^^^^^^^^
OSError: [Errno 24] Too many open files