
Background
For our research we currently need to download ~15,000 files. While the database has its own command line tool to support "bulk" downloads, it is completely unfeasible to download 15,000 runs sequentially (which is what the command line tool currently does).

Simple math
I used the currently available command line tool to download several runs and took the average runtime, which is ~20 minutes per file (if not more). Doing this for all 15,000 files would therefore take 15,000 * 20 / 60 / 24 = 208 days, which would only be nice if you got paid per hour of script runtime ;)
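
Just to spell out that estimate (same numbers as above, in plain Python):

files = 15000
minutes_per_file = 20
total_days = files * minutes_per_file / 60 / 24
print(total_days)  # ~208.3 days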

The bulk command line script
This is the script that allows users to download bulk data (NOT made by me):

NOTE:
I changed it a little so I can run it directly from the IDE (so I don't have to start the command line for every little change).

'''
Created on 27/10/2015
@author: Maxim Scheremetjew
amended 07/11/2016 by Maxim Scheremetjew
version: 1.1
'''

import sys
import argparse
import csv
import os
import urllib.request, urllib.error, urllib.parse
from urllib.error import URLError
from io import StringIO


def _download_resource_by_url(url, output_file_name):
    """Kicks off a download and stores the file at the given path.
    Arguments:
    'url' -- Resource location.
    'output_file_name' -- Path of the output file.
    """
    print("Starting the download of the following file...")
    print(url)
    print("Saving file in:\n" + output_file_name)

    try:
        urllib.request.urlretrieve(url, output_file_name)
    except URLError as url_error:
        print(url_error)
        raise
    except IOError as io_error:
        print(io_error)
        raise
    print("Download finished.")


def _get_number_of_chunks(url_template, study_id, sample_id, run_id, version, domain, file_type):
    """
    Returns the number of chunks for the given set of parameters (study, sample and run identifier).
    """
    print("Getting the number of chunks from the following URL...")
    url_get_number_of_chunks = url_template % (
        study_id, sample_id, run_id, version, domain, file_type)
    print(url_get_number_of_chunks)
    try:
        file_stream_handler = urllib.request.urlopen(url_get_number_of_chunks)
        result = int(file_stream_handler.read())
        print("Retrieved " + str(result) + " chunks.")
        return result
    except URLError as url_error:
        print(url_error)
        raise
    except IOError as io_error:
        print(io_error)
        raise
    except ValueError as e:
        print(e)
        print("Skipping this run! Could not retrieve the number of chunks for this URL. " \
              "Check the version number in the URL and check if the run is available online.")
        return 0


def _get_file_stream_handler(url_template, study_id):
    """
    Returns a file stream handler for the given URL.
    """
    print("Getting the list of project runs...")
    url_get_project_runs = url_template % (study_id)
    try:
        req = urllib.request.Request(url=url_get_project_runs, headers={'Content-Type': 'text/plain'})
        res = urllib.request.urlopen(req)
        dec_res = res.read().decode()
        sys.stderr.write(str(dec_res))
        return dec_res
    except URLError as url_error:
        print(url_error)
        raise
    except IOError as io_error:
        print(io_error)
        raise
    except ValueError as e:
        print(e)
        print("Could not retrieve any runs. Open the retrieval URL further down in your browser and see if you get any results back. Program will exit now.")
        print(url_get_project_runs)
        raise


def _print_program_settings(project_id, version, selected_file_types_list, output_path, root_url):
    print("Running the program with the following setting...")
    print("Project: " + project_id)
    print("Pipeline version: " + version)
    print("Selected file types: " + ",".join(selected_file_types_list))
    print("Root URL: " + root_url)
    print("Writing result to: " + output_path)


def start(args):
        function_file_type_list = ["InterProScan", "GOAnnotations", "GOSlimAnnotations"]
        sequences_file_type_list = ["ProcessedReads", "ReadsWithPredictedCDS", "ReadsWithMatches", "ReadsWithoutMatches",
                                    "PredictedCDS", "PredictedCDSWithoutAnnotation", "PredictedCDSWithAnnotation",
                                    "PredictedORFWithoutAnnotation", "ncRNA-tRNA-FASTA"]
        taxonomy_file_type_list = ["5S-rRNA-FASTA", "16S-rRNA-FASTA", "23S-rRNA-FASTA", "OTU-TSV", "OTU-BIOM",
                                   "OTU-table-HDF5-BIOM", "OTU-table-JSON-BIOM", "NewickTree", "NewickPrunedTree"]
        # Default list of available file types
        default_file_type_list = sequences_file_type_list + function_file_type_list + taxonomy_file_type_list

        # Parse script parameters

        # Parse the project accession
        study_id = args['project_id']

        # Parse the values for the file type parameter
        selected_file_types_list = []
        if not args['file_type']:
            # If not specified use the default set of file types
            selected_file_types_list = default_file_type_list
        else:
            # Remove whitespaces
            selected_file_types_str = args['file_type'].replace(" ", "")
            # Set all functional result file types
            if selected_file_types_str == "AllFunction":
                selected_file_types_list = function_file_type_list
            elif selected_file_types_str == "AllTaxonomy":
                selected_file_types_list = taxonomy_file_type_list
            elif selected_file_types_str == "AllSequences":
                selected_file_types_list = sequences_file_type_list
            # Set defined file types
            elif len(selected_file_types_str.split(",")) > 1:
                selected_file_types_list = selected_file_types_str.split(",")
            # Set single file type
            else:
                selected_file_types_list.append(selected_file_types_str)

        # Parse the analysis version
        version = args['version']

        root_url = "https://www.ebi.ac.uk"
        study_url_template = root_url + "/metagenomics/projects/%s/runs"
        number_of_chunks_url_template = root_url + "/metagenomics/projects/%s/samples/%s/runs/%s/results/versions/%s/%s/%s/chunks"
        chunk_url_template = root_url + "/metagenomics/projects/%s/samples/%s/runs/%s/results/versions/%s/%s/%s/chunks/%s"
        download_url_template = root_url + "/metagenomics/projects/%s/samples/%s/runs/%s/results/versions/%s/%s/%s"

        # Print out the program settings
        _print_program_settings(study_id, version, selected_file_types_list, args['output_path'], root_url)

        # Iterating over all file types
        for file_type in selected_file_types_list:
            domain = None
            fileExtension = None
            # Boolean flag to indicate if a file type is chunked or not
            is_chunked = True
            # Set the result file domain (sequences, function or taxonomy) dependent on the file type
            # Set output file extension (tsv, faa or fasta) dependent on the file type
            if file_type == 'InterProScan':
                domain = "function"
                fileExtension = ".tsv.gz"
            elif file_type == 'GOSlimAnnotations' or file_type == 'GOAnnotations':
                domain = "function"
                fileExtension = ".csv"
                is_chunked = False
            # PredictedCDS is version 1.0 and 2.0 only, from version 3.0 on this file type was replaced by
            # PredictedCDSWithAnnotation (PredictedCDS can be gained by concatenation of the 2 sequence file types now)
            elif file_type == 'PredictedCDS' or file_type == 'PredictedCDSWithoutAnnotation' or file_type == \
                    'PredictedCDSWithAnnotation':
                if file_type == 'PredictedCDSWithAnnotation' and (version == '1.0' or version == '2.0'):
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                elif file_type == 'PredictedCDS' and version == '3.0':
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                domain = "sequences"
                fileExtension = ".faa.gz"
            elif file_type == 'ncRNA-tRNA-FASTA':
                domain = "sequences"
                fileExtension = ".fasta"
                is_chunked = False
            elif file_type == '5S-rRNA-FASTA' or file_type == '16S-rRNA-FASTA' or file_type == '23S-rRNA-FASTA':
                is_chunked = False
                domain = "taxonomy"
                fileExtension = ".fasta"
            # NewickPrunedTree is version 2 only
            # NewickTree is version 1 only
            elif file_type == 'NewickPrunedTree' or file_type == 'NewickTree':
                if file_type == 'NewickPrunedTree' and version == '1.0':
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                if file_type == 'NewickTree' and version == '2.0':
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                is_chunked = False
                domain = "taxonomy"
                fileExtension = ".tree"
            elif file_type == 'OTU-TSV':
                is_chunked = False
                domain = "taxonomy"
                fileExtension = ".tsv"
            # OTU-BIOM is version 1 only
            # OTU-table-HDF5-BIOM and OTU-table-JSON-BIOM are version 2 only
            elif file_type == 'OTU-BIOM' or file_type == 'OTU-table-HDF5-BIOM' or file_type == 'OTU-table-JSON-BIOM':
                if file_type == 'OTU-BIOM' and version == '2.0':
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                if (file_type == 'OTU-table-HDF5-BIOM' or file_type == 'OTU-table-JSON-BIOM') and version == '1.0':
                    print("File type '" + file_type + "' is not available for version " + version + "!")
                    continue
                is_chunked = False
                domain = "taxonomy"
                fileExtension = ".biom"
            else:
                domain = "sequences"
                fileExtension = ".fasta.gz"

            # Retrieve a file stream handler from the given URL and iterate over each line (each run) and build the download link using the variables from above
            file_stream_handler = _get_file_stream_handler(study_url_template, study_id)
            reader = csv.reader(StringIO(file_stream_handler), delimiter=',')

            for study_id, sample_id, run_id in reader:
                print(study_id + ", " + sample_id + ", " + run_id)

                output_path = args['output_path'] + "/" + study_id + "/" + file_type
                if not os.path.exists(output_path):
                    os.makedirs(output_path)

                if is_chunked:
                    number_of_chunks = _get_number_of_chunks(number_of_chunks_url_template, study_id, sample_id, run_id,
                                                             version, domain, file_type)

                    for chunk in range(1, number_of_chunks + 1):
                        output_file_name = (output_path + "/" + run_id.replace(" ", "").replace(",", "-") +
                                            "_" + file_type + "_" + str(chunk) + fileExtension)
                        rootUrl = chunk_url_template % (study_id, sample_id, run_id, version, domain, file_type, chunk)
                        _download_resource_by_url(rootUrl, output_file_name)
                else:
                    output_file_name = (output_path + "/" + run_id.replace(" ", "").replace(",", "-") +
                                        "_" + file_type + fileExtension)
                    rootUrl = download_url_template % (study_id, sample_id, run_id, version, domain, file_type)
                    _download_resource_by_url(rootUrl, output_file_name)

        print("Program finished.")

start({'project_id':'ERP001736',
       'file_type': 'ProcessedReads,16S-rRNA-FASTA,OTU-TSV',
       'version': '2.0',
       'output_path':''})

What am I thinking of
I have (a little) experience with multithreading / multiprocessing / asynchronous requests, but can't figure out what I should do in this case. I have 20 CPUs on my Linux server, so I could do some multiprocessing (~208 / 20 = 10+ days), but based on my previous experience with this the CPUs will only be used at ~1-5%, which seems like a waste of capacity. I haven't used the other two methods for this kind of problem; I have only used both for simple HTTP requests (just asking for a page and getting the result, not downloading files in chunks).
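
To make it concrete, this is roughly what I had in mind with threading: since the job is I/O bound rather than CPU bound, a thread pool around the same urlretrieve call the script uses should keep several transfers in flight without needing all 20 cores. This is only a minimal sketch: download_one, MAX_WORKERS and the tasks list are my own names, and the (url, output_file_name) pairs would have to be built with the same URL templates as in the script above.

import concurrent.futures
import urllib.request

def download_one(task):
    """Download a single (url, output_file_name) pair; return the error, if any."""
    url, output_file_name = task
    try:
        urllib.request.urlretrieve(url, output_file_name)
        return output_file_name, None
    except Exception as exc:  # keep going if a single run fails
        return output_file_name, exc

# Assumed to be filled with (url, output_file_name) tuples built from the
# chunk/download URL templates shown in the script above.
tasks = []

MAX_WORKERS = 16  # tune this; the limit is bandwidth, not CPU cores

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    for name, error in pool.map(download_one, tasks):
        if error is None:
            print("Finished " + name)
        else:
            print("Failed " + name + ": " + str(error))

But I don't know whether more workers actually help if a single transfer already saturates the connection, which is part of why I'm asking.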


The real question

What would be the fastest method to download all these 15,000 files (sequentially is definitely not an option)?

If it's not too time-consuming, please provide a code example (or a reference) of what you mean.


Updates
I used nload to measure the bandwidth usage while running the script and downloading 1 file (of course there were background processes, but these seem negligible, only a few Mb). I did this at 4 time points and averaged the numbers:

Curr: 110    Mbit/s
Min:   30    Mbit/s
Avg:   90    Mbit/s
Max:  159.75 Mbit/s
Ttl:  752.41 GByte
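
For completeness, the same kind of measurement can also be taken from Python itself; this assumes the psutil package is installed (nload is what I actually used for the numbers above):

import time
import psutil

# Sample the received byte counter over a short window and convert to Mbit/s.
before = psutil.net_io_counters().bytes_recv
time.sleep(5)
after = psutil.net_io_counters().bytes_recv
print("Average download rate: %.1f Mbit/s" % ((after - before) * 8 / 5 / 1e6))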

  • How strict is the requirement to use Python? You could probably get pretty far with https://aria2.github.io/manual/en/html/aria2c.html#id2 or even wget/xargs -P. – Richard Nienaber Oct 19 '17 at 14:03
  • Possible duplicates: [Q1](https://stackoverflow.com/questions/3530955/retrieve-multiple-urls-at-once-in-parallel) and [Q2](https://stackoverflow.com/questions/3490173/how-can-i-speed-up-fetching-pages-with-urllib2-in-python) – Mahdi Oct 19 '17 at 14:05
  • I wish this was a duplicate, but downloading zipped files in chunks is definitely different from just requesting pages (as I already explained in my question) @Mahdi – CodeNoob Oct 19 '17 at 14:07
  • since you are just downloading data from some server, you will be I/O bound, not CPU bound. Since 1 file takes 20 minutes, these must be rather large files. When you do one of these transfers, does it max out the network bandwidth? If it does, adding more threads won't help too much sadly. – Matthew Curry Oct 19 '17 at 14:07
  • Aaah right, how can I check if it maxes out my bandwidth? @MatthewCurry, never mind, I already found it – CodeNoob Oct 19 '17 at 14:08
  • Activity Monitor on a Mac, Task Manager on Windows; on Linux I forget, but Gnome and KDE will have something, I'm sure. – Matthew Curry Oct 19 '17 at 14:12
  • Finally I have the numbers (I don't have root rights, so had to do some bypassing). I hope you can interpret these; see the updated question @MatthewCurry – CodeNoob Oct 19 '17 at 15:38
  • There is not much to be said about these numbers without knowing what uplink you have. My company has a 100 MBit uplink, so if we assume your 90 MBit average is really from just downloading that one file, you won't win much by parallelising because your uplink is already saturated. – deets Oct 19 '17 at 15:50
  • Is there a Linux program/command which shows all the info at once (so bandwidth and the link you are talking about)? @deets – CodeNoob Oct 19 '17 at 16:27
  • I'm talking about the uplink. It's your internet connection. Linux doesn't know anything about this. It depends on your company/university/whatever's contract what bandwidth you have. Plus potential rate limiting, so that one eager employee's download script doesn't spoil the fun for the rest of the workforce ;) Your sysadmin should know this. And probably also has an opinion on how much bandwidth you should use. – deets Oct 19 '17 at 16:58
  • Well, I will ask that (the response will probably take some time). Any idea how I can just use the max bandwidth possible? @deets – CodeNoob Oct 19 '17 at 17:03
  • just spawn threads or processes and distribute the actual CSV-files between them. There are tons of answers here on SO about multiprocessing and how to have several worker processes. Or threads. – deets Oct 19 '17 at 17:17
  • agree with @deets here, just run more than one of these python scripts you already have, each asking for a different piece of the data. Processes work just as well as threads, especially when I/O bound. – Matthew Curry Oct 19 '17 at 19:42
