
For one of our client's requirements, I have to develop an application that can process huge CSV files. File sizes range from 10 MB to 2 GB.

Depending on the size, the module decides whether to read the file using a multiprocessing pool or a normal CSV reader. But from observation, multiprocessing takes longer than normal CSV reading when I test both modes on a 100 MB file.

Is this correct behaviour, or am I doing something wrong?

Here is my code:

def set_file_processing_mode(self, fpath):
    """Choose the read mode based on file size."""
    fsize = self.get_file_size(fpath)
    if fsize > FILE_SIZE_200MB:
        self.read_in_async_mode = True
    else:
        self.read_in_async_mode = False

def read_line_by_line(self, filepath):
    """Reads CSV line by line"""
    with open(filepath, 'rb') as csvin:
        csvin = csv.reader(csvin, delimiter=',')
        for row in csvin:
            yield row

def read_huge_file(self, filepath):
    """Read file in chunks"""
    pool = mp.Pool(1)
    for chunk_number in range(self.chunks): #self.chunks = 20
        proc = pool.apply_async(read_chunk_by_chunk, 
                        args=[filepath, self.chunks, chunk_number])
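        # get() blocks until this chunk has been read, so chunks are still handled one at a time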
        reader = proc.get()
        yield reader
    pool.close()
    pool.join()

def iterate_chunks(self, filepath):
    """Read huge file rows"""
    for chunklist in self.read_huge_file(filepath):
        for row in chunklist:
            yield row

@timeit  # custom decorator
def read_csv_rows(self, filepath):
    """Read CSV rows and pass it to processing"""
    if self.read_in_async_mode:
        print("Reading in async mode")
        for row in self.iterate_chunks(filepath):
            self.process(row)
    else:
        print("Reading in sync mode")
        for row in self.read_line_by_line(filepath):
            self.process(row)

def process(self, formatted_row):
    """Just prints the line"""
    self.log(formatted_row)

def read_chunk_by_chunk(filename, number_of_blocks, block):
    '''
    Split a file into blocks and return the lines
    of one of those blocks.
    '''
    results = []
    assert 0 <= block < number_of_blocks
    with open(filename) as fp:
        fp.seek(0, 2)  # jump to the end to determine the file size
        file_size = fp.tell()
        ini = file_size * block / number_of_blocks
        end = file_size * (1 + block) / number_of_blocks
        if ini <= 0:
            fp.seek(0)
        else:
            fp.seek(ini - 1)
            fp.readline()  # skip the partial line at the block boundary
        while fp.tell() < end:
            results.append(fp.readline())
    return results

if __name__ == '__main__':
    classobj.read_csv_rows(sys.argv[1])    

Here is a test:

$ python csv_utils.py "input.csv"
Reading in async mode
FINISHED  IN 3.75 sec
$ python csv_utils.py "input.csv"
Reading in sync mode
FINISHED  IN 0.96 sec

The question is:

Why is async mode taking longer?

NOTE: I removed unnecessary functions/lines to keep the code simple.

Laxmikant
    Multi **processing** has the overhead of handling the processes and communication. You're also always calling [`AsyncResult.get()`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult.get) right away after applying your `read_chunk_by_chunk()`, which then has to wait for the chunk to finish reading, so you're effectively still serial, but with the additional overhead. The std csv reader on the other hand is passed a (possibly) buffered file, so IO is not actually that big of a problem probably. – Ilja Everilä Sep 16 '16 at 06:13
  • what processing do you need to do with the file? – miraculixx Sep 16 '16 at 06:22
  • Instead of having multiple producers (async readers), try multiple consumers, aka multiprocess the `self.process()`, if need be. – Ilja Everilä Sep 16 '16 at 06:24
  • @miraculixx: I have to compare the CSV row with db, if record exists update it else create a new entry in the table. – Laxmikant Sep 16 '16 at 06:25
  • In that case your best course of action will probably be to upload the CSV data to a temporary table in the DB directly and perform a single "upsert" query. How to upload the data depends on what DB you're using. – Ilja Everilä Sep 16 '16 at 06:28
  • @IljaEverilä: I could do that but I first I have to display it to users on the UI and he decides whether to update the record or not in the DB. – Laxmikant Sep 16 '16 at 06:29
  • Could you prefilter the data then? Or do you need to first check whether or not it exists at all? – Ilja Everilä Sep 16 '16 at 06:30
  • @IljaEverilä: I did not understand your first two comments, could you please explain a little? – Laxmikant Sep 16 '16 at 06:31
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/123498/discussion-between-ilja-everila-and-laxmikant). – Ilja Everilä Sep 16 '16 at 06:31
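
The "multiple consumers" idea suggested in the comments above could look roughly like this: keep a single sequential reader in the main process and hand the rows to a pool of workers. This is only a sketch; `process_row`, the pool size and the chunksize are illustrative and not part of the original code.

import csv
import multiprocessing as mp

def process_row(row):
    # placeholder for the real per-row work (e.g. the DB comparison)
    return row

def read_rows(filepath):
    # one sequential reader in the main process, as in the sync mode above
    with open(filepath, newline='') as csvin:
        for row in csv.reader(csvin, delimiter=','):
            yield row

if __name__ == '__main__':
    pool = mp.Pool(4)  # pool size chosen arbitrarily for the example
    # imap_unordered streams rows to the workers; chunksize batches them
    # so that inter-process communication overhead stays low
    for result in pool.imap_unordered(process_row, read_rows('input.csv'), chunksize=1000):
        pass  # collect or log the results here
    pool.close()
    pool.join()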

1 Answer


Is this correct behaviour?

Yes - it may not be what you expect, but it is consistent with the way you implemented it and how multiprocessing works.

Why is async mode taking longer?

The way your example works is perhaps best illustrated by a parable - bear with me please:

Let's say you ask your friend to engage in an experiment. You want him to go through a book and mark each page with a pen, as fast as he can. There are two rounds with a distinct setup, and you are going to time each round and then compare which one was faster:

  1. open the book on the first page, mark it, then flip the page and mark the following pages as they come up. Pure sequential processing.

  2. process the book in chunks. For this he should run through the book's pages chunk by chunk. That is he should first make a list of page numbers as starting points, say 1, 10, 20, 30, 40, etc. Then for each chunk, he should close the book, open it on the page for the starting point, process all pages before the next starting point comes up, close the book, then start all over again for the next chunk.

Which of these approaches will be faster?

Am I doing something wrong?

You decide both approaches take too long. What you really want to do is ask multiple people (processes) to do the marking in parallel. Now with a book (as with a file) that's difficult because, well, only one person (process) can access the book (file) at any one point. Still it can be done if the order of processing doesn't matter and it is the marking itself - not the accessing - that should run in parallel. So the new approach is like this:

  1. cut the pages out of the book and sort them into say 10 stacks
  2. ask ten people to mark one stack each

This approach will most certainly speed up the whole process. Perhaps surprisingly though the speed up will be less than a factor of 10 because step 1 takes some time, and only one person can do it. That's called Amdahl's law [wikipedia]:

$$ S_\text{latency}(s) = \frac{1}{(1 - p) + \frac{p}{s}} $$

Essentially, it means that the (theoretical) speed-up of the whole task is limited by the fraction p that can be parallelised: that part's time shrinks to p/s when it is sped up by a factor s, while the sequential remainder (1 - p) is unchanged.

Intuitively, the speed-up can only come from the part of the task that is processed in parallel; all the sequential parts are unaffected and take the same amount of time, whether p is processed in parallel or not.
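
As a quick worked illustration (the numbers are invented for the example): if 90% of the work can run in parallel (p = 0.9) and that part is spread over 10 workers (s = 10), the overall speed-up is only 1 / (0.1 + 0.9/10) ≈ 5.3, not 10:

def amdahl_speedup(p, s):
    # theoretical overall speed-up when a fraction p of the work
    # is sped up by a factor s (Amdahl's law)
    return 1.0 / ((1.0 - p) + p / s)

print(amdahl_speedup(0.9, 10))  # ~5.26
print(amdahl_speedup(0.5, 10))  # ~1.82 - half the work stays sequential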

That said, in our example, obviously the speed-up can only come from step 2 (marking pages in parallel by multiple people), as step 1 (tearing up the book) is clearly sequential.

develop an application that can process huge CSV files

Here's how to approach this:

  1. determine what part of the processing can be done in parallel, i.e. process each chunk separately and out of sequence
  2. read the file sequentially, splitting it up into chunks as you go
  3. use multiprocessing to run multiple processing steps in parallel

Something like this:

def process(rows):
    # do all the processing
    ...
    return result

if __name__ == '__main__':
    pool = mp.Pool(N)  # N > 1
    chunks = get_chunks(...)
    # submit every chunk first so that the workers run in parallel ...
    async_results = [pool.apply_async(process, args=(rows,)) for rows in chunks]
    pool.close()
    pool.join()
    # ... and only collect the results once all chunks are submitted
    results = [r.get() for r in async_results]

I'm not defining get_chunks here because there are several well-documented approaches to splitting a file or a CSV stream into chunks.
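
For illustration, one possible shape for get_chunks (only a sketch; the default of 10,000 rows per chunk is an arbitrary choice) reads the file once, sequentially, and yields lists of rows that can be handed to the pool:

import csv
from itertools import islice

def get_chunks(filepath, rows_per_chunk=10000):
    # read sequentially, yielding one list of rows at a time
    with open(filepath, newline='') as csvin:
        reader = csv.reader(csvin, delimiter=',')
        while True:
            chunk = list(islice(reader, rows_per_chunk))
            if not chunk:
                break
            yield chunk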

Conclusion

Depending on the kind of processing required for each file, it may well be that the sequential approach to processing any one file is the fastest possible approach, simply because the processing parts don't gain much from being done in parallel. You may still end up processing it chunk by chunk due to e.g. memory constraints. If that is the case, you probably don't need multiprocessing.

If you have multiple files that can be processed in parallel, multiprocessing is a very good approach. It works the same way as shown above, where the chunks are not rows but filenames.
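
A minimal sketch of that last case, assuming a hypothetical process_file(filename) that does the full sequential read-and-process for a single file:

import multiprocessing as mp

def process_file(filename):
    # sequentially read and process one CSV file
    ...

if __name__ == '__main__':
    filenames = ['a.csv', 'b.csv', 'c.csv']  # illustrative file list
    pool = mp.Pool(4)
    pool.map(process_file, filenames)  # each worker handles whole files
    pool.close()
    pool.join()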

miraculixx
  • Perfect, superbly explained. So I should split the file, and each piece needs to be processed in a multiprocessing pool. Thank you – Laxmikant Sep 16 '16 at 07:21