
I want to process files in three steps: step 1 (preprocess) and step 3 (postprocess) are I/O-bound, while step 2 (process) is CPU-bound. To do this, I am trying to combine asyncio and multiprocessing, but I cannot get it to work. I have:

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import asyncio
from time import sleep
import random
import logging

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
random.seed(42)


class MyClass:
    def __init__(self, val: int):
        self.val = val
        self.rand1 = random.random()
        self.rand2 = 2 * random.random()
        self.res = None
        self.flag = False

    def __repr__(self):
        return f"Obj {self.val}"

    async def preprocess(self):
        logging.info(f"Preprocessing {self}")
        await asyncio.sleep(self.rand1)
        logging.info(f"Preprocessed {self}")
        return self

    def process(self):
        logging.info(f"Process {self}")
        sleep(self.rand2)
        logging.info(f"Processed {self}")
        self.res = 2 * self.val
        return self

    async def postprocess(self):
        logging.info(f"Postprocessing {self}")
        await asyncio.sleep(0.1)
        self.flag = True
        logging.info(f"Postprocessed {self}")


async def pipeline(obj, executor):
    await obj.preprocess()
    future = executor.submit(obj.process)
    return future


async def main():
    with ProcessPoolExecutor() as executor:
        list_objs = [MyClass(v) for v in range(3)]
        futures_processing = await asyncio.gather(
            *[pipeline(obj, executor) for obj in list_objs]
        )
        logging.info("Tasks scheduled!".upper())
        tasks_postprocessing = [
            asyncio.create_task(future.result().postprocess())
            for future in concurrent.futures.as_completed(futures_processing)
        ]
        await asyncio.wait(tasks_postprocessing)
        logging.info("DONE!")


if __name__ == "__main__":
    asyncio.run(main())

On my machine, this gives:

2022-11-18 14:41:49,409 Preprocessing Obj 0
2022-11-18 14:41:49,409 Preprocessing Obj 1
2022-11-18 14:41:49,409 Preprocessing Obj 2
2022-11-18 14:41:49,685 Preprocessed Obj 1
2022-11-18 14:41:49,698 Process Obj 1
2022-11-18 14:41:50,050 Preprocessed Obj 0
2022-11-18 14:41:50,051 Process Obj 0
2022-11-18 14:41:50,102 Processed Obj 0
2022-11-18 14:41:50,145 Processed Obj 1
2022-11-18 14:41:50,147 Preprocessed Obj 2
2022-11-18 14:41:50,147 TASKS SCHEDULED!
2022-11-18 14:41:50,148 Process Obj 2
2022-11-18 14:41:51,503 Processed Obj 2
2022-11-18 14:41:51,504 Postprocessing Obj 0
2022-11-18 14:41:51,504 Postprocessing Obj 1
2022-11-18 14:41:51,504 Postprocessing Obj 2
2022-11-18 14:41:51,605 Postprocessed Obj 0
2022-11-18 14:41:51,605 Postprocessed Obj 1
2022-11-18 14:41:51,605 Postprocessed Obj 2
2022-11-18 14:41:51,606 DONE!

Although Obj 0 and Obj 1 have already been processed, they are not post-processed until Obj 2 has been pre-processed. I would like to get the "TASKS SCHEDULED!" message immediately after pre-processing of Obj 2 has started.
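A minimal sketch of the desired behavior, assuming each object can run its own preprocess → process → postprocess chain independently of the others. Instead of collecting `concurrent.futures.Future` objects and blocking on `future.result()`, each per-object coroutine awaits `loop.run_in_executor()`, which suspends only that coroutine. The class is a trimmed version of the question's `MyClass` with fixed delays instead of random ones; `pre_delay` and `done_at` are hypothetical attributes added only for the example and its check.

```python
import asyncio
import logging
import time
from concurrent.futures import ProcessPoolExecutor

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")


class MyClass:
    """Trimmed version of the question's class, with fixed (not random) delays."""

    def __init__(self, val: int, pre_delay: float):
        self.val = val
        self.pre_delay = pre_delay  # hypothetical: replaces random rand1
        self.res = None
        self.flag = False
        self.done_at = None  # hypothetical: only used to check ordering

    def __repr__(self):
        return f"Obj {self.val}"

    async def preprocess(self):
        await asyncio.sleep(self.pre_delay)  # stand-in for I/O
        return self

    def process(self):
        time.sleep(0.1)  # stand-in for CPU-bound work (runs in a worker process)
        self.res = 2 * self.val
        return self  # pickled back to the parent process as a copy

    async def postprocess(self):
        await asyncio.sleep(0.1)  # stand-in for I/O
        self.flag = True
        self.done_at = time.monotonic()
        logging.info(f"Postprocessed {self}")
        return self


async def pipeline(obj, executor):
    """Run one object through all three steps without waiting for the others."""
    loop = asyncio.get_running_loop()
    await obj.preprocess()
    # Awaiting run_in_executor() suspends only this coroutine, not the event loop.
    processed = await loop.run_in_executor(executor, obj.process)
    # Continue with the returned copy: attributes set in the worker process
    # (e.g. res) are not visible on the original `obj`.
    return await processed.postprocess()


async def main():
    objs = [MyClass(0, 0.3), MyClass(1, 0.1), MyClass(2, 1.5)]
    with ProcessPoolExecutor(max_workers=3) as executor:
        tasks = [asyncio.create_task(pipeline(obj, executor)) for obj in objs]
        logging.info("TASKS SCHEDULED!")
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    for obj in asyncio.run(main()):
        print(obj, obj.res, obj.flag)
```

Here "TASKS SCHEDULED!" is logged as soon as all three pipelines are created, and Obj 0 and Obj 1 are post-processed while Obj 2 is still pre-processing. The `if __name__ == "__main__":` guard matters because worker processes may re-import the main module.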

I do understand why this doesn't happen: the post-processing is only scheduled after asyncio.gather has completed. I also tried:

async def main():
    with ProcessPoolExecutor() as executor:
        list_objs = [MyClass(v) for v in range(3)]
        tasks_preprocess = [asyncio.create_task(obj.preprocess()) for obj in list_objs]
        for t in asyncio.as_completed(tasks_preprocess):
            res = await t
            future = executor.submit(res.process)
            ...

But again, I cannot continue with the futures until all tasks have completed. I had a similar problem a while ago, but without the post-processing step.
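One way to keep an `asyncio.as_completed()` loop like the one above from blocking is sketched below, under the assumption that the rest of each object's chain can run as its own task. Each `concurrent.futures.Future` returned by `executor.submit()` is turned into an awaitable with `asyncio.wrap_future()`, and its post-processing is scheduled with `asyncio.create_task()` so the loop moves on immediately. The helper `finish` and the attributes `pre_delay` and `done_at` are hypothetical; the class is a trimmed version of the question's `MyClass`.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


class MyClass:
    """Trimmed version of the question's class, with fixed (not random) delays."""

    def __init__(self, val: int, pre_delay: float):
        self.val = val
        self.pre_delay = pre_delay  # hypothetical: replaces random rand1
        self.res = None
        self.flag = False
        self.done_at = None  # hypothetical: only used to check ordering

    async def preprocess(self):
        await asyncio.sleep(self.pre_delay)  # stand-in for I/O
        return self

    def process(self):
        time.sleep(0.1)  # stand-in for CPU-bound work
        self.res = 2 * self.val
        return self

    async def postprocess(self):
        await asyncio.sleep(0.1)  # stand-in for I/O
        self.flag = True
        self.done_at = time.monotonic()
        return self


async def finish(cf_future):
    """Hypothetical helper: await a concurrent.futures.Future, then post-process."""
    processed = await asyncio.wrap_future(cf_future)
    return await processed.postprocess()


async def main():
    objs = [MyClass(0, 0.3), MyClass(1, 0.1), MyClass(2, 1.5)]
    with ProcessPoolExecutor(max_workers=3) as executor:
        tasks_preprocess = [asyncio.create_task(obj.preprocess()) for obj in objs]
        tasks_postprocess = []
        for t in asyncio.as_completed(tasks_preprocess):
            res = await t
            future = executor.submit(res.process)
            # Schedule the rest of this object's chain and move on immediately,
            # instead of blocking on future.result().
            tasks_postprocess.append(asyncio.create_task(finish(future)))
        return await asyncio.gather(*tasks_postprocess)


if __name__ == "__main__":
    print(sorted((r.val, r.res, r.flag) for r in asyncio.run(main())))
```

With this shape, Obj 1 is processed and post-processed while the loop is still waiting for Obj 0 and Obj 2 to finish pre-processing. The loop itself still only ends after the last pre-processing step, so a "TASKS SCHEDULED!" message placed after it would appear later than in a fully per-object pipeline.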

I can work around some of these problems by adding a callback to the process future, but, as explained here, this is not how it should be done.
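If a callback on the process future is used anyway, one pitfall is that it may run in the executor's internal thread rather than on the event loop, so it must not touch asyncio objects directly. A minimal sketch of a thread-safe bridge follows; `bridge` and `double` are hypothetical names, and the approach is roughly what `asyncio.wrap_future()` already does for you.

```python
import asyncio
from concurrent.futures import Future, ProcessPoolExecutor


def double(x: int) -> int:
    """Hypothetical CPU-bound worker; module-level so it can be pickled."""
    return 2 * x


def bridge(cf_future: Future, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
    """Hypothetical helper: return an asyncio future mirroring cf_future's outcome."""
    aio_future = loop.create_future()

    def transfer(f: Future) -> None:
        # Runs on the event-loop thread (scheduled via call_soon_threadsafe).
        if aio_future.cancelled():
            return
        if f.cancelled():
            aio_future.cancel()
        elif f.exception() is not None:
            aio_future.set_exception(f.exception())
        else:
            aio_future.set_result(f.result())

    def on_done(f: Future) -> None:
        # May run in the executor's internal thread, so do not touch
        # aio_future here; hand the work over to the loop thread-safely.
        loop.call_soon_threadsafe(transfer, f)

    cf_future.add_done_callback(on_done)
    return aio_future


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [bridge(executor.submit(double, x), loop) for x in range(3)]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [0, 2, 4]
```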

Durtal
  • Seems like you are looking for a [producer / consumer](https://stackoverflow.com/a/52615705/6242321) model so that each step in the process is handed off to the next stage which is already waiting for input to process. – jwal Nov 18 '22 at 16:18
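The producer/consumer model suggested in the comment could look roughly like the sketch below, assuming an `asyncio.Queue` between stages and a fixed number of consumer tasks per stage that stop on a sentinel value. The functions `cpu_work`, `pre_stage`, `cpu_stage`, `post_stage` and the `STOP` sentinel are hypothetical stand-ins for the question's preprocess/process/postprocess steps.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

STOP = None  # hypothetical sentinel: tells a consumer no more items will arrive


def cpu_work(val: int) -> int:
    """Hypothetical CPU-bound step 2; module-level so it can be pickled."""
    time.sleep(0.1)
    return 2 * val


async def pre_stage(vals, out_q: asyncio.Queue) -> None:
    """Producer for step 2: hand each item over as soon as its I/O is done."""
    async def one(v):
        await asyncio.sleep(0.05 * v)  # stand-in for I/O
        await out_q.put(v)

    await asyncio.gather(*(one(v) for v in vals))


async def cpu_stage(in_q: asyncio.Queue, out_q: asyncio.Queue, executor) -> None:
    """Consumer of step 1 results and producer for step 3."""
    loop = asyncio.get_running_loop()
    while (val := await in_q.get()) is not STOP:
        res = await loop.run_in_executor(executor, cpu_work, val)
        await out_q.put((val, res))


async def post_stage(in_q: asyncio.Queue, results: list) -> None:
    """Consumer for step 3."""
    while (item := await in_q.get()) is not STOP:
        await asyncio.sleep(0.01)  # stand-in for I/O
        results.append(item)


async def main(vals=range(6), n_cpu=2, n_post=2):
    pre_q, post_q = asyncio.Queue(), asyncio.Queue()
    results = []
    with ProcessPoolExecutor(max_workers=n_cpu) as executor:
        cpu_tasks = [asyncio.create_task(cpu_stage(pre_q, post_q, executor))
                     for _ in range(n_cpu)]
        post_tasks = [asyncio.create_task(post_stage(post_q, results))
                      for _ in range(n_post)]
        await pre_stage(vals, pre_q)
        # All real items are queued; send one STOP per consumer, stage by stage.
        for _ in cpu_tasks:
            await pre_q.put(STOP)
        await asyncio.gather(*cpu_tasks)
        for _ in post_tasks:
            await post_q.put(STOP)
        await asyncio.gather(*post_tasks)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Because each queue is FIFO and the sentinels are only enqueued after every real item of that stage, no consumer stops early. `n_cpu` bounds how many CPU-bound jobs run at once, while the I/O stages stay on the event loop.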
