
I am currently trying to read a large file (80 million lines), where I need to perform a computationally intensive matrix multiplication for each entry. After calculating this, I want to insert the result into a database. Because of how time intensive this process is, I want to split the work across multiple cores to speed it up.

After some research I found this promising approach, which splits a file into n blocks:

def file_block(fp, number_of_blocks, block):
    '''
    A generator that splits a file into blocks and iterates
    over the lines of one of the blocks.
    '''
    assert 0 <= block < number_of_blocks

    # determine the file size by seeking to the end
    fp.seek(0, 2)
    file_size = fp.tell()

    # byte offsets delimiting this block
    ini = file_size * block / number_of_blocks
    end = file_size * (1 + block) / number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        # skip the partial line at the start of the block;
        # the previous block yields that line in full
        fp.seek(ini - 1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline()

You can then iterate over the blocks sequentially like this:

if __name__ == '__main__':
    fp = open(filename)
    number_of_chunks = 4
    for chunk_number in range(number_of_chunks):
        print chunk_number, 100 * '='
        for line in file_block(fp, number_of_chunks, chunk_number):
            process(line)

While this works, I run into problems when parallelizing it using multiprocessing:

from multiprocessing import Pool, cpu_count

fp = open(filename)
number_of_chunks = 4
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)]

p = Pool(cpu_count() - 1)
p.map(processChunk, li)

The error is that generators cannot be pickled.

While I understand this error, it would be too expensive to first iterate over the whole file just to put all lines into a list.

Moreover, I want each core to work on a block of lines per iteration, because it is more efficient to insert multiple lines into the database at once rather than one by one (as the typical map approach would do).
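
To make that concrete, this is roughly the kind of batched insert I have in mind (sqlite3 is only a stand-in for my actual database here, and the table and column names are made up):

import sqlite3
from itertools import islice

def insert_batch(conn, rows, batch_size=1000):
    # insert the computed rows in fixed-size batches instead of one at a time;
    # each row is expected to be a tuple such as (value,)
    cur = conn.cursor()
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        cur.executemany('INSERT INTO results (value) VALUES (?)', batch)
    conn.commit()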

Thanks for your help.

bublitz
  • You could do an initial pass of the large file to make a note of seek coordinates and the number of lines to read from that position. You can then call your multiprocessing with these two numbers and keep the generator tucked away in each process (see the sketch after these comments) – kezzos Nov 22 '16 at 15:33
  • 1
  • Is it possible to split the file into four files first? – cwallenpoole Nov 22 '16 at 15:42
  • Move the file opening and `file_block` code into each thread instead of trying to initialize it before the thread starts. It won't be a problem having the file open 4 times instead of just once, as long as it's read only. – Mark Ransom Nov 22 '16 at 16:00
  • You might be better off using a package that can handle the reading of large files that are out of memory such as Blaze – rgalbo Nov 22 '16 at 16:52
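
A rough sketch of the approach kezzos suggests, for reference (Python 2 like the rest of this post; `process` and `filename` are the names used in the question, and the offset bookkeeping assumes a single-byte encoding with Unix line endings):

from multiprocessing import Pool, cpu_count

def index_blocks(filename, number_of_blocks):
    # single initial pass: record the byte offset of every line
    offsets = []
    pos = 0
    with open(filename) as fp:
        for line in fp:
            offsets.append(pos)
            pos += len(line)
    # turn that into (filename, start offset, number of lines) per block
    per_block = max(1, (len(offsets) + number_of_blocks - 1) // number_of_blocks)
    return [(filename, offsets[i], min(per_block, len(offsets) - i))
            for i in range(0, len(offsets), per_block)]

def process_block(args):
    # each worker opens its own file handle and reads only its block
    filename, offset, count = args
    with open(filename) as fp:
        fp.seek(offset)
        for _ in xrange(count):  # xrange: Python 2, like the rest of this post
            process(fp.readline())

if __name__ == '__main__':
    blocks = index_blocks(filename, cpu_count() - 1)
    Pool(cpu_count() - 1).map(process_block, blocks)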

1 Answer


Instead of creating the generators up front and passing them to each worker, create them inside the worker process itself.

from multiprocessing import Pool, cpu_count

def processChunk(params):
    # each worker re-opens the file and builds its own generator,
    # so nothing unpicklable has to cross the process boundary
    filename, chunk_number, number_of_chunks = params
    with open(filename, 'r') as fp:
        for line in file_block(fp, number_of_chunks, chunk_number):
            process(line)

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)]
p = Pool(cpu_count() - 1)
p.map(processChunk, li)
Mark Ransom