
I am searching for a huge number of addresses on the web, and I want to use both asyncio and ProcessPoolExecutor in my task to speed up the search.

    async def main():
        n_jobs = 3
        addresses = [list of addresses]
        _addresses = list_splitter(data=addresses, n=n_jobs)
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            futures_list = []
            for _address in _addresses:
                futures_list += [asyncio.get_event_loop().run_in_executor(executor, execute_parallel, _address)]

            for f in tqdm(as_completed(futures_list, loop=asyncio.get_event_loop()), total=len(_addresses)):
                results = await f

    asyncio.get_event_loop().run_until_complete(main())

Expected: I want the `execute_parallel` function to run in parallel.

Error:

    Traceback (most recent call last):
      File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 228, in <module>
        asyncio.run(main())
      File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 224, in main
        results = await f
      File "/usr/local/lib/python3.7/asyncio/tasks.py", line 533, in _wait_for_one
        return f.result()  # May raise f.exception().
    TypeError: can't pickle coroutine objects
  • here `execute_parallel` is also a coroutine. – Awaish Kumar May 23 '19 at 13:57
  • That won't work, you must organize your task so that you send ordinary functions to processes, and keep coroutines in the main process under asyncio. Running asyncio in subprocesses is technically possible, but quite involved, and you definitely can't just send off a coroutine to a subprocess. – user4815162342 May 23 '19 at 19:13
  • I have co-routines, I want to execute them from one function and then run that function in parallel for different addresses, but I don't know how can I achieve this. – Awaish Kumar May 24 '19 at 07:22
  • Coroutines automatically run in parallel, that is their whole point. You don't need separate processes to achieve parallelism in asyncio. – user4815162342 May 24 '19 at 08:02
  • yes I know that they are working in parallel for a single address but I want for example there are 3 processes and in each of them I am handling a separate address and doing whatever I want using coroutines – Awaish Kumar May 24 '19 at 08:10
  • If your coroutines are launched in parallel, they are working in parallel for all addresses. Again, you don't need separate processes to achieve parallelism in asyncio. – user4815162342 May 24 '19 at 08:34
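
As the comments above point out, coroutines already run concurrently within a single process, so the addresses can be handled without any process pool at all. A minimal sketch of that idea, assuming `execute_parallel` is the coroutine from the question and the semaphore limit of 10 is only an illustrative value:

    import asyncio

    async def main():
        addresses = [...]  # the full list of addresses
        semaphore = asyncio.Semaphore(10)  # cap how many addresses run at once

        async def bounded(address):
            async with semaphore:
                return await execute_parallel(address)

        # one coroutine per address, all awaited concurrently
        return await asyncio.gather(*(bounded(a) for a in addresses))

    results = asyncio.run(main())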

1 Answer


I'm not sure I'm answering the correct question, but it appears the intent of your code is to run your execute_parallel function across several processes using asyncio. Instead of using ProcessPoolExecutor, why not try a normal multiprocessing Pool and set up a separate asyncio loop to run in each process? You might set up one process per core and let asyncio work its magic within each one.

    import asyncio
    import multiprocessing

    def run_loop(addresses):
        # each worker process runs its own event loop and awaits
        # execute_parallel concurrently for its chunk of addresses
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        tasks = [execute_parallel(address) for address in addresses]
        return loop.run_until_complete(asyncio.gather(*tasks))

    def main():
        n_jobs = 3
        addresses = [list of addresses]
        _addresses = list_splitter(data=addresses, n=n_jobs)
        with multiprocessing.Pool(processes=n_jobs) as pool:
            # consume the lazy iterator so the work actually runs
            for results in pool.imap_unordered(run_loop, _addresses):
                ...  # handle the results from each chunk here

I've used Pool.imap_unordered with great success, but depending on your needs you may prefer Pool.map or some other functionality. You can play around with chunksize or with the number of addresses in each list to achieve optimal results (i.e., if you're getting a lot of timeouts, you may want to reduce the number of addresses being processed concurrently).
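
If ordering matters, Pool.map is a small change. A minimal sketch, assuming `run_loop` returns the list of results for its chunk (as in the version above) and that `chunksize=1` is only an illustrative value:

    with multiprocessing.Pool(processes=n_jobs) as pool:
        # map blocks until every chunk is done and keeps the order of _addresses;
        # each element is the result list produced by one worker process
        per_chunk_results = pool.map(run_loop, _addresses, chunksize=1)
    all_results = [result for chunk in per_chunk_results for result in chunk]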