
I'm using the following code to split a CSV file into multiple chunks (sourced from here):

import csv
import itertools
import multiprocessing as mp
import time

def worker(chunk):
    # just report the size of each chunk received
    print len(chunk)

def keyfunc(row):
    # group rows by the value of their first column
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()  # skip the header row
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print "--- %s seconds ---" % (time.time() - start_time)

if __name__ == '__main__':
    main()

However, the chunking seems to stay the same regardless of the number of chunks I choose to use: whether I pick 1 or 10 chunks, I always get the output shown below when processing a sample file. Ideally, I'd like to chunk the file so that the work is equitably distributed.

Note, the real file I am chunking is over 13 million rows long, which is why I am processing it piece by piece. That is a must!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
  • Let's say you choose to split the file into 10 chunks. Do you want one worker process to handle 1 chunk of the file, or do you want to evenly spread that 1 chunk amongst the workers in the pool, wait until they all finish, and then send the pool the next chunk? – unutbu Jul 01 '15 at 18:24
  • @HappyLeapSecond 1 chunk per worker process would be more efficient (so I don't have to block and wait for every other process to finish as well). Before asking this question, I looked through the Python documentation pretty extensively. My understanding is that you are using groupby to map each value in a row to a key (the corresponding column). This returns an iterator. Then you are passing that to islice, which starts at 0 and then takes out num_chunks (which would be 10). This would be the number of rows, correct? Ideally, I'd like to have processes work with 10,000-row chunks. – Jul 01 '15 at 19:30
  • In the other problem, *"there is a column which needs to be [grouped] by ... and all rows with that name can't be split up"*. That is the reason why `itertools.groupby` was used. Here, there is no requirement to group rows by the value of a certain column, so we can skip using `itertools.groupby`. – unutbu Jul 01 '15 at 20:19

2 Answers


Per the comments, we wish to have each process work on a 10000-row chunk. That's not too hard to do; see the iter/islice recipe below. However, the problem with using

pool.map(worker, ten_thousand_row_chunks)

is that pool.map will attempt to put all the chunks in a task queue at once. If this requires more memory than is available then you get a MemoryError. (Note: pool.imap suffers from the same problem.)

So instead, we need to call pool.map iteratively, on pieces of each chunk.

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

Each chunk will consist of up to chunksize*num_procs lines from the file. This is enough data to give all workers in the pool something to work on, but not so big as to cause a MemoryError -- provided chunksize is not set too large.

Each chunk is then broken into pieces, with each piece consisting of up to chunksize rows from the file. These pieces are then sent to pool.map.
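
To see how the chunk/piece split plays out, here is a small sketch with made-up toy values (chunksize = 3, num_procs = 2, and range(10) standing in for the rows from csv.reader; the real code uses chunksize = 10**5 and num_procs = mp.cpu_count()):

import itertools as IT

# Hypothetical toy values, just to illustrate the chunk/piece split;
# range(10) stands in for the rows produced by csv.reader.
chunksize = 3
num_procs = 2
reader = iter(range(10))

for chunk in iter(lambda: list(IT.islice(reader, chunksize * num_procs)), []):
    chunk = iter(chunk)
    pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
    print(pieces)
# prints:
# [[0, 1, 2], [3, 4, 5]]
# [[6, 7, 8], [9]]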


How does iter(lambda: list(IT.islice(iterator, chunksize)), []) work:

This is an idiom for grouping an iterator into chunks of length chunksize. Let's see how it works with an example:

In [111]: iterator = iter(range(10))

Notice that each time IT.islice(iterator, 3) is called, a new chunk of 3 items is sliced off of the iterator:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

When there are fewer than 3 items left in the iterator, only what remains is returned:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

And if you call it again, you get an empty list:

In [116]: list(IT.islice(iterator, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) is a function which returns list(IT.islice(iterator, chunksize)) when called. It is a "one-liner" which is equivalent to

def func():
    return list(IT.islice(iterator, chunksize))

Finally, iter(callable, sentinel) returns another iterator. The values yielded by this iterator are the values returned by the callable. It keeps on yielding values until the callable returns a value equal to the sentinel. So

iter(lambda: list(IT.islice(iterator, chunksize)), [])

will keep on returning the value of list(IT.islice(iterator, chunksize)) until that value is the empty list:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
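
If the iter(callable, sentinel) form still feels terse, the same grouping can be written as an explicit generator. This is just an equivalent sketch (the helper name grouper is made up here), not part of the recipe above:

import itertools as IT

def grouper(iterator, n):
    # yield successive lists of up to n items from iterator;
    # behaves like iter(lambda: list(IT.islice(iterator, n)), [])
    while True:
        group = list(IT.islice(iterator, n))
        if not group:
            return
        yield group

iterator = iter(range(10))
print(list(grouper(iterator, 3)))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
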
unutbu
  • Wow! Great and descriptive answer. Thank you so much. I understand it so much better now. If I can ask you a question, how did you get so good at these things and intuitively understand these pythonic principles? Do you have a book or resource you can recommend? –  Jul 02 '15 at 02:02
  • There are plenty of others who know much more than me, so I identify more with you, the one asking the question, than being the one trying to answer it. Moreover, there may not be [a royal road](https://en.wikipedia.org/wiki/Royal_Road#A_metaphorical_.E2.80.9CRoyal_Road.E2.80.9D_in_famous_quotations). One thing, perhaps, has really helped me though -- I collect short, simple examples demonstrating the use of each feature and function in Python. – unutbu Jul 02 '15 at 09:50
  • I don't think it matters greatly what documentation you read. There are a lot of great free docs and tutorials on the net. What matters is that you practice and play with the language. Concrete examples make the meaning and behavior of the language clear. So the best advice I can give is to enjoy programming and engage in [a lot of practice/play](http://norvig.com/21-days.html). – unutbu Jul 06 '15 at 19:04
  • What if I have a function, say `func1`, which takes only one row of a specific column, say `var1`, in the `Counseling.csv` file as input, and this function produces a list which will be written to a new `csv` file named 'output.csv'? – Jia Gao Sep 05 '18 at 03:13

First of all, itertools.groupby will not make any real sense if the records are not already sorted on the key column. Moreover, if your requirement is just to chunk the csv file into a predetermined number of rows and give each chunk to a worker, then you don't have to do all this.

A simple implementation would be:

import csv
from multiprocessing import Pool


def worker(chunk):
    print len(chunk)

def emit_chunks(chunk_size, file_path):
    # generator: lazily yields lists of up to chunk_size rows,
    # so the whole file is never held in memory at once
    lines_count = 0
    with open(file_path) as f:
        reader = csv.reader(f)
        chunk = []
        for line in reader:
            lines_count += 1
            chunk.append(line)
            if lines_count == chunk_size:
                lines_count = 0
                yield chunk
                chunk = []
        if chunk:
            yield chunk  # yield any leftover rows

def main():
    chunk_size = 10
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
    p = Pool(5)
    for _ in p.imap(worker, gen):  # consume the lazy imap iterator
        pass
    p.close()
    p.join()
    print 'Completed..'

main()

*Edit: changed to pool.imap instead of pool.map.*

gipsy
  • Wouldn't `pool.imap` be better memory-wise? And if that column is sorted, couldn't the `if lines_count == chunk_size` check be tweaked to require the specific column to have different values? – deinonychusaur Jul 01 '15 at 22:17
  • @deinonychusaur Absolutely, pool.imap is the correct way to do it, otherwise we will run into memory issues. I am changing my answer to use that. Thanks. – gipsy Jul 02 '15 at 00:59
  • I get it. You are not storing them in memory but using yield to produce these values from a generator, correct? I selected the other answer because the yield keyword is a little complicated and it took me a bit to understand what you were doing. Nevertheless, I upvoted your answer and I really appreciate your help. Keep doing what you do, man :-) ! – Jul 02 '15 at 02:05