Per the comments, we wish to have each process work on a 10000-row chunk. That's
not too hard to do; see the iter/islice recipe below. However, the problem with using
pool.map(worker, ten_thousand_row_chunks)
is that pool.map will attempt to put all the chunks in a task queue at once.
If this requires more memory than is available, you get a MemoryError.
(Note: pool.imap suffers from the same problem, since it too fills the task
queue as fast as it can.) So instead, we need to call pool.map iteratively,
on pieces of each chunk.
import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'r', newline='') as f:  # use 'rb' on Python 2
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize * num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Each chunk will consist of up to chunksize * num_procs lines from the file. This
is enough data to give all workers in the pool something to work on, but not so
big as to cause a MemoryError -- provided chunksize is not set too large.
Each chunk is then broken into pieces, with each piece consisting of up to
chunksize rows from the file. These pieces are then sent to pool.map.
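To make the chunk-to-pieces step concrete, here is a toy sketch (no multiprocessing, and with made-up small numbers: pretend chunksize is 3 and num_procs is 2, so a combined chunk holds up to 6 rows):

```python
import itertools as IT

chunksize = 3
num_procs = 2
rows = iter(range(10))  # stands in for the csv.reader

# One combined chunk: up to chunksize * num_procs rows read at once
combined = list(IT.islice(rows, chunksize * num_procs))

# Split the combined chunk into per-worker pieces of up to chunksize rows
chunk = iter(combined)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
print(pieces)  # [[0, 1, 2], [3, 4, 5]]
```

Each sublist in pieces is what one worker would receive from pool.map.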
How does iter(lambda: list(IT.islice(iterator, chunksize)), []) work?
This is an idiom for grouping an iterator into chunks of length chunksize.
Let's see how it works on an example:
In [111]: iterator = iter(range(10))
Notice that each time IT.islice(iterator, 3)
is called, a new chunk of 3 items
is sliced off of the iterator:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
When there are fewer than 3 items left in the iterator, only what remains is returned:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
And if you call it again, you get an empty list:
In [116]: list(IT.islice(iterator, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
is a function which returns list(IT.islice(iterator, chunksize))
when called. It is a "one-liner" which is equivalent to
def func():
    return list(IT.islice(iterator, chunksize))
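To see this concretely, here is a short sketch: the lambda closes over iterator and chunksize, and each call drains the next slice off the iterator, just as the IT.islice calls did in the session above:

```python
import itertools as IT

chunksize = 3
iterator = iter(range(10))

# func closes over iterator and chunksize; each call slices off
# the next chunksize items, or fewer once the iterator runs low.
func = lambda: list(IT.islice(iterator, chunksize))

print(func())  # [0, 1, 2]
print(func())  # [3, 4, 5]
print(func())  # [6, 7, 8]
print(func())  # [9]
print(func())  # []
```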
Finally, iter(callable, sentinel)
returns another iterator. The values yielded by this iterator are the values returned by the callable. It keeps on yielding values until the callable returns a value equal to the sentinel. So
iter(lambda: list(IT.islice(iterator, chunksize)), [])
will keep on returning the values list(IT.islice(iterator, chunksize))
until that value is the empty list:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
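If the two-argument form of iter feels cryptic, the same grouping can be written as a small generator function. This is just a sketch of an equivalent helper (the name grouper is my own, not part of any library):

```python
import itertools as IT

def grouper(iterable, n):
    """Yield lists of up to n items from iterable until it is exhausted.

    Equivalent to iter(lambda: list(IT.islice(iterator, n)), []).
    """
    iterator = iter(iterable)
    while True:
        chunk = list(IT.islice(iterator, n))
        if not chunk:
            return
        yield chunk

print(list(grouper(range(10), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

On Python 3.12+, itertools.batched(iterable, n) does the same job out of the box, though it yields tuples rather than lists.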