
I have a list of URLs for large files to download (e.g. compressed archives), which I want to process (e.g. decompress the archives).

Both download and processing take a long time and processing is heavy on disk IO, so I want to have just one of each to run at a time. Since the two tasks take about the same time and do not compete for the same resources, I want to download the next file(s) while the last is being processed.

This is a variation of the producer-consumer problem.

The situation is similar to reading and processing images or downloading many files, but my downloader calls are not (yet) picklable, so I have not been able to use multiprocessing. Also, both tasks take about the same time.

Here is a dummy example, where both download and processing are blocking:

import time
import posixpath

def download(urls):
    for url in urls:
        time.sleep(3)  # this is the download (more like 1000s) 
        yield posixpath.basename(url)

def process(fname):
    time.sleep(2)  # this is the processing part (more like 600s)

urls = ['a', 'b', 'c']
for fname in download(urls):
    process(fname)
    print(fname)

How could I make the two tasks concurrent? Can I use yield or yield from in a smart way, perhaps in combination with deque? Or must it be asyncio with Future?

j08lue
  • Possibly Copy of: http://stackoverflow.com/questions/16181121/python-very-simple-multithreading-parallel-url-fetching-without-queue – Fallenreaper Sep 21 '16 at 21:59
  • No, not quite. That one is about multiple concurrent downloads. I need only one download at a time and the consumer should be aware of every new file. – j08lue Sep 21 '16 at 22:07

2 Answers


A year later, we are actually using Python 3's asyncio and aiohttp.
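This answer is terse, so here is a minimal sketch of what such an asyncio pipeline can look like. It is an assumption-laden illustration, not the asker's actual code: `asyncio.sleep` stands in for the real `aiohttp` download, `asyncio.to_thread` (Python 3.9+) offloads the blocking processing, and the timings are shortened for the demo:

```python
import asyncio
import posixpath
import time

async def download(url):
    # stand-in for an aiohttp request (the real download is more like 1000 s)
    await asyncio.sleep(0.2)
    return posixpath.basename(url)

def process(fname):
    # stand-in for the blocking, disk-heavy processing (more like 600 s)
    time.sleep(0.1)

async def main(urls):
    done = []
    task = None  # processing task for the previously downloaded file
    for url in urls:
        # the next download runs while the previous file is still processing,
        # because the to_thread task executes during this await
        fname = await download(url)
        if task is not None:
            await task  # ensure only one processing job runs at a time
        task = asyncio.create_task(asyncio.to_thread(process, fname))
        done.append(fname)
    if task is not None:
        await task  # let the last file finish processing
    return done

result = asyncio.run(main(['a', 'b', 'c']))
print(result)
```

This keeps exactly one download and one processing job in flight at a time, which is what the question asks for.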

j08lue

I'd simply use threading.Thread(target=process, args=(fname,)) and start a new thread for each file to process.

But before starting a new thread, join the previous processing thread:

import threading

t = None
for fname in download(urls):
    if t is not None:  # wait for the previous processing thread to finish
        t.join()
    t = threading.Thread(target=process, args=(fname,))
    t.start()
    print('[i] thread started for %s' % fname)
if t is not None:  # wait for the last file to finish processing
    t.join()

See https://docs.python.org/3/library/threading.html
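Since the question also mentions queues, a bounded `queue.Queue` gives the same one-download/one-processing pipeline without the manual join bookkeeping. This is a hedged sketch, not the asker's real code: the `download` and `process` functions below are dummy stand-ins with shortened timings, and `None` is used as an end-of-stream sentinel:

```python
import queue
import threading
import time

processed = []

def download(urls):
    for url in urls:
        time.sleep(0.2)  # stand-in for the long download
        yield url

def process(fname):
    time.sleep(0.1)  # stand-in for the disk-heavy processing
    processed.append(fname)

def worker(q):
    # consume filenames until the producer sends the None sentinel
    while True:
        fname = q.get()
        if fname is None:
            break
        process(fname)

q = queue.Queue(maxsize=1)  # bounded: the downloader stays at most one file ahead
t = threading.Thread(target=worker, args=(q,))
t.start()
for fname in download(['a', 'b', 'c']):
    q.put(fname)  # blocks while the worker is still busy with an earlier file
q.put(None)  # signal the worker to exit
t.join()
print(processed)
```

The `maxsize=1` bound is what enforces "only one download ahead": `put` blocks until the consumer has taken the previous item.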

Loïc
  • Cool, yes, that should work and is actually quite simple. – j08lue Sep 21 '16 at 22:12
  • it should indeed, even though I coded that blind and didn't test it. Please let me know if there are any issues so I can fix my answer. – Loïc Sep 21 '16 at 22:13
  • I am quite sure this is a working answer, but the thing is my code is running inside an application (QGIS) that seems not to like it when I use Python `threading` (it crashes). I have to investigate the details, but a solution based on coroutines might be safer. – j08lue Sep 21 '16 at 22:27
  • I think that a `t.join()` is missing at the end of the `for` loop. Add it in a `else` clause. – Laurent LAPORTE Sep 22 '16 at 05:17
  • I did import threading. I will try the solution outside that application and let you know if it worked. @LaurentLAPORTE that seems right. What do you say, @Loïc? – j08lue Sep 23 '16 at 08:01
  • I don't think it is missing. On the last pass through the loop there is no join(), but the program doesn't specifically need that thread to finish before exiting. – Loïc Sep 24 '16 at 12:12