I'm trying to process huge data files (> 1.6 GB) with the requests library, streaming the content and processing it in chunks.

import csv
import itertools
import multiprocessing
from datetime import datetime

import requests

def getTaxiTrips(date):
    """
    Gets the taxi trips that occurred in NY from a starting date onwards.
    :param date: starting date (Y-m-d).
    :return: list of tuples (long, lat, drop off date).
    """
    today = str(datetime.now().date()).split('-')
    today_y = today[0]
    today_m = today[1]

    start = date.split('-')
    start_y = start[0]
    start_m = start[1]

    print start_m + "-" + start_y + " / " + today_m + "-" + today_y

    data = []
    y = int(start_y)
    m = int(start_m)
    while y <= int(today_y):
        # Month transformation
        if m > 12:
            m %= 12
            y += 1

        mt = str(m) if m > 9 else '0' + str(m)
        # Green cabs
        if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) + \
                   "/green_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
            data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) + \
                        "/green_tripdata_" + str(y) + "-" + mt + ".csv")

        if y == int(today_y) and m == int(today_m):
            break
        m += 1

    pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
    # date is passed along explicitly so the worker processes can see it
    result = pool.map(consumeTaxiData, [(u, date) for u in data])
    pool.close()
    pool.join()

    return list(itertools.chain(*result))


def consumeTaxiData(args):
    """
    Given a (url, date) pair, reads the url's content and processes its data.
    :param args: tuple (url, start date) to be read.
    :return: a list of tuples in the form (long, lat, hour).
    """
    url, date = args
    print "Processing", url
    points = []

    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            # PROBLEM: each 1024-byte chunk is parsed on its own, so only
            # the first chunk starts with the header row, and a chunk
            # boundary can split a record in half.
            reader = csv.DictReader(chunk.splitlines(), delimiter=',')
            for line in reader:
                print line
                latitude = line.get('dropoff_latitude')
                if latitude is None:
                    latitude = line.get('Dropoff_latitude')

                longitude = line.get('dropoff_longitude')
                if longitude is None:
                    longitude = line.get('Dropoff_longitude')

                time = line.get('tpep_dropoff_datetime')
                if time is None:
                    time = line.get('Lpep_dropoff_datetime')
                if time is not None and latitude is not None and longitude is not None:
                    dropoff = datetime.strptime(time, '%Y-%m-%d %H:%M:%S')
                    if dropoff >= datetime.strptime(date, '%Y-%m-%d'):
                        # roundTime is defined elsewhere; with roundTo=60 * 60
                        # it rounds the timestamp to the nearest hour
                        time = roundTime(dropoff, roundTo=60 * 60).hour
                        points.append((longitude, latitude, time))

    return points
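
For completeness, a hypothetical entry point (the __main__ guard is needed because of multiprocessing on some platforms; the start date is just an example):

if __name__ == '__main__':
    points = getTaxiTrips('2015-06-01')
    print len(points), "points collected"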

This is an example file:

https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
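
To check which keys a given file actually uses, the header row can be fetched without downloading the whole file; a minimal sketch:

r = requests.get("https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv", stream=True)
print next(r.iter_lines()).split(',')  # only the header row is consumed
r.close()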

I'm not sure how to do this properly, because the first line of each file is a header row that defines the keys of the content I need to capture. The header may change between files, so I thought about reading it with csv.DictReader. But does that work across chunks? The header would only be present in the first chunk, right? Is there a way to preserve the keys so that csv.DictReader can still be used?
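
One direction I'm considering (untested) is to feed the streaming response to csv.DictReader line by line instead of in fixed-size chunks; DictReader accepts any iterable of lines, so it would consume the header exactly once and keep those keys for the rest of the stream:

r = requests.get(url, stream=True)
# iter_lines() yields the header line first, then one CSV record per line
reader = csv.DictReader(r.iter_lines(), delimiter=',')
for line in reader:
    print line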
