
I'm trying to write a huge dataset, ~146 million rows, to CSV. I've tried this:

def paramlist():
    for row in nodes.itertuples():
        l = []
        for row2 in ref_stops.itertuples():
            l.append((row[1], row[2], row[3], row2[1],
                     row2[2], row2[3], row2[4], haversine(row[3], row[2], row2[3], row2[2])))
        yield l

def func(params):
    with open(r'big_file.csv', 'a') as f:
        writer = csv.writer(f)
        for row in params:
            writer.writerow(row)

pool = multiprocessing.Pool()
pool.map(func, paramlist())

This code works, but it eats all my RAM and aborts.
How can I optimize it?

boardrider
Aleksandr Zakharov
  • This may have relevant information. Try applying buffering when opening the file object: http://stackoverflow.com/questions/22026393/python-fast-and-efficient-way-of-writing-large-text-file – lennyklb May 05 '17 at 12:36
  • 1
    This MVCE could not work. Missing `parama`,`nodes` and `pool.map(...` return to nothing. Please add missing code. – stovfl May 05 '17 at 15:41

2 Answers


pool.map will consume the whole iterable before submitting parts of it to the pool's workers, which is why you run out of memory. Use pool.imap instead to avoid this. See this post for a thorough explanation.
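
To make the difference concrete, here is a minimal self-contained sketch (the work function and the generator are placeholders, not your code): pool.map materializes the whole generator as a list up front, while pool.imap pulls items lazily as the workers become free.

from contextlib import closing
import multiprocessing

def work(x):
    # placeholder for the real per-row work
    return x * x

def numbers():
    # a generator standing in for paramlist()
    for i in range(10000000):
        yield i

if __name__ == '__main__':
    with closing(multiprocessing.Pool()) as pool:
        # pool.map(work, numbers())  # would build a 10-million element list first
        for result in pool.imap(work, numbers(), chunksize=1000):
            pass  # results arrive a chunk at a time; memory use stays flat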

That being said, I sincerely doubt that multiprocessing will speed up your program in the way you wrote it, since the bottleneck is disk I/O. Opening, appending, and closing the file over and over again is certainly not quicker than one sequential write, and several processes cannot usefully write to a single file in parallel anyway.
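
For comparison, the plain single-process version (opening the file once, with a bigger write buffer as the comments suggest) is the baseline any multiprocessing variant has to beat. A minimal sketch, with an arbitrary 1 MiB buffer size:

import csv

def write_all(row_lists, path='big_file.csv'):
    # open the file once and stream everything through a single writer
    with open(path, 'w', buffering=1024 * 1024) as f:
        writer = csv.writer(f)
        for row_list in row_lists:
            writer.writerows(row_list)

# write_all(paramlist()) would replace the pool.map(...) call entirely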

Assuming that the generation of l takes some time, there could be a speedup if you write your program like this:

from contextlib import closing
import multiprocessing
import csv
import pandas as pd
import numpy as np

# Just for testing
ref_stops = pd.DataFrame(np.arange(100).reshape((-1, 5)))
nodes = pd.DataFrame(np.arange(400).reshape((-1, 4)))
def haversine(a, b, c, d):
    return a*b*c*d

# This function will be executed by the workers
def join_rows(row):
    row_list = []
    # join row with all rows from `ref_stops` and compute haversine
    for row2 in ref_stops.itertuples():
        row_list.append((row[1], row[2], row[3],
                         row2[1], row2[2], row2[3], row2[4],
                         haversine(row[3], row[2], row2[3], row2[2])))
    return row_list


def main():
    with closing(multiprocessing.Pool()) as pool:
        # joined_rows will contain lists of joined rows in arbitrary order.
        # use name=None so we get proper tuples, pandas named tuples cannot be pickled, see https://github.com/pandas-dev/pandas/issues/11791
        joined_rows = pool.imap_unordered(join_rows, nodes.itertuples(name=None))

        # open file and write out all rows from incoming lists of rows
        with open(r'big_file.csv', 'w') as f:
            writer = csv.writer(f)
            for row_list in joined_rows:
                writer.writerows(row_list)

if __name__ == '__main__':
    main()

I assume you don't care about order, otherwise you wouldn't have chosen multiprocessing in the first place, right?
This way it's not the main process that generates the lists of rows but the worker processes. As soon as a worker has finished a list, it returns it to the main process, which appends its entries to the file. The worker then fetches a new row and starts building the next list.

It might also be better to use more pandas functionality in the program in general (I assume you're using pandas DataFrames because of itertuples). For example, you could build a new DataFrame instead of a list of rows and make haversine work on whole pandas.Series objects so you don't have to call it for every single entry.
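
Something along those lines, as a rough sketch (the toy frames, the column names, and the vectorized haversine are made up for illustration, not taken from your data): cross-join one chunk of nodes with all of ref_stops and compute the distance on whole columns.

import numpy as np
import pandas as pd

def haversine_vec(lat1, lon1, lat2, lon2):
    # vectorized haversine on whole columns, result in km
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * np.arcsin(np.sqrt(a))

# toy frames with made-up column names; substitute your real nodes/ref_stops
nodes = pd.DataFrame({'node_id': [1, 2], 'lat': [52.1, 52.2], 'lon': [21.0, 21.1]})
ref_stops = pd.DataFrame({'stop_id': [10, 11], 'lat': [52.3, 52.4], 'lon': [20.9, 21.2]})

chunk_size = 10000
with open('big_file.csv', 'w') as f:
    for start in range(0, len(nodes), chunk_size):
        chunk = nodes.iloc[start:start + chunk_size]
        # cross join one chunk of nodes with all reference stops
        joined = (chunk.assign(_k=1)
                       .merge(ref_stops.assign(_k=1), on='_k', suffixes=('_n', '_s'))
                       .drop('_k', axis=1))
        joined['distance'] = haversine_vec(joined['lat_n'], joined['lon_n'],
                                           joined['lat_s'], joined['lon_s'])
        joined.to_csv(f, header=(start == 0), index=False)

Processing one chunk of nodes at a time keeps the cross join from blowing up memory while still letting NumPy do the arithmetic in bulk.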

swenzel

Try writing the data in chunks. Read your DataFrame (assuming you are writing from a DataFrame) in parts, i.e. in chunks of rows, and write each chunk out once; this is considerably faster.
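
A minimal sketch of that idea, assuming the data already sits in a DataFrame df (the column names and chunk size are made up):

import numpy as np
import pandas as pd

# stand-in DataFrame; replace with your real data
df = pd.DataFrame(np.random.rand(1000000, 4), columns=['a', 'b', 'c', 'd'])

chunk_size = 100000
with open('big_file.csv', 'w') as f:
    for start in range(0, len(df), chunk_size):
        # write one slice at a time; header only for the first chunk
        df.iloc[start:start + chunk_size].to_csv(f, header=(start == 0), index=False)

to_csv also accepts a chunksize argument that does the row-batching internally, e.g. df.to_csv('big_file.csv', index=False, chunksize=100000).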

bigbounty