I have to download MANY compressed bz2-files and unzip them for further processing. Downloading is I/O bound, unzipping is CPU bound. So I thought I might be best off combining a ThreadPoolExecutor with a ProcessPoolExecutor. To be clear on this: I don't want to wait with the unzipping until all files are downloaded. I rather want to use my CPU resources while other files are still being downloaded. I read this thread, but it doesn't seem useful to me. I have this code:

import bz2
import requests
from concurrent import futures
from io import BytesIO

class Source:
    
    def __init__(self, url):
        self.url = url
        self.compressed = None
        self.binary = None
        
    def download(self):
        print(f'Start downloading {self.url}')
        req = requests.get(self.url, timeout=5)
        self.compressed = req.content
        print(f'Finished downloading {self.url}')
        return self
    
    def unzip(self):
        print(f'Start unzipping {self.url}')
        with bz2.open(BytesIO(self.compressed), 'rb') as file:
            self.binary = file.read() 
        print(f'Finished unzipping {self.url}')
        return self
    
list_sources_init = [Source(url) for url in list_urls]

with futures.ThreadPoolExecutor() as executor_threads, futures.ProcessPoolExecutor() as executor_processes:
    list_futures_after_download = [
        executor_threads.submit(source.download)  
        for source in list_sources_init
    ]
    list_futures_after_unzip = []
    for future in futures.as_completed(list_futures_after_download):
        source = future.result()
        list_futures_after_unzip.append(executor_processes.submit(source.unzip))
            
list_sources_unzipped = [future.result() for future in list_futures_after_unzip]

This works, but it seems a bit fishy. Furthermore, I wonder why the elements in list_sources_init do not get downloaded. Originally I planned to work with this list alone and perform the parallel actions on its elements. Now I have ended up with 3 lists, partially containing the same data. Most painful is that the compressed data is stored in list_futures_after_download as well as in list_futures_after_unzip.

I guess there is a better way to do it. But how?

Durtal
  • If you have working code and are looking for general, open-ended improvements your question is more appropriate for [CodeReview](https://codereview.stackexchange.com) – be sure to check their [question guide](https://codereview.meta.stackexchange.com/questions/2436/how-to-get-the-best-value-out-of-code-review-asking-questions) first, though. – MisterMiyagi Sep 21 '21 at 08:45
  • I see no reason to use threads here. Just do the download and unzipping in each sub-process –  Sep 21 '21 at 09:10

1 Answer

This is more concise. I see no benefit in using threads for this:

from multiprocessing import Pool, freeze_support
import requests
import bz2
from io import BytesIO

URL_LIST = []

def processURL(url):
    try:
        with requests.Session() as session:
            r = session.get(url, timeout=5)
            r.raise_for_status()
            with bz2.open(BytesIO(r.content), 'rb') as data:
                return data.read()
    except Exception:
        pass # will implicitly return None


def main():
    with Pool() as pool:
        results = pool.map(processURL, URL_LIST)
        for r in results:
            print(r)


if __name__ == '__main__':
    freeze_support()
    main()
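One caveat: `Pool.map` returns only after every URL has been processed, so `main` sees nothing until the last worker finishes. If you want to consume each decompressed file as soon as it is ready (closer to the streaming behaviour asked for), `imap_unordered` is a drop-in alternative. A minimal sketch, reusing the same hypothetical `URL_LIST` and a `processURL` in the spirit of the one above:

```python
from multiprocessing import Pool, freeze_support
from io import BytesIO
import bz2
import requests

URL_LIST = []  # fill with the .bz2 URLs to download

def processURL(url):
    """Download one bz2 file and return its decompressed bytes, or None on error."""
    try:
        r = requests.get(url, timeout=5)
        r.raise_for_status()
        with bz2.open(BytesIO(r.content), 'rb') as data:
            return data.read()
    except Exception:
        return None  # swallow errors; caller must handle None

def main():
    with Pool() as pool:
        # imap_unordered yields each result as soon as its worker finishes,
        # so post-processing overlaps with downloads still in flight.
        for binary in pool.imap_unordered(processURL, URL_LIST):
            if binary is not None:
                print(len(binary))

if __name__ == '__main__':
    freeze_support()
    main()
```

With an empty `URL_LIST` this prints nothing; the difference from `map` only shows with many slow URLs, where results arrive incrementally and out of order.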