I want to process files in three steps. Step 1 (preprocess) and step 3 (postprocess) are I/O-bound; step 2 (process) is CPU-bound. To do this, I am trying to combine asyncio and multiprocessing, but I cannot get it to work. I have:
import asyncio
import concurrent.futures
import logging
import random
from concurrent.futures import ProcessPoolExecutor
from time import sleep

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
random.seed(42)


class MyClass:
    def __init__(self, val: int):
        self.val = val
        self.rand1 = random.random()
        self.rand2 = 2 * random.random()
        self.res = None
        self.flag = False

    def __repr__(self):
        return f"Obj {self.val}"

    async def preprocess(self):
        # Step 1: I/O-bound, simulated with a non-blocking sleep.
        logging.info(f"Preprocessing {self}")
        await asyncio.sleep(self.rand1)
        logging.info(f"Preprocessed {self}")
        return self

    def process(self):
        # Step 2: CPU-bound, simulated with a blocking sleep in a worker process.
        logging.info(f"Process {self}")
        sleep(self.rand2)
        logging.info(f"Processed {self}")
        self.res = 2 * self.val
        return self

    async def postprocess(self):
        # Step 3: I/O-bound again.
        logging.info(f"Postprocessing {self}")
        await asyncio.sleep(0.1)
        self.flag = True
        logging.info(f"Postprocessed {self}")


async def pipeline(obj, executor):
    await obj.preprocess()
    # Returns a concurrent.futures.Future, not an awaitable.
    future = executor.submit(obj.process)
    return future


async def main():
    with ProcessPoolExecutor() as executor:
        list_objs = [MyClass(v) for v in range(3)]
        futures_processing = await asyncio.gather(
            *[pipeline(obj, executor) for obj in list_objs]
        )
        logging.info("Tasks scheduled!".upper())
        tasks_postprocessing = [
            asyncio.create_task(future.result().postprocess())
            for future in concurrent.futures.as_completed(futures_processing)
        ]
        await asyncio.wait(tasks_postprocessing)
    logging.info("DONE!")


# Entry point; the guard is needed when worker processes are started with "spawn".
if __name__ == "__main__":
    asyncio.run(main())
On my machine, this gives:
2022-11-18 14:41:49,409 Preprocessing Obj 0
2022-11-18 14:41:49,409 Preprocessing Obj 1
2022-11-18 14:41:49,409 Preprocessing Obj 2
2022-11-18 14:41:49,685 Preprocessed Obj 1
2022-11-18 14:41:49,698 Process Obj 1
2022-11-18 14:41:50,050 Preprocessed Obj 0
2022-11-18 14:41:50,051 Process Obj 0
2022-11-18 14:41:50,102 Processed Obj 0
2022-11-18 14:41:50,145 Processed Obj 1
2022-11-18 14:41:50,147 Preprocessed Obj 2
2022-11-18 14:41:50,147 TASKS SCHEDULED!
2022-11-18 14:41:50,148 Process Obj 2
2022-11-18 14:41:51,503 Processed Obj 2
2022-11-18 14:41:51,504 Postprocessing Obj 0
2022-11-18 14:41:51,504 Postprocessing Obj 1
2022-11-18 14:41:51,504 Postprocessing Obj 2
2022-11-18 14:41:51,605 Postprocessed Obj 0
2022-11-18 14:41:51,605 Postprocessed Obj 1
2022-11-18 14:41:51,605 Postprocessed Obj 2
2022-11-18 14:41:51,606 DONE!
Although Obj 0 and Obj 1 have been processed, they are not post-processed until Obj 2 has been pre-processed. I would like to get the "TASKS SCHEDULED!" message immediately after the pre-processing of Obj 2 has started.
I do understand why this doesn't happen: the post-processing is only scheduled after asyncio.gather has completed. I also tried:
async def main():
    with ProcessPoolExecutor() as executor:
        list_objs = [MyClass(v) for v in range(3)]
        tasks_preprocess = [asyncio.create_task(obj.preprocess()) for obj in list_objs]
        for t in asyncio.as_completed(tasks_preprocess):
            res = await t
            future = executor.submit(res.process)
        ...
But again, I cannot continue with the futures until all tasks are completed. I had a similar problem a while ago, but without the post-processing step.
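To make the asyncio.gather behaviour described above concrete, here is a minimal sketch outside my actual pipeline. The names work and demo are made up for this illustration. It only shows that whatever follows `await asyncio.gather(...)` runs after the slowest awaitable has finished, which is why the post-processing cannot start earlier in my first version.

```python
import asyncio


async def work(name: str, delay: float, log: list) -> str:
    # Hypothetical stand-in for a preprocessing step of the given duration.
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name


async def demo() -> list:
    log = []
    # gather only returns once *both* coroutines are done ...
    await asyncio.gather(work("fast", 0.05, log), work("slow", 0.3, log))
    # ... so this line cannot run right after "fast" completes.
    log.append("code after gather runs")
    return log


events = asyncio.run(demo())
print(events)
```

The "fast" entry is logged first, but the code after gather still has to wait for "slow".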
I can work around some of the problems by attaching a callback to the process future. But as explained here, this is not the way it should be done.
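For reference, the callback workaround I mean looks roughly like the sketch below. It is simplified and rests on some assumptions: a ThreadPoolExecutor stands in for the ProcessPoolExecutor so the snippet is self-contained, and process, postprocess, on_processed and run_with_callbacks are hypothetical names, not my real code. The done-callback may run outside the event loop thread, so it hands the post-processing coroutine to the loop with asyncio.run_coroutine_threadsafe.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


def process(val: int) -> int:
    # Hypothetical stand-in for the CPU-bound step.
    time.sleep(0.05)
    return 2 * val


async def postprocess(res: int, done: list) -> None:
    # Hypothetical stand-in for the I/O-bound post-processing step.
    await asyncio.sleep(0.01)
    done.append(res)


async def run_with_callbacks() -> list:
    loop = asyncio.get_running_loop()
    done: list = []
    post_futures: list = []

    def on_processed(fut: Future) -> None:
        # May be called from a worker thread, so schedule the coroutine
        # on the event loop in a thread-safe way.
        post_futures.append(
            asyncio.run_coroutine_threadsafe(postprocess(fut.result(), done), loop)
        )

    with ThreadPoolExecutor() as executor:  # assumption: threads instead of processes
        proc_futures = [executor.submit(process, v) for v in range(3)]
        for fut in proc_futures:
            fut.add_done_callback(on_processed)
        # Wait for the processing step without blocking the event loop.
        await asyncio.gather(*(asyncio.wrap_future(f) for f in proc_futures))

    # Each callback has fired by now; wait for the scheduled post-processing.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in post_futures))
    return done


results = asyncio.run(run_with_callbacks())
print(sorted(results))
```

This starts post-processing for each object as soon as its processing finishes, but the control flow is spread over callbacks and two kinds of futures, which is what I would like to avoid.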