87

I have a single big text file in which I want to process each line (do some operations) and store the results in a database. Since a simple single-process program is taking too long, I want the work done by multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, do some operations on its piece of data (lines), and put the results into the database, so that in the end the whole of the data is processed and my database is filled with the data I need.

But I am not able to figure out how to approach this.

martineau
pranavk

3 Answers

116

What you are looking for is a Producer/Consumer pattern

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # placeholder: do the real work on the item here
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start the worker threads
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

You wouldn't share the file object with the threads. You would produce work for them by supplying the queue with lines of data. Each thread would then pick up a line, process it, and put the result on the output queue.
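
For the question as asked, the producer would feed lines from the file rather than numbers. Here is a minimal sketch of that, assuming a placeholder file name "source.txt" and with the database insert left as a placeholder comment:

import threading
import Queue

def do_work(in_queue):
    while True:
        line = in_queue.get()
        # placeholder: do the real per-line processing and database insert here
        print "processed: %s" % line.rstrip()
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()

    # start the worker threads
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work,))
        t.daemon = True
        t.start()

    # the producer: feed lines from the file into the queue
    with open("source.txt") as f:
        for line in f:
            work.put(line)

    # wait until every queued line has been processed
    work.join()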

There are some more advanced facilities built into the multiprocessing module for sharing data, such as lists and a special kind of Queue. There are trade-offs to using multiprocessing versus threads, and it depends on whether your work is CPU-bound or IO-bound.
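
As a toy illustration of those facilities (not tied to the file-processing problem), a multiprocessing.Queue can be shared between processes directly. This sketch just squares a few numbers in a child process:

from multiprocessing import Process, Queue

def worker(in_q, out_q):
    while True:
        n = in_q.get()
        if n is None:          # sentinel: no more work
            break
        out_q.put(n * n)

if __name__ == "__main__":
    in_q, out_q = Queue(), Queue()
    p = Process(target=worker, args=(in_q, out_q))
    p.start()

    for n in [1, 2, 3]:
        in_q.put(n)
    in_q.put(None)             # tell the worker to exit

    # collect the three results before joining the worker
    for i in xrange(3):
        print out_q.get()
    p.join()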

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunksize=4: lines are handed to the worker processes in chunks of 4
        results = pool.map(process_line, source_file, 4)

    print results

A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. map() blocks and returns the entire result when it is done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
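
Before going fully manual (next section), one intermediate option is pool.imap(). A sketch under the same assumptions as above (placeholder file.txt and fake work): imap() does not convert the file into a list up front the way map() does, and it yields results lazily and in order, so each one can be written to the database as it arrives instead of collecting everything first.

from multiprocessing import Pool

def process_line(line):
    # placeholder for the real per-line work
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # results come back lazily and in order; handle each one
        # (e.g. write it to the database) as it becomes available
        for result in pool.imap(process_line, source_file, chunksize=4):
            print result.rstrip()
    pool.close()
    pool.join()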

Manual "pool" with limit and line re-sorting

This is a manual version of what Pool.map does, but instead of consuming the entire iterable in one go, it sets a queue size so that the input is fed to the workers piece by piece, only as fast as they can process it. I also attached the line numbers to each line so that you can track them, and refer to them later on if you want.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start the worker processes
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
jdi
  • yep, the file is larger, about 1 GB or so. I don't know how much larger you mean by saying larger, 1 GB is larger for me though. – pranavk Jun 25 '12 at 20:35
  • That's fine. I'm sure you can take these examples and extrapolate for your needs. The threading one is fine as it is. The multiprocessing one just needs a similar queue for you to feed. – jdi Jun 25 '12 at 20:37
  • 1
    This is good, but what if the processing is I/O bound? In that case, parallelism may slow things down rather than speeding it up. Seeks within a single disk track are much faster than intertrack seeks, and doing I/O in parallel tends to introduce intertrack seeks in what would otherwise be a sequential I/O load. To get some benefit from parallel I/O, sometimes it helps quite a bit to use a RAID mirror. – user1277476 Jun 25 '12 at 20:47
  • @user1277476: My examples don't really deal with the speed of *reading* the file. It's read the exact same way. My examples address the work of processing each line. When I say CPU or IO bound, I am referring to what is happening inside the work. The file is not being read any faster. The concept here is that instead of processing each line as you read from the file, you feed the line into a queue and let multiple workers handle it. – jdi Jun 25 '12 at 20:54
  • @pranavk: The file size just means you will need enough memory to read it all in. Otherwise you just need to feed it line by line to a queue instead of using the `map` – jdi Jun 25 '12 at 23:26
  • @pranavk: I just added a 3rd example showing how to not pre-consume the entire file, and deliver as fast as the workers can consume. – jdi Jun 26 '12 at 00:06
  • @jdi Wow nice example you made. That may be one of the better ones available on the web now :) – KobeJohn Jun 26 '12 at 00:19
  • @kobejohn: Thanks! Which one? That 3rd one? I just updated it once more to sort the lines back again – jdi Jun 26 '12 at 01:06
  • @jdi all of them together. Good introduction by example. Maybe links to the docs would be nice icing on the cake. – KobeJohn Jun 26 '12 at 01:25
  • Agreed. This answer is much better than mine. – mgilson Jun 26 '12 at 11:58
  • @jdi In "Manual "pool" with limit and line re-sorting", what if you have a string of inputs from the iterator that completes really really fast. It looks like (and from testing it, it seems to happen) that the workers return and thus the `Process` object completes and closes _before the new input has been added to the queue_. In this case the program will hang because there are no more workers left! – Hooked Jun 27 '12 at 14:42
  • @Hooked: I am not so sure that is right, and I am not sure how you were testing it. But it should not matter how little input you have. There are always sentinel values (None in this case) added to the end of the data to signal the workers to exit. They should only exit when they receive None values. Otherwise they sit and wait for data from the queue. Can you please link me to a pastebin illustrating the problem? I switched my example to use `f = ['a','b','c','d','e']`, and also commented out the sleep in the worker. No issues. – jdi Jun 27 '12 at 16:09
  • awesome examples. I'm wondering if you can help me a little bit with ` iters = itertools.chain(f, (None,)*num_workers)` I don't quite understand what this line is doing. – jwillis0720 Sep 11 '14 at 04:03
  • 3
    @jwillis0720 - Sure. `(None,) * num_workers` creates a tuple of None values equal to the number of workers. These are going to be the sentinel values that tell each thread to quit because there is no more work. The `itertools.chain` function lets you put multiple sequences together into one virtual sequence without having to copy anything. So what we get is that first it loops over the lines in the file, and then the None values. – jdi Sep 11 '14 at 08:09
  • 2
    That's better explained than my professor, very nice +1. – lycuid Nov 29 '16 at 04:48
  • nice! but I do not understand what you say with `Be aware that in overly simple example, the map is going to consume your file all at once before dishing out work`. – ℕʘʘḆḽḘ May 01 '18 at 22:25
  • 1
    @ℕʘʘḆḽḘ, I have edited my text a bit to be more clear. It now explains that the middle example is going to slurp your entire file data into memory at once, which could be a problem if your file is larger than the amount of RAM you currently have available. Then I show in the 3rd example how to go line by line, so as not to consume the entire file at once. – jdi May 02 '18 at 03:47
  • thanks a lot, very helpful. Just one more thing. I really do not understand why the syntax `pool.map(process_line, source_file, 4)` makes `pool` operate on batches of 4 rows. I mean, what is happening here? – ℕʘʘḆḽḘ May 02 '18 at 11:41
  • normally iterating using `with open` only iterates row by row. Am I missing something here? Thanks again! – ℕʘʘḆḽḘ May 02 '18 at 12:57
  • 1
    @ℕʘʘḆḽḘ read the docs for pool.map(). It says it will split the iterable up into chunks and submit them to the workers. So it will end up consuming all the lines into memory. Yes, iterating one line at a time is memory efficient, but if you end up keeping all those lines in memory then you are back to reading the whole file. – jdi May 02 '18 at 20:18
  • The manual implementation, when run on large text files, leads to an EOF error. Any solution for it? – R_Moose Sep 29 '20 at 20:28
  • @R_Moose hard to be sure without a reproduction example. I am guessing the EOF happens when the main loop is iterating the lines in the file and it gets to the end? Are you using a file that is actively being written to? How big does the file need to be? Can you just catch the EOF exception and consider it done so the code can move on to the loop that joins and waits for the workers? – jdi Sep 30 '20 at 21:20
  • @jdi I am using a file that is 7 GB. I am not writing to the file, I am just reading from it, changing each line and then saving the output in the list. Exactly like what is being done here. Refer to this: https://stackoverflow.com/questions/25994201/python-multiprocessing-queue-error as to why EOF is happening. You are right, I couldn't reproduce it for smaller files, and even for larger files it doesn't happen all the time; it is pretty sporadic. I would say 4/10 runs for a file of 7 GBs. – R_Moose Oct 01 '20 at 16:18
  • @R_Moose well it is not exactly the same problem as that link because in my implementation you can see that I am sending a poison pill to every worker (a `None` value to each worker after the file EOF), and we join on every worker before exiting. So you should see every worker properly shut down. – jdi Oct 01 '20 at 18:21
  • @jdi very well. This is the code I am using, exactly like your implementation. I am using a dictionary instead of a list to avoid sorting. https://stackoverflow.com/questions/64128330/eof-error-in-multiprocessing-while-using-dictionary-type Any idea why it would fail on EOF ? No concerns as your code works well for file size < 5 Gigs on my machine. – R_Moose Oct 01 '20 at 22:48
  • I have the exact error on my post here now: https://stackoverflow.com/questions/64128330/eof-error-in-multiprocessing-while-using-dictionary-type – R_Moose Oct 01 '20 at 23:11
8

Here's a really stupid example that I cooked up:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte][1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        # 'start' is the offset of the newline just before this piece;
        # the very first piece must begin at byte 0, not byte 1.
        read_start = start+1 if start > 0 else 0
        f.seek(read_start)
        if end is None:
            text=f.read()
        else:
            # read up to and including the newline at byte end+1
            nbytes = (end+1) - read_start + 1
            text=f.read(nbytes)

    # process text here, creating some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

The tricky part here is to make sure that we split the file on newline characters so that you don't miss any lines (or only read partial lines). Then, each process reads its part of the file and returns an object which can be put into the database by the main thread. Of course, you may even need to do this part in chunks so that you don't have to keep all of the information in memory at once. (This is quite easily accomplished -- just split the "args" list into X chunks and call pool.map(wrapper, chunk) -- see here.)
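
A rough sketch of that chunked variant, replacing the single pool.map(wrapper, args) call above; write_to_database() is a hypothetical helper standing in for whatever your database layer provides:

def grouper(seq, size):
    # yield successive slices of 'seq' with 'size' elements each
    for pos in xrange(0, len(seq), size):
        yield seq[pos:pos + size]

for chunk in grouper(args, 5):
    chunk_results = pool.map(wrapper, chunk)
    # write_to_database(chunk_results)   # hypothetical helper
    print "".join(chunk_results)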

mgilson
-1

Well, break the single big file into multiple smaller files and have each of them processed in separate threads.
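
A minimal sketch of that idea, splitting in memory into batches of lines rather than into actual smaller files (big.txt and process_batch() are placeholders). Note that it starts one thread per batch and keeps each batch in memory, which is part of why the queue-based approaches above scale better:

import threading
from itertools import islice

def process_batch(lines):
    # placeholder: process this batch of lines and write the results to the database
    pass

threads = []
with open("big.txt") as f:
    while True:
        batch = list(islice(f, 10000))   # next 10,000 lines (or fewer at the end)
        if not batch:
            break
        t = threading.Thread(target=process_batch, args=(batch,))
        t.start()
        threads.append(t)

for t in threads:
    t.join()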

Tanu