I have a bunch of online data that I want to download and process efficiently. Downloading already takes some time, but the CPU-bound processing takes much longer. I am struggling to combine asyncio with ProcessPoolExecutor.
import asyncio
import time
import aiohttp
from aiohttp import ClientSession
from concurrent.futures import ProcessPoolExecutor


class WebData:

    def __init__(self, url):
        self.url = url
        self.binary = b''

    async def download(self, client):
        # Non-blocking pause; time.sleep() here would stall the event loop.
        await asyncio.sleep(0.2)
        try:
            async with client.get(self.url, timeout=5) as resp:
                self.binary = await resp.read()
            print(f'Downloaded {self.url}')
        except (aiohttp.ClientConnectionError,
                asyncio.TimeoutError):
            # Failed downloads are skipped; self.binary stays empty.
            pass

    def process(self):
        # Stand-in for the CPU-bound work.
        print(f'Start processing {self.url}')
        time.sleep(1)
        print(f'Finished processing {self.url}')


async def main():
    list_urls = [f'https://www.google.com/search?q={i}'
                 for i in range(10)]
    list_obj = [WebData(url) for url in list_urls]
    with ProcessPoolExecutor() as executor:
        async with ClientSession() as session:
            tasks = [obj.download(session) for obj in list_obj]
            # All downloads finish here before any processing is submitted.
            await asyncio.gather(*tasks)
        list_futures = [
            executor.submit(obj.process)
            for obj in list_obj]
    return list_futures


# The guard keeps worker processes from re-running main() on import.
if __name__ == '__main__':
    res = asyncio.run(main())

This runs, but it does not do what I am looking for. It downloads all the data first and only then starts processing, which leaves my cores idle during the downloads. Is there a way to hand each downloaded object to the executor while the other objects are still downloading?
I found this thread but it isn't what I need.
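
To make the goal concrete, here is a rough sketch of the behaviour I am after: each download is chained with its own processing step through `loop.run_in_executor`, so a finished download goes to the pool right away while the others keep downloading. This is only a sketch under assumptions, not a tested solution for my real data. The names `fake_fetch`, `process`, `download_and_process`, and `run_pipeline` are hypothetical, the download is simulated with `asyncio.sleep` instead of a real aiohttp request, and the processing is a placeholder byte sum.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def process(url, payload):
    """Placeholder for the CPU-bound work (hypothetical).

    Kept at module level so a process pool can pickle it.
    """
    return sum(payload)


async def fake_fetch(url):
    """Simulated download (assumption); swap in a real aiohttp request."""
    await asyncio.sleep(0.2)
    return url.encode()


async def download_and_process(url, fetch, executor):
    payload = await fetch(url)
    loop = asyncio.get_running_loop()
    # Submit this payload as soon as its own download is done. Awaiting
    # the returned future yields to the event loop, so the remaining
    # downloads continue in the meantime.
    result = await loop.run_in_executor(executor, process, url, payload)
    return url, result


async def run_pipeline(urls, fetch, executor):
    # One coroutine per URL; gather returns results in input order.
    tasks = [download_and_process(url, fetch, executor) for url in urls]
    return await asyncio.gather(*tasks)


async def main():
    urls = [f'https://www.google.com/search?q={i}' for i in range(10)]
    with ProcessPoolExecutor() as executor:
        return await run_pipeline(urls, fake_fetch, executor)
```

To try it, call `asyncio.run(main())` under an `if __name__ == '__main__':` guard, which process pools need on platforms that spawn their workers. Passing the bytes to a module-level function, rather than submitting a bound method, avoids pickling the whole object and brings the result back as a return value.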