
My task is to download 1M+ images from a given list of URLs. What is the recommended way to do so?

After reading Greenlet Vs. Threads I looked into gevent, but I can't get it to run reliably. I played around with a test set of 100 URLs; sometimes it finishes in 1.5s, but sometimes it takes over 30s, which is strange because the timeout* per request is 0.1s, so it should never take more than 10s.

*see below in code

I also looked into grequests, but it seems to have issues with exception handling.

My 'requirements' are that I can

  • inspect the errors raised while downloading (timeouts, corrupt images...),
  • monitor the progress of the number of processed images and
  • be as fast as possible.
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub
POOL_SIZE = 300


def download_image_wrapper(task):
    return download_image(task[0], task[1])

def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)

def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path] for image_url, download_path in
             zip(list_of_image_urls, download_paths)]
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()
        except Exception:
            print 'x',
            continue
        print '.',

test_urls = # list of 100 urls

t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1
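
For comparison, a minimal, untested sketch of how the spawning above could be bounded by a gevent.pool.Pool of POOL_SIZE (the constant is defined above but never used), while still surfacing per-task errors and progress. It assumes download_image_wrapper and POOL_SIZE from above and takes the same [image_url, download_path] pairs that download_images_gevent_spawn builds:

from gevent.pool import Pool

def try_download(task):
    # return None on success, or the raised exception, so errors can be inspected later
    try:
        download_image_wrapper(task)
    except Exception as e:
        return e

def download_images_gevent_pool(parameters):
    pool = Pool(POOL_SIZE)  # bounds the number of concurrent greenlets
    done, errors = 0, []
    # imap_unordered yields each result as soon as its task finishes, which makes progress easy to report
    for result in pool.imap_unordered(try_download, parameters):
        done += 1
        if result is not None:
            errors.append(result)
        print('%d processed, %d errors' % (done, len(errors)))
    return errors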
Framester
  • Do you have to use threads? If you can use multiple processes instead you can do this with `multiprocessing.Pool` and you might find it simpler too. I use `pool.map(download_image, url_list)` and `pool.join()` to do something similar. – foz Nov 09 '15 at 09:25
  • @foz, thanks, but I also tried `multiprocessing.Pool` with similar issues. Also, I was told that `multiprocessing` is not the right tool for this kind of task: http://stackoverflow.com/a/27016937/380038 – Framester Nov 09 '15 at 10:01
  • Interesting! I can see that multiprocessing isn't as efficient/scalable but I don't see why it shouldn't work with a modest pool size (32 as you had). Hope you get a good answer to this as I think I'll learn something too! – foz Nov 09 '15 at 10:29
  • I want to download >12m images, so I want to do it as efficiently as possible. – Framester Nov 09 '15 at 10:33
  • Did you look at trollius https://pypi.python.org/pypi/trollius? – Padraic Cunningham Nov 09 '15 at 21:34
  • Can I recommend posting your code to Code Review? Not that it is off-topic here (it isn't), but this would be a great question for that site as well, and you could probably get some great answers for improving algorithmic efficiency. – Joseph Farah Nov 18 '15 at 05:28
  • The code is not working correctly according to the OP, and therefore would be off-topic on Code Review. – Phrancis Nov 18 '15 at 05:31
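
For context, a rough, untested sketch of the multiprocessing.Pool approach described in the first comment above. It assumes download_image_wrapper from the question, a parameters list of [image_url, download_path] pairs built the same way as in download_images_gevent_spawn, and the modest pool size of 32 mentioned in the comments:

from multiprocessing import Pool

def download_or_error(task):
    # run one download and return the exception (if any) so failures can be inspected afterwards
    try:
        download_image_wrapper(task)
    except Exception as e:
        return e

if __name__ == '__main__':
    pool = Pool(32)
    errors = [e for e in pool.imap_unordered(download_or_error, parameters) if e is not None]
    pool.close()
    pool.join()
    print('%d of %d downloads failed' % (len(errors), len(parameters)))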

3 Answers


I think it would be better to stick with urllib2, following the example of https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Try this code; I suppose it is what you're asking for.

import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import sys

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen

urls = sorted(chloya_files)  # the list of image URLs


def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/' + img_name, 'wb') as f:
        f.write(data)
    return True


from time import time

t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout=12.0)
print("Successful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)))
print(time() - t1)
Alex Yu
  • Thanks, I tried that code with `urlopen(..., timeout=0.1)` but it still took over 100s for 1000 urls, which indicates to me that it did not perform the requests in parallel. – Framester Nov 16 '15 at 08:28
  • Maybe it's network issues? In my test it took 10.1 seconds for 139 files from some Czech site. I also had doubts about parallelism, but now I think I was limited by the remote webserver, not by gevent/urllib2. – Alex Yu Nov 16 '15 at 09:51
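
Following up on the comments above, a minimal, untested sketch of how per-request errors could be inspected with this approach. It assumes urls and download_file from the answer, and that download_file passes a per-request timeout to urlopen (e.g. urlopen(url, timeout=5)):

import gevent

tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks)  # no global cap; each urlopen call has its own timeout

succeeded = sum(1 for task in tasks if task.successful())
for task in tasks:
    if not task.successful():
        # task.exception holds whatever the greenlet raised, e.g. a socket timeout or an HTTP error
        print("failed: %r" % task.exception)
print("Successful: %s from %s" % (succeeded, len(tasks)))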

There's a simple solution using gevent and Requests: simple-requests

Use a Requests Session for HTTP persistent connections. Since gevent makes Requests asynchronous, I think there's no need for a timeout on the HTTP requests.

By default, requests.Session caches TCP connection pools for 10 hosts (pool_connections) and limits the number of concurrent connections per host to 10 (pool_maxsize). The defaults should be tweaked to suit the need by explicitly creating an HTTP adapter.

session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)
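
If some of the image URLs are served over HTTPS (an assumption here, not stated in the question), the same adapter would typically be mounted for that scheme as well:

session.mount('https://', http_adapter)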

Break the work into producer and consumer tasks: image downloading is the producer task and image processing is the consumer task.

If the image-processing library (PIL) is not asynchronous, it may block the producer coroutines. In that case the consumer pool can be a gevent.threadpool.ThreadPool, e.g.:

from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)  

This is an overview of how it can be done. I didn't test the code.

from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool

def download(url):
    try:
        response = session.get(url)
    except Exception as e:
        print(e)
    else:
        if response.status_code == requests.codes.ok:
            file_name = urlparse(url).path.rsplit('/', 1)[-1]
            return (response.content, file_name)
        response.raise_for_status()

def process(img):
    if img is None:
        return None
    img, name = img
    img = Image.open(BytesIO(img))
    path = os.path.join(base_folder, name)
    try:
        img.save(path)
    except Exception as e:
        print(e)
    else:
        return True

def run(urls):
    consumer.map(process, producer.imap_unordered(download, urls))

if __name__ == '__main__':
    POOL_SIZE = 300
    producer = Pool(POOL_SIZE)
    consumer = Pool(POOL_SIZE)

    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)

    test_urls = # list of 100 urls
    base_folder = 'download_temp'
    t1 = time()
    run(test_urls)
    print(time() - t1)
Nizam Mohamed
  • Thanks for your suggestion. I tried your code on my URLs, but it takes >200s for the 1k URLs. One issue might be that most of them point to one domain, but a lot of them also point to different domains. – Framester Nov 17 '15 at 10:57
  • How much time do you think it should take? file size, client bandwidth and server load all play a role in the timings. – Nizam Mohamed Nov 17 '15 at 13:51
  • I've updated my answer to suggest using `ThreadPool` for consumers. If the image processing is cpu-bound, you should use `multiprocessing.Pool`. – Nizam Mohamed Nov 17 '15 at 14:00
  • You can try setting `timeout` on the `get` request to shorten the time, but then some files may not be downloaded. – Nizam Mohamed Nov 17 '15 at 14:05
  • Thanks for all your suggestions. I just timed a simple synchronous version and it took 350s for 1k images. I will try your threadpool code. – Framester Nov 17 '15 at 14:29
  • More important than speed is the integrity of the downloaded files. The code must anticipate error conditions like DNS errors, connect timeouts and HTTP errors, otherwise the downloaded files will be useless. Merely looking at the timings without checking file integrity is a vain effort. – Nizam Mohamed Nov 17 '15 at 14:34
  • That is a good point, but I use `requests` to _handle_ connection issues and open the images afterwards in PIL, which throws exceptions if it does not receive a valid image. – Framester Nov 17 '15 at 14:38
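
As a rough illustration of the integrity check discussed in the comments above, a hedged sketch assuming Pillow/PIL (the helper name is made up here). Image.verify() checks the file structure without decoding all of the pixel data; fully decoding with load() would be a stricter, more expensive check:

from io import BytesIO
from PIL import Image

def looks_like_valid_image(data):
    # verify() raises if the bytes are truncated or not a recognizable image format
    try:
        Image.open(BytesIO(data)).verify()
        return True
    except Exception:
        return False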

I would suggest taking a look at Grablib: http://grablib.org/

It is an asynchronous scraping framework based on pycurl and multicurl. It also tries to handle network errors automatically (e.g. retrying after a timeout, etc.).

I believe the Grab:Spider module will solve 99% of your problems: http://docs.grablib.org/en/latest/index.html#spider-toc
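
For illustration only, a rough, untested sketch of what a Grab:Spider-based downloader might look like, assuming the Spider API from the docs linked above (a task_generator that yields Task objects and a handler named after the task); image_urls and the output folder are placeholders:

from grab.spider import Spider, Task

class ImageSpider(Spider):
    def task_generator(self):
        # one download task per image URL; image_urls is a placeholder list
        for url in image_urls:
            yield Task('image', url=url)

    def task_image(self, grab, task):
        # handler for tasks named 'image': write the raw response body to disk
        file_name = task.url.split('/')[-1]
        with open('download_temp/' + file_name, 'wb') as f:
            f.write(grab.response.body)

bot = ImageSpider(thread_number=50)  # number of concurrent network streams
bot.run()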

  • Thanks. Can you elaborate on what Grablib does differently, or why you think it will work better than my approach? – Framester Nov 12 '15 at 09:45
  • Oops, do you have direct URLs for the images? If so, then sorry, you can still use Grab or whatever you have. Grablib is ideal for crawling and parsing; however, you can use it for image downloads too. Grablib (specifically the Grab:Spider module) retries tasks where the network error was >400 and !=404. The number of retries can be set manually. It has logging and process monitoring. – Ashot Ogoltsov Nov 13 '15 at 10:07