
I have a binary file with a known format/structure.

How do I read all the binary data into an array of the structure?

Something like (in pseudo code)

bytes = read_file(filename)
struct = {'int','int','float','byte[255]'}
data = read_as_struct(bytes, struct)
data[1]
>>> 10,11,10.1,Arr[255]

My solution so far is:

data = []

fmt   = '=iiiii256i'
fmt_s = '=iiiii'
fmt_spec = '256i'

struct_size = struct.calcsize(fmt)

for i in range(struct_size, len(bytes) + 1, struct_size):
    dat1 = list(struct.unpack(fmt_s, bytes[i-struct_size:i-1024]))  # 1024 == calcsize(fmt_spec)
    dat2 = list(struct.unpack(fmt_spec, bytes[i-1024:i]))
    dat1.append(dat2)
    data.append(dat1)
kasperhj

4 Answers


Actually it looks like you're trying to read a list (or array) of structures from the file. The idiomatic way to do this in Python is to use the struct module and call struct.unpack() in a loop, either a fixed number of times if you know the number of structures in advance, or until end-of-file is reached, and store the results in a list. Here's an example of the latter:

import struct

struct_fmt = '=5if255s' # int[5], float, byte[255]
struct_len = struct.calcsize(struct_fmt)
struct_unpack = struct.Struct(struct_fmt).unpack_from

results = []
with open(filename, "rb") as f:
    while True:
        data = f.read(struct_len)
        if not data: break
        s = struct_unpack(data)
        results.append(s)

The same results can also be obtained slightly more concisely using a list comprehension along with a short generator-function helper (i.e. read_chunks() below):

def read_chunks(f, length):
    while True:
        data = f.read(length)
        if not data: break
        yield data

with open(filename, "rb") as f:
    results = [struct_unpack(chunk) for chunk in read_chunks(f, struct_len)]

Update

You don't, in fact, need to explicitly define a helper function as shown above because you can use Python's built-in iter() function to dynamically create the needed iterator object in the list comprehension itself like so:

from functools import partial

with open(filename, "rb") as f:
    results = [struct_unpack(chunk) for chunk in iter(partial(f.read, struct_len), b'')]
martineau
  • Some readers may also be interested in reading the answers to the question [**_Fastest way to read a binary file with a defined format?_**](https://stackoverflow.com/questions/44933639/fastest-way-to-read-a-binary-file-with-a-defined-format) – martineau Jul 10 '17 at 18:47
  • I get the following error when I try to use your suggestion: `struct.error: unpack_from requires a buffer of at least 209 bytes`. What am I doing wrong? Sorry, I am new to Python. – xMutzelx Jan 08 '18 at 16:04
  • 1
    @xMutzelx: That can happen when your binary file's length isn't an exact multiple of the struct's size because there's no check being made after the `f.read()` call to ensure that the requested number of bytes is being returned. This can be because the file has some sort of header or trailer, in it as well as the data that comprises the array of structures (or it's corrupt). – martineau Jan 08 '18 at 18:29
  • 1
    @xMutzelx: I just test the code in the answer and got exactly the error you describe when I added a small (10 byte) header to the binary input file I used for testing. – martineau Jan 08 '18 at 18:59
  • I was able to fix the problem, thank you for your help. My struct contains `"Q50IB"`. The 1-byte `"B"` got padded to 4 bytes, so I just had to change the `"B"` to an `"I"`. – xMutzelx Jan 12 '18 at 08:08
  • 1
    @xMutzelx: Hmm...yes, have the wrong structure layout could also cause the problem. That's why it might be a good idea to at least manually verify that the computed `struct_len` value is what it ought to be. Also note that changing the structure format string prefix from `'='` to `'@'` (and leaving the `'B'` at the end in) might also have worked because it enables "native" alignment instead of suppressing it. – martineau Jan 12 '18 at 21:18
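That last point is easy to verify directly: `'@'` enables the platform's native alignment, which inserts padding between fields, while `'='` suppresses it. A minimal check (the `'@'` result assumes a typical platform where `int` is 4-byte aligned):

```python
import struct

# '=' suppresses alignment: a 1-byte B followed immediately by a 4-byte I
print(struct.calcsize('=BI'))   # 5

# '@' uses native alignment: padding is inserted before the I
# (3 bytes of padding on platforms where int is 4-byte aligned)
print(struct.calcsize('@BI'))
```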

Use the struct module; you need to define the types in a string format documented with that library:

struct.unpack('=HHf255s', bytes)

The above example expects native byte-order, two unsigned shorts, a float and a string of 255 characters.
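As a quick illustration (sample values invented here), a round trip through pack() and unpack() with that format shows the shape of the result; note the float comes back as a single-precision approximation and the 255s field is zero-padded:

```python
import struct

fmt = '=HHf255s'
buf = struct.pack(fmt, 10, 11, 10.1, b'hello')  # 255s zero-pads b'hello' to 255 bytes
a, b, c, s = struct.unpack(fmt, buf)
print(a, b, round(c, 1))   # 10 11 10.1
print(s[:5])               # b'hello'
```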

To loop over an already fully read bytes string, I'd use itertools; there is a handy grouper recipe that I've adapted here:

# Python 2: izip_longest and imap are named zip_longest and map in Python 3
from itertools import izip_longest, imap
from struct import unpack, calcsize

fmt_s = '=5i'
fmt_spec = '=256i'
size_s = calcsize(fmt_s)
size = size_s + calcsize(fmt_spec)

def chunked(iterable, n, fillvalue=''):
    args = [iter(iterable)] * n
    return imap(''.join, izip_longest(*args, fillvalue=fillvalue))

data = [unpack(fmt_s, section[:size_s]) + (unpack(fmt_spec, section[size_s:]),)
        for section in chunked(bytes, size)]

This produces tuples rather than lists, but it's easy enough to adjust if you have to:

data = [list(unpack(fmt_s, section[:size_s])) + [list(unpack(fmt_spec, section[size_s:]))]
        for section in chunked(bytes, size)]
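A Python 3 note: iterating over a bytes object yields integers, so the string-join grouper above doesn't translate directly. A slice-based chunker (sketched here with synthetic packed data standing in for the file contents) does the same job:

```python
import struct

fmt_s, fmt_spec = '=5i', '=256i'
size_s = struct.calcsize(fmt_s)
size = size_s + struct.calcsize(fmt_spec)

# two sample records standing in for the file contents
raw = 2 * (struct.pack(fmt_s, 1, 2, 3, 4, 5) + struct.pack(fmt_spec, *range(256)))

def chunked(buf, n):
    # yield successive fixed-size records from a bytes object
    return (buf[i:i + n] for i in range(0, len(buf), n))

data = [struct.unpack(fmt_s, sec[:size_s]) + (struct.unpack(fmt_spec, sec[size_s:]),)
        for sec in chunked(raw, size)]
print(data[0][:5])   # (1, 2, 3, 4, 5)
```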
Martijn Pieters
  • It seems this does not work on data larger than the struct size. My binary data repeats itself. – kasperhj Jan 08 '13 at 13:47
  • 1
    @lejon: it doesn't remove read data from `bytes` magically. Either slice `buffer` or use `unpack_from()` and an offset. – Martijn Pieters Jan 08 '13 at 14:02
  • so I will have to iterate through the `bytes` and assign each `unpack` to a tuple? Also, is there a way of putting the last 255 into an actual array, such that the output is in the form of what I put in my original post? – kasperhj Jan 08 '13 at 14:04
  • @lejon: What type is `Arr`? You'll have to pass the resulting string to that type manually, `struct` cannot do that for you. – Martijn Pieters Jan 08 '13 at 14:29
  • It's byte array. I've edited the answer to show my solution. However, is there a list comprehension to accomplish the same? – kasperhj Jan 08 '13 at 14:53
  • you might want to use [`struct.Struct`](https://docs.python.org/3/library/struct.html) which is like a compiled version of the format spec and offers both `size` and `[un]pack`. – WorldSEnder Nov 02 '18 at 17:55
  • @WorldSEnder: `struct.unpack()` and `struct.pack()` already create `Struct()` instances, caching up to 100 such objects (the 101st unique format string will clear the cache first, but most applications never get beyond a dozen or so struct formats, tops). The Python 2 version of the docs doesn't make this nearly clear enough, but you can [check the source code](https://github.com/python/cpython/blob/v2.7.15/Modules/_struct.c#L1814-L1843). – Martijn Pieters Nov 02 '18 at 18:16
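As that comment notes, struct.pack()/struct.unpack() cache compiled formats internally; precompiling with struct.Struct simply makes the reuse explicit and gives you the size for free (sample values invented here):

```python
import struct

record = struct.Struct('=HHf255s')   # compile the format once
assert record.size == struct.calcsize('=HHf255s')

buf = record.pack(1, 2, 3.0, b'abc')  # 3.0 is exactly representable as float32
print(record.unpack(buf)[:3])         # (1, 2, 3.0)
```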

import struct 

First just read the binary into an array

mbr = open('mbrcontent', 'rb').read()

So you can just fetch some piece of the array

partition_table = mbr[446:510] 

and then unpack it as an integer

signature = struct.unpack('<H', mbr[510:512])[0] 

A more complex example:

little_endian = (signature == 0xaa55)  # should be True
print("Little endian:", little_endian)
PART_FMT = ('<' if little_endian else '>') + (
    "B"  # status (0x80 = bootable (active), 0x00 = non-bootable)
    # CHS of first block
    "B"  # Head
    "B"  # Sector is in bits 5-0; bits 9-8 of cylinder are in bits 7-6
    "B"  # bits 7-0 of cylinder
    "B"  # partition type
    # CHS of last block
    "B"  # Head
    "B"  # Sector is in bits 5-0; bits 9-8 of cylinder are in bits 7-6
    "B"  # bits 7-0 of cylinder
    "L"  # LBA of first sector in the partition
    "L"  # number of blocks in partition
)

PART_SIZE = 16 
fmt_size = struct.calcsize(PART_FMT) 
# sanity check expectations 
assert fmt_size == PART_SIZE, "Partition format string is %i bytes, not %i" % (fmt_size, PART_SIZE) 

def cyl_sector(sector_cyl, cylinder7_0):
    sector = sector_cyl & 0x1F  # bits 5-0

    # bits 7-6 of sector_cyl contain bits 9-8 of the cylinder
    cyl_high = (sector_cyl >> 6) & 0x03
    cyl = (cyl_high << 8) | cylinder7_0
    return sector, cyl

for partition in range(4):
    print("Partition #%i" % partition, end=" ")
    offset = PART_SIZE * partition
    (status, start_head, start_sector_cyl, start_cyl7_0, part_type,
     end_head, end_sector_cyl, end_cyl7_0,
     lba, blocks) = struct.unpack(PART_FMT, partition_table[offset:offset + PART_SIZE])
    if status == 0x80:
        print("Bootable", end=" ")
    elif status:
        print("Unknown status [%s]" % hex(status), end=" ")
    print("Type=0x%x" % part_type)
    start = (start_head,) + cyl_sector(start_sector_cyl, start_cyl7_0)
    end = (end_head,) + cyl_sector(end_sector_cyl, end_cyl7_0)
    print(" (Start: Heads:%i\tCyl:%i\tSect:%i)" % start)
    print(" (End:   Heads:%i\tCyl:%i\tSect:%i)" % end)
    print(" LBA:", lba)
    print(" Blocks:", blocks)
ray_linn
import os, re
import functools
import ctypes
from ctypes import string_at, byref, sizeof, cast, POINTER, pointer, create_string_buffer, memmove
import numpy as np
import pandas as pd

class _StructBase(ctypes.Structure):
    __type__ = 0
    _fields_ = []

    @classmethod
    def Offsetof(cls, field):
        pattern = r'(?P<field>\w+)\[(?P<idx>\d+)\]'

        mat = re.match(pattern, field)
        if mat:
            fields = dict(cls.Fields())
            f = mat.groupdict()['field']
            idx = mat.groupdict()['idx']
            return cls.Offsetof(f) + int(idx) * ctypes.sizeof(fields[field])
        else:
            return getattr(cls, field).offset

    @classmethod
    def DType(cls):
        map = {
            ctypes.c_byte: np.byte,
            ctypes.c_ubyte: np.ubyte,
            ctypes.c_char: np.ubyte,

            ctypes.c_int8: np.int8,
            ctypes.c_int16: np.int16,
            ctypes.c_int32: np.int32,
            ctypes.c_int64: np.int64,

            ctypes.c_uint8: np.uint8,
            ctypes.c_uint16: np.uint16,
            ctypes.c_uint32: np.uint32,
            ctypes.c_uint64: np.uint64,

            ctypes.c_float: np.float32,
            ctypes.c_double: np.float64,
        }
        res = []

        for k, v in cls.Fields():
            if hasattr(v, '_length_'):
                if v._type_ != ctypes.c_char:
                    for i in range(v._length_):
                        res.append((k, map[v._type_], cls.Offsetof(k)))
                else:
                    res.append((k, 'S%d' % v._length_, cls.Offsetof(k)))
            else:
                res.append((k, map[v], cls.Offsetof(k)))
        res = pd.DataFrame(res, columns=['name', 'format', 'offset'])
        return np.dtype({
            'names': res['name'],
            'formats': res['format'],
            'offsets': res['offset'],
        })

    @classmethod
    def Attr(cls):
        fields = cls._fields_
        res = []
        for attr, tp in fields:
            if str(tp).find('_Array_') > 0 and str(tp).find('char_Array_') < 0:
                for i in range(tp._length_):
                    res.append((attr + '[%s]' % str(i), tp._type_))
            else:
                res.append((attr, tp))
        return res

    @classmethod
    def Fields(cls, notype=False):
        res = [cls.Attr()]
        cur_cls = cls
        while True:
            cur_cls = cur_cls.__bases__[0]
            if cur_cls == ctypes.Structure:
                break
            res.append(cur_cls.Attr())
        if notype:
            return [k for k, v in functools.reduce(list.__add__, reversed(res), [])]
        else:
            return functools.reduce(list.__add__, reversed(res), [])

    @classmethod
    def size(cls):
        return sizeof(cls)

    @classmethod
    def from_struct_binary(cls, path, max_count=2 ** 32, decode=True):
        print(os.path.getsize(path), cls.size())
        assert os.path.getsize(path) % cls.size() == 0
        size = os.path.getsize(path) // cls.size()
        size = min(size, max_count)

        index = range(size)
        array = np.fromfile(path, dtype=cls.DType(), count=size)

        df = pd.DataFrame(array, index=index)
        for attr, tp in eval(str(cls.DType())):
            if re.match(r'S\d+', tp) is not None and decode:
                try:
                    df[attr] = df[attr].map(lambda x: x.decode("utf-8"))
                except UnicodeDecodeError:
                    df[attr] = df[attr].map(lambda x: x.decode("gbk"))
        return df

class StructBase(_StructBase):
    _fields_ = [
        ('Type', ctypes.c_uint32),
    ]

class IndexStruct(StructBase):
    _fields_ = [
        ('Seq', ctypes.c_uint32),
        ('ExID', ctypes.c_char * 8),
        ('SecID', ctypes.c_char * 8),
        ('SecName', ctypes.c_char * 16),
        ('SourceID', ctypes.c_int32),
        ('Time', ctypes.c_uint32),
        ('PreClose', ctypes.c_uint32),
        ('Open', ctypes.c_uint32),
        ('High', ctypes.c_uint32),
        ('Low', ctypes.c_uint32),
        ('Match', ctypes.c_uint32),
    ]

df = IndexStruct.from_struct_binary('your path')
print(df)
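The core idea of this answer, mapping a ctypes.Structure onto a NumPy dtype and bulk-loading records with np.fromfile, can be sketched minimally as follows (the field layout and file path are invented for the demo):

```python
import ctypes
import os
import tempfile

import numpy as np

class Rec(ctypes.Structure):
    _fields_ = [('Seq', ctypes.c_uint32), ('Price', ctypes.c_float)]

# build the equivalent NumPy dtype by hand and sanity-check the sizes match
dt = np.dtype([('Seq', np.uint32), ('Price', np.float32)])
assert dt.itemsize == ctypes.sizeof(Rec)

# write two sample records, then bulk-load them in one call
path = os.path.join(tempfile.mkdtemp(), 'recs.bin')
with open(path, 'wb') as f:
    for seq, price in [(1, 1.5), (2, 2.5)]:
        f.write(bytes(Rec(seq, price)))

arr = np.fromfile(path, dtype=dt)
print(arr['Seq'])   # [1 2]
```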
xielongen
  • see also [How to pack and unpack using ctypes](https://stackoverflow.com/questions/1825715/how-to-pack-and-unpack-using-ctypes-structure-str) and [ctypes.BigEndianStructure](https://docs.python.org/3/library/ctypes.html#ctypes.BigEndianStructure) – milahu Mar 31 '23 at 17:21