
I've written a Python script that downloads files from a website. To speed it up, I made the downloads multithreaded. Obviously, this is faster than downloading serially, but I've come across some effects that I cannot explain.

  1. The first x files (the count seems proportional to the number of threads created) download incredibly fast, with the output showing upwards of 40 files per second, but after that the rate slows considerably.
  2. Up to a point (around 200 threads), my download rate averages at most 10 files per second. If I increase the thread count to, say, 700, it still maxes out at 10 files per second. With a very large thread count (over 1,000), the download rate seems to become limited by CPU speed instead.

So, my questions are:

  1. Why do the first files download so much faster than the rest, and can I maintain that initial speed?
  2. Why does increasing the thread count give such diminishing returns on download speed?

Here is my script:

#!/usr/bin/python

import random
from ast import literal_eval
from datetime import timedelta
from queue import Queue
from threading import Thread, activeCount
from time import time, sleep
from urllib.request import ProxyHandler, build_opener

proxies = Queue()
threads = Queue()
agents = []
total_files = 0
finished_files = 0
downloaded_files = 0
start_time = 0

class Config(object):
    DEBUG = False
    PROXIES_PATH = '/home/shane/bin/proxies.txt'
    AGENTS_PATH = '/home/shane/bin/user-agents.txt'
    DESTINATION_PATH = '/home/shane/images/%d.jpg'
    SOURCE_URL = 'https://example.org/%d.jpg'
    MAX_THREADS = 500
    TIMEOUT = 62
    RETRIES = 1
    RETRIES_TIME = 1

def get_files_per_second():
    return float(downloaded_files) / (time() - start_time)

def get_time_remaining():
    delta = timedelta(seconds=float(total_files - finished_files) / get_files_per_second())
    seconds = delta.total_seconds()
    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    days = str(int(days)).zfill(2)
    hours = str(int(hours)).zfill(2)
    minutes = str(int(minutes)).zfill(2)
    seconds = str(int(seconds)).zfill(2)
    return "%s:%s:%s:%s" % (days, hours, minutes, seconds)

def release_proxy(opener):
    if Config.DEBUG:
        print('Releasing proxy')
    for handler in opener.handlers:
        if type(handler) is ProxyHandler:
            proxies.put(handler)
            return
    raise Exception('No proxy found')

def get_new_proxy():
    if Config.DEBUG:
        print('Getting new proxy')
    if proxies.empty():
        raise Exception('No proxies')
    return proxies.get()

def get_new_agent():
    if len(agents) == 0:
        raise Exception('No user agents')
    return random.choice(agents)

def get_new_opener():
    opener = build_opener(get_new_proxy())
    opener.addheaders = [('User-Agent', get_new_agent())]
    return opener

def download(opener, source, destination, tries=0):
    global finished_files, downloaded_files
    if Config.DEBUG:
        print('Downloading %s to %s' % (source, destination))
    try:
        result = opener.open(source, timeout=Config.TIMEOUT).read()
        with open(destination, 'wb') as d:
            d.write(result)
        release_proxy(opener)
        finished_files += 1
        downloaded_files += 1
        to_print = '(%d/%d files) (%d proxies) (%f files/second, %s left) (%d threads) %s'
        print(to_print % (finished_files, total_files, proxies.qsize(), round(get_files_per_second(), 2), get_time_remaining(), activeCount(), source))
    except Exception as e:
        if Config.DEBUG:
            print(e)
        if tries < Config.RETRIES:
            sleep(Config.RETRIES_TIME)
            download(opener, source, destination, tries + 1)
        else:
            if proxies.qsize() < Config.MAX_THREADS * 2:
                release_proxy(opener)
            download(get_new_opener(), source, destination, 0)

class Downloader(Thread):
    def __init__(self, source, destination):
        Thread.__init__(self)
        self.source = source
        self.destination = destination
    def run(self):
        if Config.DEBUG:
            print('Running thread')
        download(get_new_opener(), self.source, self.destination)
        if threads.qsize() > 0:
            threads.get().start()

def populate_proxies():
    if Config.DEBUG:
        print('Populating proxies')
    with open(Config.PROXIES_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to proxies' % line)
            proxies.put(ProxyHandler(literal_eval(line)))

def populate_agents():
    if Config.DEBUG:
        print('Populating agents')
    with open(Config.AGENTS_PATH, 'r') as fh:
        for line in fh:
            line = line.replace('\n', '')
            if Config.DEBUG:
                print('Adding %s to agents' % line)
            agents.append(line)

def populate_threads():
    global total_files, finished_files
    if Config.DEBUG:
        print('Populating threads')
    for x in range(0, 100000):
        source = Config.SOURCE_URL % x
        destination = Config.DESTINATION_PATH % x
        total_files += 1
        # queue threads
        print('Queueing %s' % source)
        threads.put(Downloader(source, destination))

def start_work():
    global start_time
    if threads.qsize() == 0:
        raise Exception('No work to be done')
    start_time = time()
    for x in range(0, min(threads.qsize(), Config.MAX_THREADS)):
        if Config.DEBUG:
            print('Starting thread %d' % x)
        threads.get().start()

populate_proxies()
populate_agents()
populate_threads()
start_work()

1 Answer


The number of threads you are using is very high. Because of the Global Interpreter Lock, Python does not actually run threads in parallel; it switches between them rapidly, which only looks like parallelism. If the task is CPU-intensive, use multiprocessing; if the task is I/O-intensive, threads are useful. Keep the thread count low (10-70) on a typical quad-core PC with 8 GB of RAM, otherwise the switching overhead will slow your code down. Check these two links:

  1. Stack Overflow question
  2. Executive Summary on this page.
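A minimal sketch of the bounded-worker approach described above, using `concurrent.futures.ThreadPoolExecutor` instead of hundreds of manually managed `Thread` objects. The `download_one` function is a placeholder for the asker's real download logic, not their actual code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_one(x):
    # Placeholder: the real function would fetch Config.SOURCE_URL % x
    # and write it to disk, returning the index on success.
    return x

def run(count, workers=50):
    # A modest, fixed pool of workers; the pool queues the rest of the
    # tasks internally, so no per-file Thread objects are created.
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(download_one, x) for x in range(count)]
        for fut in as_completed(futures):
            results.append(fut.result())
    return results
```

The pool keeps the thread count bounded regardless of how many files are queued, which is the answer's main point.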
  • I do find speed benefits from going above ~100 threads. I think it has to do with how long some of the requests take to get a result (upwards of 60 seconds sometimes) – Shane Jul 11 '19 at 08:37
  • 1
    Yes, that can be the case. The slowdown afterward may be due to combining the packets into a single file (that could be a CPU-intensive process). – Nitin Jul 11 '19 at 08:57
  • Actually, you're right. I could have sworn earlier testing showed a speed increase when I set the thread count very high, but now just 50 threads max out my download rate at 10 files per second. Checking `top` shows only about 30% CPU utilization even with a few hundred threads. Any idea on my first question? The first files I download when the script starts are very fast, but then the rate slows. – Shane Jul 11 '19 at 09:00
  • 1
    Try using [multi processing](https://medium.com/practo-engineering/threading-vs-multiprocessing-in-python-7b57f224eadb) – Nitin Jul 11 '19 at 11:54
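A minimal `multiprocessing` sketch of the suggestion above. For a download workload this mainly helps if the per-file post-processing is CPU-bound; `process_item` here is a stand-in for that work, not the asker's downloader:

```python
from multiprocessing import Pool

def process_item(x):
    # Stand-in for CPU-bound work on a downloaded file.
    return x * x

def run(count, workers=4):
    # Pool.map distributes the items across worker processes and
    # returns results in input order.
    with Pool(processes=workers) as pool:
        return pool.map(process_item, range(count))

if __name__ == '__main__':
    print(run(5))  # prints [0, 1, 4, 9, 16]
```

Unlike threads, each worker is a separate process with its own interpreter, so CPU-bound work genuinely runs in parallel.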