
I am creating a subprocess from my FastAPI application as follows:

```python
proc = await asyncio.create_subprocess_shell(
    cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
```

I am using the asyncio.create_subprocess_shell function so that I can capture the program's stdout line by line.
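For reference, this is roughly how the output gets consumed (a minimal, self-contained sketch; the printf command is just a stand-in for the real one):

```python
import asyncio

async def read_lines(cmd: str) -> list[str]:
    """Run cmd in a shell subprocess and collect its stdout line by line."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    lines = []
    async for raw in proc.stdout:  # asyncio.StreamReader supports async iteration
        lines.append(raw.decode().rstrip())
    await proc.wait()
    return lines

# Assumes a POSIX shell is available for printf
result = asyncio.run(read_lines("printf 'one\\ntwo\\n'"))
```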

How can I make it so that the process uses a specific executor? I tried this:

```python
from functools import partial
from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor(max_workers=10)
loop = asyncio.get_running_loop()

task = partial(
    asyncio.create_subprocess_shell,
    cmd,
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE,
)

proc = await loop.run_in_executor(pool, task)
```

But it fails with this error:

```
TypeError: cannot pickle 'coroutine' object
```

arielnmz
  • Please have a look at [this answer](https://stackoverflow.com/a/71613757/17865804), as well as [this answer](https://stackoverflow.com/a/70873984/17865804) and [this answer](https://stackoverflow.com/a/71517830/17865804). – Chris Jun 23 '23 at 04:15

1 Answer


asyncio.create_subprocess_shell gives you direct, detailed control over that single subprocess, including the ability to interact with it via pipes.

A process pool executor, on the other hand, does something different altogether: its workers are a specialized kind of process that runs Python code and can import your project's modules. Each worker acts as a sort of server, waiting for the coordinating process to pass it Python tasks, in the form of callables plus arguments, via the .submit (or .map) methods. They are not general-purpose subprocesses that can run an arbitrary program.

In this case, what you want is simply to create several processes with create_subprocess_shell, wrap them in tasks, keep them in a container object (a set or list will do), and iterate over that container to reach them. If you need to limit the number of concurrent processes, use some manually coded logic - the limiting logic built into concurrent.futures executors won't apply to this use case.
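For instance, a plain asyncio.Semaphore can provide that manual limit (a sketch; the echo commands and the cap of 3 are illustrative, and a POSIX shell is assumed):

```python
import asyncio

async def run_limited(cmd: str, sem: asyncio.Semaphore) -> bytes:
    async with sem:  # at most the semaphore's value run concurrently
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        return stdout

async def main() -> list[bytes]:
    sem = asyncio.Semaphore(3)  # cap: 3 subprocesses at a time
    cmds = [f"echo job-{i}" for i in range(6)]
    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(run_limited(c, sem) for c in cmds))

outputs = asyncio.run(main())
```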

jsbueno
    The reason I wanted to use an Executor was to avoid having to manually code the limit for concurrent processes, and also to keep it separate from whatever executor is the default in the FastAPI app. Fortunately, for my specific use case the code to execute is a normal Python library, so I was able to use an Executor (and a Process object), but I had to work around capturing stdout using queues, a manager, and redirecting stdout to a custom StringIO buffer – arielnmz Jun 26 '23 at 17:07