
I had a strange problem. I have a file of the format:

START
1
2
STOP
lllllllll
START
3
5
6
STOP

and I want to read the lines between START and STOP as blocks, and use my_f to process each block.

import itertools

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START':
                block = itertools.takewhile(lambda x: x != 'STOP', lines)
                yield block

In my main function I tried to use map() to get the work done, and it worked:

blocks=block_generator(file)
map(my_f,blocks)

will actually give me what I want. But when I tried the same thing with multiprocessing.Pool.map(), it gave me an error saying takewhile() wanted 2 arguments but was given 0:

blocks=block_generator(file)
p=multiprocessing.Pool(4)
p.map(my_f,blocks)

Is this a bug?

  1. The file has more than 1,000,000 blocks, each with fewer than 100 lines.
  2. I accepted the answer from unutbu.
  3. But maybe I will simply split the file and use n instances of my original script, without multiprocessing, to process them, then cat the results together (a rough sketch of that splitting step follows below). This way you can never be wrong as long as the script works on a small file.
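For reference, here is a minimal sketch of that splitting step. It is not from the thread: the chunk file names and 'data.txt' are made up, and it assumes every block ends with a STOP line so that no block is split across chunk files.

def split_on_blocks(filename, n):
    outs = [open('chunk_%d' % i, 'w') for i in range(n)]
    i = 0
    with open(filename) as infile:
        for line in infile:
            outs[i % n].write(line)
            if line.strip() == 'STOP':
                # switch chunks only at a block boundary, so every
                # START..STOP block lands whole in a single file
                i += 1
    for f in outs:
        f.close()

split_on_blocks('data.txt', 4)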
gstar2002
    What happens if you use `block=list(itertools.takewhile(lambda x: x != 'STOP', lines))` instead, so you don't have multiple iterators running at once? – agf Aug 13 '11 at 01:30
  • Hi @agf thanks, it now gives the right results. But it is very slow. As if p.map want to read the whole file in memory first. Or something like that. But the question is still there, why? – gstar2002 Aug 13 '11 at 01:51

2 Answers


How about:

import itertools
import multiprocessing

def grouper(n, iterable, fillvalue=None):
    # Source: http://docs.python.org/library/itertools.html#recipes
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=fillvalue)

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START':
                block = list(itertools.takewhile(lambda x: x != 'STOP', lines))
                yield block

blocks = block_generator(file)
p = multiprocessing.Pool(4)
# feed p.map only 100 blocks at a time; the last chunk is padded with fillvalue
for chunk in grouper(100, blocks, fillvalue=''):
    p.map(my_f, chunk)

Using grouper will limit the amount of the file consumed by p.map. Thus the whole file need not be read into memory (fed into the task queue) at once.


I claim that when you call p.map(func, iterator), the entire iterator is consumed immediately to fill a task queue. The pool workers then get tasks from the queue and work on the jobs concurrently.

If you look inside pool.py and trace through the definitions, you will see that the _handle_tasks thread gets items from self._taskqueue and enumerates them at once:

         for i, task in enumerate(taskseq):
             ...
             put(task)

The conclusion is that the iterator passed to p.map gets consumed at once; there is no waiting for one task to finish before the next one is queued.

As further corroboration, run this demonstration code:

import multiprocessing as mp
import time
import logging

def foo(x):
    time.sleep(1)
    return x*x

def blocks():
    for x in range(1000):
        if x%100==0:
            logger.info('Got here')
        yield x

logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG) 
pool=mp.Pool() 
print pool.map(foo, blocks()) 

You will see the Got here message printed 10 times almost immediately, and then a long pause due to the time.sleep(1) call in foo. This shows the iterator is fully consumed long before the pool processes get around to finishing the tasks.

unutbu
  • This doesn't make any sense. This way, you read 100 blocks into memory at once, then consume them in parallel. Without grouper, you only read one block into memory per process -- so 4 blocks at a time. His problem was the time each `list` call was taking to consume the `takewhile` -- you haven't done anything to speed that up. The solution is not to use `list`. – agf Aug 13 '11 at 02:49
  • Maybe you're right, it looks like `Pool.map_async` does `list()` the iterable. However, this still assumes there are many relatively short blocks in the file. If there are less than 100 (relatively long) blocks in the file, this doesn't improve things at all, and it doesn't improve things much if there are a low-100s number of blocks. Getting rid of the added `list()` altogether (which I only intended as a diagnostic) works for any number of blocks. I see you deleted your comment, I'll leave this. – agf Aug 13 '11 at 08:47
  • @agf: I added some code which I think demonstrates that the generator is fully consumed before the worker processes finish (almost) any of the tasks. Regarding the chunk size of 100 that I used in my code -- that was just an example. The OP can change it to whatever makes sense for his problem. He hasn't told us how many START/STOP blocks there are, but from his example it looked like they are very short blocks. Since reading the whole file causes a memory problem, I inferred that there must be very many short blocks. – unutbu Aug 13 '11 at 09:15
  • I agree it does consume the iterator -- I saw it too when I looked at the source, as I said in my comment. However, he never said reading the whole file caused a memory problem; he said "But it is very slow. As if p.map want to read the whole file in memory first." which was clearly just a guess. His example was clearly contrived so I don't think we can infer anything about the relative block size vs. count. I still don't think your solution addresses his problem, and it still requires chunks of the file longer than a single line be held in memory. – agf Aug 13 '11 at 09:25
  • thanks @unutbu, I get what the problem is. My block_generator(file) in its original form gives the right answer only when you get block1, process block1, then get block2, and so on. Because mp.Pool.map() tries to consume all the blocks first, when it tried to process block1 it failed, because the file pointer was already at the end of the file. – gstar2002 Aug 13 '11 at 13:14
  • If I list(takewhile) in my block_generator(file), it gives the right behavior. But because mp.Pool.map() tries to get all the blocks into memory first and the file is large, it was slow. grouper() should solve the problem. – gstar2002 Aug 13 '11 at 13:22
  • My question is then: why does mp.Pool.map() behave this way? It could first get n blocks for the n workers, and when one worker finishes processing, get another block for it. That would clearly be better for large jobs, and those jobs are why people want to use mp.Pool.map() in the first place. – gstar2002 Aug 13 '11 at 13:27
  • @gstar2002: I've stared at pool.py for a while trying to figure out how `_handle_results` might be modified to put more tasks in the task queue whenever a task finishes. I think it would take some fairly significant rewriting of code to make that work. `mp.Pool` works fine for a lot of problems. When memory becomes an issue, my crude workaround should work fairly well for most of those cases. – unutbu Aug 13 '11 at 13:56
  • @gstar2002: Using my workaround, in the worst-case scenario there is one process which holds up `p.map`, stalling all the other processes. In that case I think you have to abandon mp.Pool, instantiate workers with mp.Process, and write some custom code handling the interaction of the input and output queues (sketched roughly below, after these comments). – unutbu Aug 13 '11 at 13:56
  • @gstar2002: Or maybe look at other [parallel processing frameworks](http://wiki.python.org/moin/ParallelProcessing). – unutbu Aug 13 '11 at 14:05
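For completeness, here is a hedged sketch of the mp.Process / queue approach mentioned in the comment above. None of it is from the thread; the idea is that a bounded input queue keeps only a handful of blocks in memory at a time, and each worker pulls a new block as soon as it finishes the previous one.

import multiprocessing as mp

def worker(in_q, out_q):
    # my_f is the OP's block-processing function
    for block in iter(in_q.get, None):        # None is the shutdown sentinel
        out_q.put(my_f(block))

def run(blocks, n_workers=4):
    in_q = mp.Queue(maxsize=2 * n_workers)    # bounds how many blocks sit in memory
    out_q = mp.Queue()
    procs = [mp.Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for p in procs:
        p.start()
    n = 0
    for block in blocks:                      # blocks must be picklable, e.g. lists of lines
        in_q.put(block)                       # pauses here whenever the queue is full
        n += 1
    for _ in procs:
        in_q.put(None)
    # unlike Pool.map, results arrive in completion order, not input order
    results = [out_q.get() for _ in range(n)]
    for p in procs:
        p.join()
    return results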

Basically, when you iterate over a file like you are, each time you read a new line from the file you move the file pointer ahead one line.

So, when you do

block=itertools.takewhile(lambda x:x!='STOP',lines) 

every time the iterator returned by takewhile gets a new item from lines, it moves the file pointer.

It's generally bad to advance an iterator you're already looping over in the for loop. However, the for loop is suspended temporarily on every yield, and map exhausts the takewhile before continuing the for loop, so you get the desired behavior.

When you have the for loop and the takewhiles running at the same time, the file pointer rapidly gets moved to the end, and you get an error.
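To make the two behaviors concrete, here is a small standalone sketch (a list iterator stands in for the file object, and the values are made up):

import itertools

def blocks(lines):
    for line in lines:
        if line == 'START':
            yield itertools.takewhile(lambda x: x != 'STOP', lines)

data = ['START', '1', '2', 'STOP', 'junk', 'START', '3', 'STOP']

# Lazy consumption, as with the built-in map: each block is exhausted
# before the outer for loop resumes, so the blocks come out right.
for block in blocks(iter(data)):
    print(list(block))              # ['1', '2'], then ['3']

# Eager consumption, as Pool.map does: pulling all the blocks first runs
# the outer for loop to the end, so the takewhiles have nothing left to read.
eager = list(blocks(iter(data)))
print([list(b) for b in eager])     # [[], []]

The eager case is exactly what happens when the whole generator is drained to fill the task queue before any block is processed.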

Try this instead; it should be faster than wrapping the takewhile in a list:

import itertools
import multiprocessing
from functools import partial

def block_generator(filename):
    # use readline() rather than iterating over the file object: iteration
    # uses an internal read-ahead buffer, which makes tell() unreliable
    with open(filename) as infile:
        while True:
            line = infile.readline()
            if not line:
                break
            if line == 'START':
                yield infile.tell()

def my_f_wrapper(pos, filename):
    # each worker opens the file independently and seeks to its own block
    with open(filename) as infile:
        infile.seek(pos)
        block = itertools.takewhile(lambda x: x != 'STOP', infile)
        my_f(block)

blocks = block_generator(filename)
p = multiprocessing.Pool(4)
# Pool.imap takes a single iterable, so bind filename with functools.partial
p.imap(partial(my_f_wrapper, filename=filename), blocks)

Basically, you want each my_f to be operating independently on the file, so you need to open the file independently for each one.

I can't think of a way that doesn't require the file to be iterated over twice, once by block_generator to find the START positions and once by the takewhiles all put together, while still processing the file in parallel. In your original version, the takewhiles advanced the file pointer for the for loop, so it was very efficient.

If you weren't iterating over lines, but just bytes, I'd recommend using mmap for this, but it would make things a lot more complicated if you're working with lines of text.

Edit: An alternative would be to have block_generator go through the file and find all the positions of START and STOP, then feed them in pairs to the wrapper. That way, the wrapper wouldn't have to compare the lines to STOP, it would just have to use tell() on the file to make sure it wasn't at STOP. I'm not sure whether or not this would be faster.
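For what it's worth, here is a rough sketch of that alternative. The helper names are made up, it assumes the same bare START/STOP marker lines as in the question, and it only calls tell() between readline() calls so the offsets stay reliable:

import multiprocessing
from functools import partial

def start_stop_pairs(filename):
    # yield (start, stop) byte offsets: start is just past a START line,
    # stop is where the matching STOP line begins
    with open(filename) as infile:
        start = None
        pos = infile.tell()
        while True:
            line = infile.readline()
            if not line:
                break
            if line == 'START':
                start = infile.tell()
            elif line == 'STOP' and start is not None:
                yield (start, pos)
                start = None
            pos = infile.tell()

def pair_wrapper(pair, filename):
    start, stop = pair
    with open(filename) as infile:
        infile.seek(start)
        block = []
        # no comparison against 'STOP': just read lines until the STOP offset
        while infile.tell() < stop:
            block.append(infile.readline())
        my_f(block)

p = multiprocessing.Pool(4)
p.imap(partial(pair_wrapper, filename=filename), start_stop_pairs(filename))

Whether this beats the takewhile version probably depends on how long the blocks are; it trades a string comparison per line for a tell() call per line.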

agf
  • `p.imap(my_f_wrapper,blocks)` fully consumes `blocks` immediately. This claim is corroborated by the demonstration code in the second half of my answer. If there are a great many START/STOP blocks, then this means there will be a great many open file objects waiting in the task queue. I don't think that is a good idea, because the OS puts a limit on the number of file descriptors a process can have open at the same time. – unutbu Aug 13 '11 at 09:19
  • I actually thought of this (based on your information that the iterable was consumed) while responding to your post and have already moved file opening into the wrapper. Good note though. – agf Aug 13 '11 at 09:30
  • `multiprocessing` can be effective when a problem is CPU bound. The more IO, the less effective `multiprocessing` becomes. `my_f_wrapper` increases the amount of IO each worker process has to perform. Not only is the file opened once to find all the seek positions, the file is being opened, seeked, read and closed once more for every START/STOP block. – unutbu Aug 13 '11 at 11:09
  • hi @agf, you are right about the file pointer thing. That is also why the simple map() function works. – gstar2002 Aug 13 '11 at 13:04
  • @agf sorry, I tried to upvote your answer, but I need more rep to do that. As soon as I have enough rep I will. – gstar2002 Aug 13 '11 at 15:24