
I need to import a zipped csv into a mongo collection, but there is a catch - every record contains a timestamp in Pacific Time, which must be converted to the local time corresponding to the (longitude,latitude) pair found in the same record.

The code looks like so:

# (imports shown for completeness; get_aux_stuff, allow_field and
# tz_lookup_radius are defined elsewhere in my script)
import csv
import itertools
from datetime import timedelta
from zipfile import ZipFile

from bson.son import SON
from pymongo import Connection
from pytz import timezone, utc

def read_csv_zip(path, timezones):
  with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
    csv_rows = csv.reader(input)
    header = next(csv_rows)
    check, converters = get_aux_stuff(header)
    for csv_row in csv_rows:
      if check(csv_row):
        # build the document: rename/convert fields, skipping the disallowed ones
        row = {
          converter[0]: converter[1](value)
          for converter, value in zip(converters, csv_row)
          if allow_field(converter)
        }
        ts = row['ts']
        lng, lat = row['loc']
        # map the location to a time zone via a bounding-box query
        found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
        if found_tz_entry:
          tz_name = found_tz_entry['tz']
          local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
          row['tz'] = tz_name
        else:
          # no time zone found - fall back to a rough 15-degrees-per-hour approximation
          local_ts = (ts.astimezone(utc) + timedelta(hours=int(lng/15))).replace(tzinfo=None)
        row['local_ts'] = local_ts
        yield row

def insert_documents(collection, source, batch_size):
  while True:
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break
    try:
      collection.insert(items)
    except Exception:
      # the bulk insert failed - retry record by record to salvage what we can
      for item in items:
        try:
          collection.insert(item)
        except Exception as exc:
          print("Failed to insert record {0} - {1}".format(item['_id'], exc))

def main(zip_path):
  with Connection() as connection:
    data = connection.mydb.data
    timezones = connection.timezones.data
    insert_documents(data, read_csv_zip(zip_path, timezones), 1000)

The code proceeds as follows:

  1. Every record read from the csv is checked and converted to a dictionary, where some fields may be skipped, some fields may be renamed (relative to the titles appearing in the csv header), and some values may be converted (to datetime, to integers, to floats, etc.)
  2. For each record read from the csv, a lookup is made into the timezones collection to map the record location to the respective time zone. If the mapping is successful, that timezone is used to convert the record timestamp (Pacific Time) to the respective local timestamp. If no mapping is found, a rough approximation is calculated.

The timezones collection is appropriately indexed, of course - calling explain() confirms it.
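For reference, it is a 2d geospatial index on loc (the $within/$box query needs one), created roughly like this via pymongo (a sketch; the exact call in my script may differ):

# a 2d geospatial index on 'loc' is what the $within/$box query uses
import pymongo
timezones.ensure_index([('loc', pymongo.GEO2D)])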

The process is slow. Naturally, having to query the timezones collection for every record kills the performance. I am looking for advice on how to improve it.

Thanks.

EDIT

The timezones collection contains 8176040 records, each containing four values:

> db.data.findOne()
{ "_id" : 3038814, "loc" : [ 1.48333, 42.5 ], "tz" : "Europe/Andorra" }

EDIT2

OK, I have compiled a release build of http://toblerity.github.com/rtree/ and configured the rtree package. Then I created an rtree dat/idx pair of files corresponding to my timezones collection. So, instead of calling collection.find_one I call index.intersection. Surprisingly, not only is there no improvement, it actually works more slowly now! Maybe rtree could be fine-tuned to load the entire dat/idx pair into RAM (704M), but I do not know how to do it. Until then, it is not an alternative.
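What I have in mind is something along these lines - rebuilding the index entirely in RAM from the timezones collection instead of opening the dat/idx files (an untested sketch, assuming rtree's in-memory Index and keeping the tz name as the payload):

from rtree import index

def build_memory_tz_index(timezones):
  # an Index created without a filename lives entirely in RAM, so lookups do
  # no disk I/O (at the cost of holding the whole thing in memory)
  idx = index.Index()
  for i, entry in enumerate(timezones.find()):
    lng, lat = entry['loc']
    # store each point as a degenerate box, with the tz name as the payload
    idx.insert(i, (lng, lat, lng, lat), obj=entry['tz'])
  return idx

# lookup, mirroring the $box query:
# hits = list(idx.intersection((lng - tz_lookup_radius, lat - tz_lookup_radius,
#                               lng + tz_lookup_radius, lat + tz_lookup_radius),
#                              objects=True))
# tz_name = hits[0].object if hits else None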

In general, I think the solution should involve parallelization of the task.

EDIT3

Profile output when using collection.find_one:

>>> p.sort_stats('cumulative').print_stats(10)
Tue Apr 10 14:28:39 2012    ImportDataIntoMongo.profile

         64549590 function calls (64549180 primitive calls) in 1231.257 seconds

   Ordered by: cumulative time
   List reduced from 730 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012 1231.257 1231.257 ImportDataIntoMongo.py:1(<module>)
        1    0.001    0.001 1230.959 1230.959 ImportDataIntoMongo.py:187(main)
        1  853.558  853.558  853.558  853.558 {raw_input}
        1    0.598    0.598  370.510  370.510 ImportDataIntoMongo.py:165(insert_documents)
   343407    9.965    0.000  359.034    0.001 ImportDataIntoMongo.py:137(read_csv_zip)
   343408    2.927    0.000  287.035    0.001 c:\python27\lib\site-packages\pymongo\collection.py:489(find_one)
   343408    1.842    0.000  274.803    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:699(next)
   343408    2.542    0.000  271.212    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:644(_refresh)
   343408    4.512    0.000  253.673    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:605(__send_message)
   343408    0.971    0.000  242.078    0.001 c:\python27\lib\site-packages\pymongo\connection.py:871(_send_message_with_response)

Profile output when using index.intersection:

>>> p.sort_stats('cumulative').print_stats(10)
Wed Apr 11 16:21:31 2012    ImportDataIntoMongo.profile

         41542960 function calls (41542536 primitive calls) in 2889.164 seconds

   Ordered by: cumulative time
   List reduced from 778 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.028    0.028 2889.164 2889.164 ImportDataIntoMongo.py:1(<module>)
        1    0.017    0.017 2888.679 2888.679 ImportDataIntoMongo.py:202(main)
        1 2365.526 2365.526 2365.526 2365.526 {raw_input}
        1    0.766    0.766  502.817  502.817 ImportDataIntoMongo.py:180(insert_documents)
   343407    9.147    0.000  491.433    0.001 ImportDataIntoMongo.py:152(read_csv_zip)
   343406    0.571    0.000  391.394    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:384(intersection)
   343406  379.957    0.001  390.824    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:435(_intersection_obj)
   686513   22.616    0.000   38.705    0.000 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:451(_get_objects)
   343406    6.134    0.000   33.326    0.000 ImportDataIntoMongo.py:162(<dictcomp>)
      346    0.396    0.001   30.665    0.089 c:\python27\lib\site-packages\pymongo\collection.py:240(insert)

EDIT4

I have parallelized the code, but the results are still not very encouraging. I am convinced it could be done better. See my own answer to this question for details.

  • If the timezones collection is not very big, I suggest reading it into memory before processing the csv files. – fqsxr Apr 10 '12 at 12:51
  • I have edited my post. By reading it into memory you mean populating a dict instance keyed by the loc field? – mark Apr 10 '12 at 13:16
  • I will have to create some kind of a geo index data structure to support efficient lookup within a bounding box. Sounds like a piece of work. – mark Apr 10 '12 at 13:23
  • Try this: http://toblerity.github.com/rtree/ – fqsxr Apr 10 '12 at 13:48
  • I am running python 64 bits on Windows 7. I am really trying to make rtree work there, alas! I have compiled spatialindex from sources, renamed spatialindex64.dll to spatialindex_c.dll, placed it in the PATH - still easy_install rtree fails - AttributeError: function 'Error_GetLastErrorNum' not found. I am very near to giving up. – mark Apr 10 '12 at 16:05
  • @fqsxr - Hallelujah, I have finally managed to install rtree for my python. I am going to check your suggestion now. – mark Apr 10 '12 at 21:32
  • is most of your time spent in `read_csv_zip`? – lunixbochs Apr 11 '12 at 13:39
  • I have attached cProfile statistics. – mark Apr 11 '12 at 13:48
  • looks like it - seems like the time spent in insert_documents is skewed because read_csv_zip is a generator – lunixbochs Apr 11 '12 at 13:55
  • But the bulk of the time is spent either in `find_one` or `intersection`, where the latter could be made more efficient if only rtree read the entire dat/idx file pair into memory. As of now, it is lots of I/O. There must be a way to parallelize the task, I am just inexperienced with that sort of stuff. – mark Apr 11 '12 at 14:00

1 Answer


OK, I have parallelized the code, but it runs only about twice as fast. Here is my solution:

# (imports shown for completeness; check/converters come from get_aux_stuff,
# allow_field and tz_lookup_radius are defined elsewhere, as in the question)
import csv
from datetime import timedelta
from zipfile import ZipFile
from Queue import Queue
from threading import Thread

from bson.son import SON
from pytz import timezone, utc

write_batch_size = 100
read_batch_size = 100
count_parsed_csv_consumers = 15
count_data_records_consumers = 1
parsed_csv_queue = Queue()
data_record_queue = Queue()

def get_parsed_csv_consumer(converters, timezones):
  def do_work(csv_row):
    # same conversion + timezone lookup as in the original read_csv_zip
    row = {
      converter[0]: converter[1](value)
      for converter, value in zip(converters, csv_row)
      if allow_field(converter)
    }
    ts = row['ts']
    lng, lat = row['loc']
    found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
    if found_tz_entry:
      tz_name = found_tz_entry['tz']
      local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
      row['tz'] = tz_name
    else:
      local_ts = (ts.astimezone(utc) + timedelta(hours=int(lng/15))).replace(tzinfo=None)
    row['local_ts'] = local_ts
    return row
  def worker():
    while True:
      csv_rows = parsed_csv_queue.get()
      try:
        rows = []
        for csv_row in csv_rows:
          rows.append(do_work(csv_row))
        data_record_queue.put_nowait(rows)
      except Exception as exc:
        print(exc)
      parsed_csv_queue.task_done()
  return worker

def get_data_record_consumer(collection):
  # note: the items cache is shared by all threads running this consumer, so
  # count_data_records_consumers > 1 would need a per-thread cache or a lock
  items = []
  def do_work(row):
    items.append(row)
    if len(items) == write_batch_size:
      persist_items()
  def persist_items():
    try:
      collection.insert(items)
    except Exception:
      # bulk insert failed - retry record by record
      for item in items:
        try:
          collection.insert(item)
        except Exception as exc:
          print("Failed to insert record {0} - {1}".format(item['_id'], exc))
    del items[:]
  def data_record_consumer():
    collection    # explicit capture
    while True:
      rows = data_record_queue.get()
      try:
        if rows:
          for row in rows:
            do_work(row)
        elif items:
          # a None sentinel - flush whatever is still cached
          persist_items()
      except Exception as exc:
        print(exc)
      data_record_queue.task_done()
  return data_record_consumer

def import_csv_zip_to_collection(path, timezones, collection):
  def get_threads(count, target, name):
    acc = []
    for i in range(count):
      x = Thread(target=target, name=name + " " + str(i))
      x.daemon = True
      x.start()
      acc.append(x)
    return acc

  with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
    csv_rows = csv.reader(input)
    header = next(csv_rows)
    check, converters = get_aux_stuff(header)

    parsed_csv_consumer_threads = get_threads(count_parsed_csv_consumers, get_parsed_csv_consumer(converters, timezones), "parsed csv consumer")
    data_record_consumer_threads = get_threads(count_data_records_consumers, get_data_record_consumer(collection), "data record consumer")

    read_batch = []
    for csv_row in csv_rows:
      if check(csv_row):
        read_batch.append(csv_row)
        if len(read_batch) == read_batch_size:
          parsed_csv_queue.put_nowait(read_batch)
          read_batch = []
    if len(read_batch) > 0:
      parsed_csv_queue.put_nowait(read_batch)
      read_batch = []
    parsed_csv_queue.join()
    data_record_queue.join()
    # data record consumers may have some items cached. All of them must flush their caches now.
    # we do it by enqueueing a special item (None), which when fetched causes the respective
    # consumer to flush whatever it still holds.
    for i in range(len(data_record_consumer_threads)):
      data_record_queue.put_nowait(None)
    data_record_queue.join()

The process goes like this:

  1. Parsed csv rows are batched (the size of the batch is determined by read_batch_size)
  2. When a batch of parsed csv rows is full, it is placed in parsed_csv_queue to be consumed by the multiple consumers from parsed_csv_consumer_threads
  3. A parsed csv row consumer is slow, because it has to look up the timezone with a mongo query (timezones.find_one). Hence there are many of them - count_parsed_csv_consumers to be exact.
  4. A parsed csv consumer converts its input to data records. The converted records are batched (the batch size is preserved, i.e. read_batch_size) and, once a batch is full, it is placed in another queue - data_record_queue
  5. Data record consumers fetch batches of data records from data_record_queue and insert them into the destination mongo collection.
  6. A data record consumer is much faster than a parsed csv record consumer, hence there are far fewer of them; in fact, I am using just one, but this can be changed through the count_data_records_consumers constant. (A sketch of the driver wiring all of this together follows the list.)
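For completeness, the driver is essentially the same main as in the question, just calling import_csv_zip_to_collection instead (a sketch - the actual script differs in details):

# sketch of the driver, mirroring the question's main()
def main(zip_path):
  with Connection() as connection:
    import_csv_zip_to_collection(zip_path,
                                 connection.timezones.data,  # timezones collection
                                 connection.mydb.data)       # destination collection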

In the first version, I was placing individual records into the queues, but profiling has revealed that Queue.put_nowait is quite expensive, hence I was forced to reduce the number of puts by batching the records.

Anyway, the code is now twice as fast, but I was hoping for a much better result. Here are the profiling results:

>>> p.sort_stats('cumulative').print_stats(10)
Fri Apr 13 13:31:17 2012    ImportOoklaIntoMongo.profile

         3782711 function calls (3782429 primitive calls) in 310.209 seconds

   Ordered by: cumulative time
   List reduced from 737 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.016    0.016  310.209  310.209 .\ImportOoklaIntoMongo.py:1(<module>)
        1    0.004    0.004  309.833  309.833 .\ImportOoklaIntoMongo.py:272(main)
        1   17.829   17.829  220.432  220.432 .\ImportOoklaIntoMongo.py:225(import_csv_zip_to_collection)
   386081   28.049    0.000  135.297    0.000 c:\python27\lib\zipfile.py:508(readline)
   107008    7.588    0.000  102.938    0.001 c:\python27\lib\zipfile.py:570(read)
   107008   50.716    0.000   95.302    0.001 c:\python27\lib\zipfile.py:598(read1)
    71240    3.820    0.000   95.292    0.001 c:\python27\lib\zipfile.py:558(peek)
        1   89.382   89.382   89.382   89.382 {raw_input}
   386079   43.564    0.000   54.706    0.000 .\ImportOoklaIntoMongo.py:103(check)
    35767   40.286    0.001   40.286    0.001 {built-in method decompress}

I am a bit suspicious of the profiler output, because it seems to display just the main thread results. And indeed - How can I profile a multithread program in Python?
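The usual workaround seems to be profiling each worker from inside its own thread and dumping a separate stats file per thread - something like this sketch (not what I have actually run):

import cProfile

def profiled(target, name):
  # wrap a thread target so its work is captured by a per-thread cProfile
  # instance; only useful if the worker eventually returns (e.g. on a sentinel),
  # since the stats are dumped when the target exits
  def run():
    profiler = cProfile.Profile()
    try:
      profiler.runcall(target)
    finally:
      profiler.dump_stats('{0}.profile'.format(name))
  return run

# e.g. Thread(target=profiled(worker, "parsed-csv-consumer-0"), ...)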
