
After reading a lot of questions/answers on this subject as well as the official doc:

Questions / Answers:

  1. Python multiprocessing - starmap_async does not work where starmap does?
  2. multiprocessing.Pool.map_async doesn't seem to... do anything at all?
  3. Understand starmap_async() in multiprocessing programmation

...and many others

Official doc: HERE

I still don't understand the point of being able to run "apparently" non-blocking code by calling starmap_async(), since, as far as I understand, we need to call the wait() or get() method on it in order to make it run and get the actual result.

This is what those answers explain well:

https://stackoverflow.com/a/56462430/13000695

Pool's async-methods return objects which are conceptually "explicit futures", you need to call .get() to await and receive the actual result

https://stackoverflow.com/a/70674001/13000695

.wait() waits forever for the process to finish.

So: those two methods will simply turn the non-blocking code into blocking code, since they will wait until the task has been completed.
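
To make what I mean concrete, here is a minimal, self-contained sketch (with a toy mul() worker, not my real code): starmap_async() hands back an AsyncResult object immediately, and it is the get() / wait() call on that object that blocks, which is what makes it look equivalent to starmap() to me:

from multiprocessing.pool import Pool
import time

def mul(x, y):
    time.sleep(1)  # simulate some work
    return x * y

if __name__ == "__main__":
    with Pool(2) as pool:
        # starmap_async() returns an AsyncResult immediately instead of the results
        result = pool.starmap_async(mul, [(1, 2), (3, 4), (5, 6)])
        print("starmap_async returned:", result)
        # ...other work could happen here...
        print(result.get())  # <- this is the blocking call, equivalent to pool.starmap()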

I am looking for a way to run multiprocessing without blocking the main thread. I am building a server with the FastAPI library, which I plan to use asynchronously, but currently the build_cache() method blocks the main thread and the web server will not receive incoming HTTP requests until the cache has been downloaded:

from ftplib import FTP
from io import BytesIO
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

def get_from_ftp(file_name):
    flo = BytesIO()
    ftp_server = FTP(ftp_url)  # ftp_url is defined elsewhere in my module
    ftp_server.login()
    ftp_server.retrbinary(f'RETR {file_name}', flo.write)
    flo.seek(0)
    # write file on disk...

async def build_cache():
    with Pool(cpu_count() - 1) as pool:
        # starmap expects an iterable of argument tuples, one tuple per task
        some_files_to_download = [('file1',), ('file2',)]
        result = pool.starmap_async(get_from_ftp, some_files_to_download)
        result.wait()  # <=== This will block the main thread and the server will not be able to receive incoming requests

Then in my FastAPI main file:

from fastapi import FastAPI
import uvicorn
# build_cache is the coroutine defined above

app = FastAPI()

@app.on_event("startup")
async def startup():
    await build_cache()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
jossefaz
  • Not sure if I am following you correctly. In all cases the execution in a pool takes some time from the start until finished, i.e. until all results are collected. The difference is whether you do it in one step (sync) or two steps (async) with the possibility to do something else between start and finish. – VPfB Sep 29 '22 at 11:28
  • @VPfB : you followed me correctly, and this is the purpose of the non-blocking code that async is supposed to provide (it will not block the main thread but will run the event loop instead, and the main thread can perform other tasks at the same time). However, using starmap_async() will have no effect until we call `get()` or `wait()` explicitly. This is what I don't understand, because calling one of these methods will simply turn non-blocking code into blocking code... isn't it? – jossefaz Sep 29 '22 at 11:49
  • Method `starmap_async` is implemented by calling the private method `_map_async` with all of its passed arguments and returning the result, which is an `AsyncResult` instance. Method `starmap` is implemented by calling the same `_map_async` method but returns the result of calling method `get` on the `AsyncResult` instance. So for practical purposes, calling `starmap` is equivalent to calling `starmap_async` immediately followed by calling `get` on the returned `AsyncResult` instance. (more...) – Booboo Sep 29 '22 at 12:04
  • So use `starmap_async` explicitly if you want to also compute in parallel something else that is being submitted to the pool or you just want the main thread to continue working on something else while the `starmap_async` tasks are being processed. But eventually, you will need to call `get` on the `AsyncResult` (or `Pool.close()` followed by `Pool.join()`) and it will block until all submitted tasks have completed. It would have helped if you had posted actual code so we could see what you are trying to accomplish. `get` waits for completion but submitted tasks start and process immediately. – Booboo Sep 29 '22 at 12:09
  • @Booboo : So there is no way to tell the interpreter: "Open the process and do those tasks there, I don't care how long it takes and I don't even care about the result, just do it and release the main thread please"? – jossefaz Sep 29 '22 at 13:05
  • Yes! You can call `starmap_async`. This will result in a "task" being created for each set of arguments created from the passed *iterables*, and they will be processed by the pool while the main process continues to run. **But:** if the main process terminates before all the submitted tasks have run to completion, they never will, because the pool processes are *daemon* processes that terminate immediately when the main process terminates. So if you are not interested in any results returned, at least do `pool.close(); pool.join()` to wait for the submitted tasks to complete *before* you exit. – Booboo Sep 29 '22 at 13:32
  • By the way, if the main process, which will continue to run, is doing anything CPU-intensive, then you should create a pool that does not use all the cores (leave one for the main process). So: `pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)`. Of course, if the number of tasks being submitted is smaller than the number of cores, you should set the pool size accordingly to that smaller number so as to not create any pool processes that will never do any constructive work. – Booboo Sep 29 '22 at 13:35
  • @Booboo : First of all, thanks for all these explanations. I updated my question with some code. Does it help to better understand what I am trying to achieve? – jossefaz Sep 29 '22 at 13:55
  • Yes. Your worker function appears to be mostly network and disk I/O activity, for which multithreading would be a better approach and for which you might consider creating a pool equal in size to the number of files you will be downloading (but I wouldn't create a pool size greater than a couple of hundred in any case). But unless you have a solid-state drive there could be a lot of disk contention writing the files, which would result in the writing running more slowly than writing the files sequentially. So you might need to play around with the pool size. (more...) – Booboo Sep 29 '22 at 14:19
  • But if you are running using asyncio, you should instead use the `run_in_executor` method, which would allow other asyncio tasks to run (see the sketch after these comments). – Booboo Sep 29 '22 at 14:23
  • Look at the implementation. Ignoring minor details, `starmap(...)` is equivalent to `starmap_async(...).get()`. Don't confuse the async character of the call with `asyncio`, which is a completely different topic. There is no event loop involved in `starmap_async`. – VPfB Sep 29 '22 at 15:15
  • @VPfB thanks. I also looked at it, but I thought that I had missed something on this point and I was looking for how it should run in the event loop… but yeah, I begin to understand that the name is a bit confusing. Thanks for that too – jossefaz Sep 29 '22 at 20:19
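
Following the run_in_executor suggestion in the comments above, here is a rough, untested sketch of how I understand build_cache() could be rewritten so that waiting on the pool no longer blocks the event loop. It reuses the get_from_ftp() worker and the placeholder file list from the question, and the pool sizing just follows the advice given in the comments:

import asyncio
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

async def build_cache():
    some_files_to_download = [('file1',), ('file2',)]
    # don't create more workers than files, and leave one core for the main process
    pool_size = min(cpu_count() - 1, len(some_files_to_download)) or 1
    pool = Pool(pool_size)
    try:
        # the tasks start running in the pool right away
        result = pool.starmap_async(get_from_ftp, some_files_to_download)
        # hand the blocking get() to the default thread pool executor, so the
        # event loop stays free to serve incoming requests while we await it
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, result.get)
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the (daemon) worker processes to finish

If the intent is that the server starts accepting requests before the cache is ready, the startup handler would presumably schedule this with asyncio.create_task(build_cache()) instead of awaiting it; and, as noted in the comments, the close()/join() step is still what keeps the daemon pool processes from being killed before they finish.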

0 Answers