
I have very large binary files containing x int16 data points for each of y sensors, along with a header holding some basic info. Each file is written as the y sensor values for one sample time, then the y values for the next sample, and so on for x samples. If I want all of the data, I use numpy.fromfile(), which works nicely and is fast. However, if I only want a subset of the samples or only specific sensors, I currently have a hideous double for loop using file.seek(), file.read(), and struct.unpack() that takes forever. Is there a faster way to do this in Python, perhaps with mmap(), which I do not understand well, or by reading the whole file with fromfile() and then subsampling?

data = numpy.empty((len(sensor_indices), num_pts), dtype=numpy.int16)
for i in range(num_pts):
    for j in range(len(sensor_indices)):
        curr_file.seek(bin_offsets[j])               # one seek() + read() per int16
        data_binary = curr_file.read(2)
        data[j][i] = struct.unpack('h', data_binary)[0]

Following @rrauenza's advice on mmap, which was great info, I edited the code to:

mm = mmap.mmap(curr_file.fileno(), 0, access=mmap.ACCESS_READ)
data = numpy.empty((len(sensor_indices), num_pts), dtype=numpy.int16)
offset = 0
for i in range(num_pts):
    for j in range(len(sensor_indices)):
        offset += bin_offsets[j] * 2                 # advance to the next requested int16
        data[j][i] = struct.unpack('h', mm[offset:offset+2])[0]

While this IS faster than before, it's still orders of magnitude slower than:

shape = (x, y)
data = np.fromfile(file=self.curr_file, dtype=np.int16).reshape(shape)
data = data.transpose()
data = data[sensor_indices, :]
data = data[:, range(num_pts)]

I tested this with a smaller 30 MB file containing only 16 sensors and 30 seconds of data. The original code took 160 s, the mmap version 105 s, and np.fromfile with subsampling 0.33 s.

The remaining question: clearly numpy.fromfile() is better with the small file, but will there be issues with much larger files that may be up to 20 GB, with hours or days of data and up to 500 sensors?

launchpadmcquack
  • Have you looked into pandas? It's great for sorting large data sets in virtually any way you want. – Chris Jun 03 '16 at 21:15
  • Hi, Welcome to StackOverflow! Your title is a little confusing -- *strip* usually means remove. A more accurate word might be *extract*. – rrauenza Jun 03 '16 at 21:40
  • How big is your very large file? – rrauenza Jun 05 '16 at 00:13
  • Files can be up to 20 Gb. After your very helpful advice @rrauenza, I've tried three different code chunks and edited the OP with new info. – launchpadmcquack Jun 05 '16 at 23:45
  • I think it's really going to depend on the amount of RAM you have. It's a matter of [scalability](http://stackoverflow.com/questions/9420014/what-does-it-mean-scalability). `mmap()` should scale linearly as your data grows; `numpy.fromfile()` will not, since at some point you will need to page. – rrauenza Jun 06 '16 at 00:41
  • You can use `numpy.memmap` and have the best of both worlds. Do slicing (`0:num_pts`) before fancy indexing to minimize copies. – Jun 09 '16 at 08:06
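
A minimal sketch of the numpy.memmap approach suggested in the last comment, assuming the layout described in the question (a fixed-size header followed by y int16 values per sample); header_bytes, num_samples, num_sensors, and the example values are hypothetical placeholders for whatever the header actually says:

import numpy as np

# Hypothetical values; in practice these are parsed from the file's header.
header_bytes = 1024
num_samples = 10**6                # x
num_sensors = 500                  # y
sensor_indices = [3, 17, 42]       # sensors actually wanted
num_pts = 10000                    # samples actually wanted

# Map the data section without reading it all into RAM; the shape matches
# the on-disk layout of one row of y int16 values per sample time.
mm = np.memmap("data.bin", dtype=np.int16, mode="r",
               offset=header_bytes, shape=(num_samples, num_sensors))

# Slice first (a cheap view), then fancy-index the sensors, which copies
# only the requested columns into memory.
data = mm[:num_pts, :][:, sensor_indices].T

Only the selected samples and sensors end up copied into RAM; the kernel pages the mapped file in and out as needed, so this should behave much the same for a 30 MB file or a 20 GB one.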

1 Answer


I would definitely try mmap():

https://docs.python.org/2/library/mmap.html

You're reading a lot of small pieces of data, which incurs a lot of system call overhead when you call seek() and read() for every int16 you extract.

I've written a small test to demonstrate:

#!/usr/bin/python

import mmap
import os
import struct
import sys

FILE = "/opt/tmp/random"  # dd if=/dev/random of=/opt/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2   # bytes per int16
SKIP = 10   # stride between reads


def byfile():
    sum = 0
    with open(FILE, "rb") as fd:
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            fd.seek(offset)                      # one seek() + read() system call pair per value
            data = fd.read(BYTES)
            sum += struct.unpack('h', data)[0]
    return sum


def bymmap():
    sum = 0
    with open(FILE, "rb") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            data = mm[offset:offset+BYTES]       # plain slicing, no system call per value
            sum += struct.unpack('h', data)[0]
    return sum


if sys.argv[1] == 'mmap':
    print bymmap()

if sys.argv[1] == 'file':
    print byfile()

I ran each method twice to compensate for caching. I used time because I wanted to measure user and sys time.

Here are the results:

[centos7:/tmp]$ time ./test.py file
-211990391

real    0m44.656s
user    0m35.978s
sys     0m8.697s
[centos7:/tmp]$ time ./test.py file
-211990391

real    0m43.091s
user    0m37.571s
sys     0m5.539s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.712s
user    0m15.495s
sys     0m1.227s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.942s
user    0m15.846s
sys     0m1.104s
[centos7:/tmp]$ 

(The sum -211990391 just validates both versions do the same thing.)

Looking at each version's second run, mmap() takes roughly a third of the real time, about half the user time, and about a fifth of the system time.

Your other options for perhaps speeding this up are:

(1) As you mentioned, load the whole file. Large I/Os instead of small I/Os could speed things up. If you exceed system memory, though, you'll fall back to paging, which will be worse than mmap() (since you have to page out). I'm not super hopeful here because mmap is already using larger I/Os.

(2) Concurrency. Maybe reading the file in parallel with multiple threads could speed things up, but you'll have the Python GIL to deal with. Multiprocessing will work better by avoiding the GIL, and you could easily pass your data back to a top-level handler. This will, however, work against the next item, locality: you might make your I/O more random.

(3) Locality. Somehow organize your data (or order your reads) so that your data is closer together. mmap() pages the file in chunks according to the system pagesize:

>>> import mmap
>>> mmap.PAGESIZE
4096
>>> mmap.ALLOCATIONGRANULARITY
4096
>>> 

If your data is closer together (within the 4k chunk), it will already have been loaded into the buffer cache (see the sketch after this list for one way of ordering reads).

(4) Better hardware. Like an SSD.
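
A minimal sketch of the read-ordering idea in (3), assuming the same layout as the question (a header followed by y int16 values per sample); header_bytes, num_sensors, and the other example values are hypothetical placeholders:

import mmap
import struct

import numpy as np

# Hypothetical layout values; in practice they come from the file's header.
header_bytes = 1024
num_sensors = 500                  # y int16 values per sample frame
sensor_indices = [3, 17, 42]       # sensors actually wanted (kept sorted)
num_pts = 10000                    # samples actually wanted
BYTES = 2                          # sizeof(int16)

with open("data.bin", "rb") as fd:
    mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)

    # Compute every byte offset up front: sample i, sensor j lives at
    # header + (i * y + j) * 2 in this row-per-sample layout.
    wanted = [(header_bytes + (i * num_sensors + j) * BYTES, j, i)
              for i in range(num_pts) for j in sensor_indices]
    # Visit the offsets in ascending file order so values that share a
    # 4k page are read together and hit the buffer cache.
    wanted.sort()

    row = {j: k for k, j in enumerate(sensor_indices)}
    data = np.empty((len(sensor_indices), num_pts), dtype=np.int16)
    for offset, j, i in wanted:
        data[row[j], i] = struct.unpack('h', mm[offset:offset + BYTES])[0]
    mm.close()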

I did run the test above on an SSD and it was much faster. I also ran a profile of the Python code, wondering whether the unpack was expensive. It's not:

$ python -m cProfile test.py mmap                                                                                                                        
121679286
         26843553 function calls in 8.369 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    6.204    6.204    8.357    8.357 test.py:24(bymmap)
        1    0.012    0.012    8.369    8.369 test.py:3(<module>)
 26843546    1.700    0.000    1.700    0.000 {_struct.unpack}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 {method 'fileno' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {open}
        1    0.000    0.000    0.000    0.000 {posix.stat}
        1    0.453    0.453    0.453    0.453 {range}

Addendum:

Curiosity got the best of me and I tried out multiprocessing. I need to look at my partitioning more closely, but the number of unpacks (53687092) is the same across trials:

$ time ./test2.py 4
[(4415068.0, 13421773), (-145566705.0, 13421773), (14296671.0, 13421773), (109804332.0, 13421773)]
(-17050634.0, 53687092)

real    0m5.629s
user    0m17.756s
sys     0m0.066s
$ time ./test2.py 1
[(264140374.0, 53687092)]
(264140374.0, 53687092)

real    0m13.246s
user    0m13.175s
sys     0m0.060s

Code:

#!/usr/bin/python

import functools
import multiprocessing
import mmap
import os
import struct
import sys

FILE = "/tmp/random"  # dd if=/dev/random of=/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2   # bytes per int16
SKIP = 10   # stride between reads


def bymmap(poolsize, n):
    # Worker n sums every SKIP*BYTES-th int16 in its slice of the file.
    partition = SIZE / poolsize
    initial = n * partition
    end = initial + partition
    sum = 0.0
    unpacks = 0
    with open(FILE, "rb") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in xrange(initial, end, SKIP*BYTES):
            data = mm[offset:offset+BYTES]
            sum += struct.unpack('h', data)[0]
            unpacks += 1
    return (sum, unpacks)


poolsize = int(sys.argv[1])
pool = multiprocessing.Pool(poolsize)
results = pool.map(functools.partial(bymmap, poolsize), range(0, poolsize))
print results
# Combine the per-worker (sum, unpack count) pairs.
print reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), results)
rrauenza