
I have hundreds of gigabytes of data in binary files. I want to take a random sample of the data by reading several consecutive records at a time, from many random positions.

The data is stored in many files. The main files do not store the data in any particular order, so each one has a sorted index file. My current code is something like this, except that there are many files:

import random
import struct

index = open("foo.index", 'rb')
data = open("foo", 'rb')
index_offset_format = 'Q'
index_offset_size = struct.calcsize(index_offset_format)
record_set = []
for _ in range(n_batches):
    # Read `batch_size` offsets from the index - these are consecutive,
    # so they can be read in one operation
    index_offset_start = random.randint(0, N_RECORDS - batch_size)
    index.seek(index_offset_start * index_offset_size)
    data_offsets = struct.iter_unpack(
        index_offset_format,
        index.read(index_offset_size * batch_size))

    # Read the actual records from the data file. These are not consecutive.
    records = []
    for (offset,) in data_offsets:  # iter_unpack yields 1-tuples
        data.seek(offset)
        records.append(data.read(RECORD_SIZE))
    record_set.append(records)

Then other things are done with the records. From profiling, I see that the program is heavily I/O-bound: most of the time is spent in index.read and data.read. I suspect this is because read is blocking: the interpreter waits for the OS to fetch the data from disk before asking for the next random chunk, so the OS has no opportunity to optimise the disk access pattern. So: is there a file API that I can pass a whole batch of read requests to? Something like:

def read_many(file, offsets, lengths):
    '''
    @param file: the file to read from
    @param offsets: the offsets to seek to
    @param lengths: the lengths of data to read
    @return an iterable over the file contents at the requested offsets
    '''
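
For concreteness, a naive implementation of that signature could be built on os.pread (POSIX-only), which reads at an absolute offset without moving the file position, but it would still issue one blocking call per record:

import os

def read_many_naive(file, offsets, lengths):
    '''A serial sketch of the API above: one os.pread per requested range.'''
    fd = file.fileno()
    for offset, length in zip(offsets, lengths):
        # pread does not move the file position, so no seek is needed,
        # but each call still blocks until the data arrives.
        yield os.pread(fd, length, offset)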

Alternatively, would it be enough to open several file objects and request multiple reads using multithreading? Or would the GIL prevent that from being useful?


1 Answer


Because the work is I/O-bound, the limit on the reads is set by the operating system's disk scheduler and by the disk's cache.

Actual, per-core parallelization can be had easily with multiprocessing.Pool.imap_unordered():

from multiprocessing import Pool

def pmap(fun, tasks):
    # One worker process per core by default; results are yielded as soon
    # as each task finishes, in whatever order that happens.
    with Pool() as pool:
        yield from pool.imap_unordered(fun, tasks)

for record_set in pmap(process_one_file, filenames):
    print(record_set)
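
process_one_file() is not shown in the question; a sketch, reusing the constants (N_RECORDS, RECORD_SIZE, n_batches, batch_size) and the foo/foo.index layout from the question, could be:

import random
import struct

def process_one_file(filename):
    # The sampling loop from the question, applied to one data/index pair.
    index_offset_format = 'Q'
    index_offset_size = struct.calcsize(index_offset_format)
    record_set = []
    with open(filename + ".index", 'rb') as index, open(filename, 'rb') as data:
        for _ in range(n_batches):
            index_offset_start = random.randint(0, N_RECORDS - batch_size)
            index.seek(index_offset_start * index_offset_size)
            data_offsets = struct.iter_unpack(
                index_offset_format,
                index.read(index_offset_size * batch_size))
            records = []
            for (offset,) in data_offsets:
                data.seek(offset)
                records.append(data.read(RECORD_SIZE))
            record_set.append(records)
    return record_set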

Having several files open at the same time, and most likely a read() in flight on each core, should allow the disk scheduler to find a better schedule than the serial one imposed by iterating over the list of file names.

The beauty of imap_unordered() is that it decouples the post-processing from which task finishes first, and how or why it does (the order may differ from run to run).

As mentioned in the comments, the GIL is only involved while Python bytecode is executing, which is not the case while a program is blocked on I/O.
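
So threads would work as well. A minimal sketch with concurrent.futures, assuming a single shared file object and os.pread (which reads at an absolute offset without touching the shared file position, so it is safe to call from several threads):

import os
from concurrent.futures import ThreadPoolExecutor

def read_records_threaded(file, offsets, record_size, max_workers=8):
    # Each worker blocks inside os.pread with the GIL released, so the OS
    # sees several outstanding requests at once and can reorder them.
    fd = file.fileno()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda offset: os.pread(fd, record_size, offset),
                             offsets))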
