So I'm trying to process huge data files (> 1.6GB) using the requests
library to deal with chunks of data.
import urllib2, json, csv
import requests
import multiprocessing
def getTaxiTrips(date):
"""
Gets the taxi trips occurred in NY from a starting date.
:param date: (Y-m-d).
:return: list of tuples (long, lat, drop off date).
"""
today = str(datetime.date(datetime.now())).split('-')
today_y = today[0]
today_m = today[1]
start = date.split('-')
start_y = start[0]
start_m = start[1]
print start_m+"-"+start_y +" / "+today_m+"-"+today_y
data = []
y = int(start_y)
m = int(start_m)
while int(start_y) <= int(today_y):
# Month transformation
if m > 12:
m %= 12
y += 1
mt = str(m) if m > 9 else '0' + str(m)
# Green cabs
if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) + \
"/green_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) + \
"/green_tripdata_" + str(y) + "-" + mt + ".csv")
if m == int(today_m):
break
m += 1
pool = multiprocessing.Pool(mps-1)
result = pool.map(consumeTaxiData, data)
pool.close()
pool.join()
return list(itertools.chain(*result))
def consumeTaxiData(url):
"""
Given a url, reads its content and process its data.
:param url: the url to be readen.
:return: a list of tuples in the form (long, lat, hour).
"""
print "Processing", url
points = []
r = requests.get(url, stream=True)
for chunk in r.iter_content(chunk_size=1024):
if chunk:
reader = csv.DictReader(chunk.splitlines(), delimiter=',')
for line in reader:
print line
latitude = line.get('dropoff_latitude', None)
if latitude is None:
latitude = line.get('Dropoff_latitude', None)
longitude = line.get('dropoff_longitude', None)
if longitude is None:
longitude = line.get('Dropoff_longitude', None)
time = line.get('tpep_dropoff_datetime', None)
if time is None:
time = line.get('Lpep_dropoff_datetime', None)
if time is not None and latitude is not None and longitude is not None and \
datetime.strptime(time, '%Y-%m-%d %H:%M:%S') >= datetime.strptime(date, '%Y-%m-%d'):
time = roundTime(datetime.strptime(time, '%Y-%m-%d %H:%M:%S'), roundTo=60 * 60).hour
points.append((longitude, latitude, time))
return points
This is one file example:
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
I'm not sure how to use this idea properly because the first line of the file is a header that specifies the key of some content that I need to capture. This may change among files, so I though about reading it using csv.DictRead
. But I don't know if this works among chunks, because the header would be captured just in the first chunk, right? Is there a way to preserver the keys and be able to use csv.DictReader
?