
I have several compressed files, each on the order of 2 GB in size. Each file begins with a set of headers, which I parse to extract a list of ~4,000,000 pointers (pointers).

For each pair of pointers (pointers[i], pointers[i+1]) for 0 <= i < len(pointers) - 1, I

  • seek to pointers[i]
  • read pointers[i+1] - pointers[i] bytes
  • decompress it
  • do a single pass operation on that data and update a dictionary with what I find.

The issue is that I can only process roughly 30 pointer pairs per second using a single Python process, which means each file takes more than a day to get through.
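
As a rough check of that estimate, the arithmetic can be sketched with only the figures quoted above (about 4,000,000 pointers and about 30 pairs per second). Both numbers are approximations, so the result is an order-of-magnitude figure rather than a measurement:

```python
# Figures quoted in the question; both are approximate.
n_pointers = 4000000
pairs_per_second = 30.0

# n pointers give n - 1 consecutive (pointers[i], pointers[i+1]) pairs.
n_pairs = n_pointers - 1

hours = n_pairs / pairs_per_second / 3600
print(round(hours, 1))
```

That works out to roughly 37 hours per file, which is consistent with "more than a day".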

Assuming that splitting the pointers list among multiple processes doesn't hurt performance (each process would read the same file, but different, non-overlapping parts of it), how can I use multiprocessing to speed this up?


My single-threaded operation looks like this:

from StringIO import StringIO  # Python 2

def search_clusters(pointers, filepath, automaton, counter):
    def _decompress_lzma(f, pointer, chunk_size=2**14):
        # skipping over this
        ...
        return uncompressed_buffer

    first_pointer, last_pointer = pointers[0], pointers[-1]
    with open(filepath, 'rb') as fh:
        fh.seek(first_pointer)
        f = StringIO(fh.read(last_pointer - first_pointer))

    for pointer1, pointer2 in zip(pointers, pointers[1:]):
        size = pointer2 - pointer1
        f.seek(pointer1 - first_pointer)
        buffer = _decompress_lzma(f, 0)

        # skipping details, ultimately the counter dict is
        # modified passing the uncompressed buffer through
        # an aho corasick automaton
        counter = update_counter_with_buffer(buffer, automaton, counter)

    return counter


# parse file and return pointers list
bzf = ZimFile(infile)
pointers = bzf.cluster_pointers

counter = load_counter_dict() # returns collections.Counter()
automaton = load_automaton()

search_clusters(pointers, infile, automaton, counter)

I tried changing this to use multiprocessing.Pool:

from itertools import repeat, izip
import logging
import multiprocessing

logger = multiprocessing.log_to_stderr()
logger.setLevel(multiprocessing.SUBDEBUG)

def chunked(pointers, chunksize=1024):
    for i in range(0, len(pointers), chunksize):
        yield list(pointers[i:i+chunksize+1])

def search_wrapper(args):
    return search_clusters(*args)

# parse file and return pointers list
bzf = ZimFile(infile)
pointers = bzf.cluster_pointers

word_counter = load_counter_dict() # returns collections.Counter()
automaton = load_automaton()

map_args = izip(chunked(pointers), repeat(infile),
                repeat(automaton.copy()), repeat(word_counter.copy()))

pool = multiprocessing.Pool(20)

results = pool.map(search_wrapper, map_args)
pool.close()
pool.join()

but after processing for a little while, I get the following messages and the script hangs with no further output:

[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-20] child process calling self.run()

However, if I run the same thing with the built-in serial map instead of multiprocessing, it runs just fine:

map(search_wrapper, map_args)

Any advice on how to change my multiprocessing code so it doesn't hang? Is it even a good idea to attempt to use multiple processes to read the same file?

wflynny
  • If all files are on the same media, you may not benefit much from parallel I/O. In fact, depending on the device type, it will hurt performance. – Paulo Scardine Oct 28 '16 at 00:19
  • ***Before*** you do anything else, you should profile your code and see where it's spending most of its time, to make sure you're working on speeding up the most time-consuming part(s). See [_How can you profile a Python script?_](http://stackoverflow.com/questions/582336/how-can-you-profile-a-python-script) – martineau Oct 28 '16 at 00:31
  • @martineau I already profiled it, getting it up from 0.33 parses/second to 30 parses/second. It's a very heavy computation. The runtime > 24 hours is only a problem because I have ~100 of these files to parse and my local cluster has a max walltime limit of 24 hours. I guess I could just checkpoint and parse them over 2 days, but before I implement that I was exploring multiprocessing. – wflynny Oct 28 '16 at 00:37
  • @PauloScardine Would loading the entire file into RAM help? – wflynny Oct 28 '16 at 00:39
  • If loading the entire file into memory is feasible, it might help if what you're trying to do is input-bound (as determined by profiling it) as opposed to being compute-bound. – martineau Oct 28 '16 at 00:44
  • It's unlikely this is disk bound, but if you are running Linux, you could `dd` this to /dev/null using DIRECTIO to get the raw read rate. – tdelaney Oct 28 '16 at 00:45
  • The real bottleneck, which is unavoidable, is that I'm running the buffer through an [Aho Corasick finite state machine](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm). The majority of the speedup I initially got through profiling was due to implementing an efficient automaton. – wflynny Oct 28 '16 at 00:49
  • What's happening with system memory? It looks kind of like a memory allocation problem. You could implement a dummy `update_counter_with_buffer` that returns a small canned something to check everything else. I'm not sure what's in `repeat(automaton.copy()), repeat(word_counter.copy())`, but why ship them to the subprocess? It's not working right anyway, as you just keep sending the original Counter down, losing data. `return counter` ... how big is that thing? For debugging you could log `len(pickle.dumps(counter, 2))`. – tdelaney Oct 28 '16 at 01:09
  • Generally, though, this seems like a good candidate for multiprocessing. I think you are on the right path. – tdelaney Oct 28 '16 at 01:11
