
As part of my research, I am looking for a good storage design for my panel data. I am using pandas for all in-memory operations. I've had a look at the following two questions, Large Data Work flows using Pandas and Query HDF5 Pandas, as they come closest to my set-up. However, I have a couple of questions left. First, let me define my data and some requirements:

  1. Size: I have around 800 dates, 9000 IDs and up to 200 variables. Hence, flattening the panel (along dates and IDs) gives 7.2 million rows and 200 columns. This may or may not fit in memory; let's assume it does not. Disk space is not an issue.

  2. Variables are typically calculated once, but updates/changes probably happen from time to time. Once updates occur, old versions don't matter anymore.

  3. New variables are added from time to time, mostly one at a time only.

  4. New rows are not added.

  5. Queries are frequent. Often I only need a certain date range, like date>start_date & date<end_date. But some queries need rank conditions within each date. For example, get all data (i.e. all columns) where rank(var1)>500 & rank(var1)<1000, where the rank is computed as of each date.
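To make requirement 5 concrete, here is a minimal in-memory sketch of both kinds of condition. The toy panel (3 dates, 4 ids), the column name `var1` and the small rank bounds are assumptions made for illustration only; they stand in for the real 800 x 9000 panel and the 500/1000 bounds.

```python
import numpy as np
import pandas as pd

# Hypothetical toy panel: 3 dates x 4 ids, one variable `var1`.
dates = pd.date_range("2016-01-31", periods=3, freq="D")
ids = ["A", "B", "C", "D"]
index = pd.MultiIndex.from_product([dates, ids], names=["date", "id"])
df = pd.DataFrame({"var1": np.arange(12, dtype=float)}, index=index)

# Date-range condition on the 'date' level of the multi-index.
start_date, end_date = dates[0], dates[2]
date_level = df.index.get_level_values("date")
in_range = df[(date_level > start_date) & (date_level < end_date)]

# Cross-sectional rank: rank var1 within each date ("as of date").
df["rank_var1"] = df.groupby(level="date")["var1"].rank()

# Rank condition; 1 < rank < 4 here instead of 500 < rank < 1000.
selected = df[(df["rank_var1"] > 1) & (df["rank_var1"] < 4)]
```

Storing `rank_var1` as its own column is what turns the rank condition into a plain comparison that a `where` expression can handle.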

The objective is to achieve fast reading/querying of data. Data writing is not so critical.

I thought of the following HDF5 design:

  1. Follow the groups_map approach (from the first linked question) to store variables in different tables. Limit the number of columns for each group to 10 (to avoid large memory loads when updating single variables, see point 3).

  2. Each group represents one table, where I use the multi-index based on dates & ids for each table stored.

  3. Create an update function to update variables. The function loads the table with all (10) columns into memory as a df, deletes the table on disk, replaces the updated variable in df and saves the table from memory back to disk.

  4. Create an add function that adds var1 to a group with fewer than 10 columns, or creates a new group if required. Saving works as in 3: load the current group into memory, delete the table on disk, add the new column and save it back to disk.

  5. Calculate ranks as of date for relevant variables and add them to disk storage as rank_var1, which should reduce the as-of-date query to simply rank_var1 > 500 & rank_var1 < 1000.
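Points 1 and 4 of this design can be sketched without touching the disk at all. The helper names `build_groups_map` and `first_group_with_room`, and the `grp0`, `grp1`, ... naming scheme, are hypothetical; the only thing taken from the design above is the limit of 10 columns per group and the `{'columns': [...]}` layout of each entry.

```python
def build_groups_map(columns, max_cols=10, prefix="grp"):
    """Split column names into groups of at most `max_cols` columns."""
    groups_map = {}
    for start in range(0, len(columns), max_cols):
        name = "{}{}".format(prefix, start // max_cols)
        groups_map[name] = {"columns": list(columns[start:start + max_cols])}
    return groups_map


def first_group_with_room(groups_map, max_cols=10):
    """Return the first group that can take one more column, else None."""
    for name, spec in groups_map.items():
        if len(spec["columns"]) < max_cols:
            return name
    return None


# 25 hypothetical variables end up in three groups of 10, 10 and 5 columns.
variables = ["var{}".format(i) for i in range(25)]
groups_map = build_groups_map(variables)
target = first_group_with_room(groups_map)  # group that receives a new column
```

When `first_group_with_room` returns None, the add function would create a new group instead.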

I have the following questions:

  1. Updating HDFTable, I suppose I have to delete the entire table in order to update a single column?

  2. When to use 'data_columns', or should I simply assign True in HDFStore.append()?

  3. If I want to query on the condition rank_var1 > 500 & rank_var1 < 1000 but need columns from other groups, can I pass the index returned by the rank_var1 condition into the query to get the other columns (the index is a multi-index with date and ID)? Or would I need to loop over this index by date, chunk the IDs as proposed in the second linked question, and repeat the procedure for each group I need? Alternatively, (a) I could add rank columns to each group's table, but that seems extremely inefficient in terms of disk storage. Note that the number of variables where rank filtering is relevant is limited (say 5). Or (b) I could simply take the df_rank returned by the rank_var1 query and use in-memory operations via df_rank.merge(df_tmp, left_index=True, right_index=True, how='left'), looping through the groups (df_tmp) from which I select the desired columns.

  4. Say I have some data in different frequencies. Having different group_maps (or different storages) for different freq is the way to go I suppose?

  5. Copies of the storage might be used on win/ux systems. I assume it is perfectly compatible, anything to consider here?

  6. I plan to use pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc'). Any concerns regarding complevel or complib?
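Option (b) from question 3 can be illustrated purely in memory. In this sketch `df_query` and `df_tmp` are small hand-made frames standing in for tables read from two different groups; the column names `x` and `y` and the rank threshold are assumptions. pandas also offers `HDFStore.select_as_multiple`, which selects rows via one selector table and returns columns from several tables with identical rows; whether it fits this set-up would need testing.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2016-01-01", periods=2, freq="D")
index = pd.MultiIndex.from_product(
    [dates, ["A", "B", "C"]], names=["date", "id"])

# Stand-ins for two tables that would normally be read from disk.
df_query = pd.DataFrame(
    {"rank_var1": [1.0, 2.0, 3.0, 3.0, 1.0, 2.0]}, index=index)
df_tmp = pd.DataFrame(
    {"x": np.arange(6.0), "y": np.arange(6.0) * 10}, index=index)

# Result of the rank condition: only its (date, id) index matters below.
df_rank = df_query[df_query["rank_var1"] > 1]

# Option (b): left merge on the multi-index, taking only the wanted columns.
merged = df_rank.merge(
    df_tmp[["x"]], left_index=True, right_index=True, how="left")

# Same result by aligning the other table to the selected index first.
joined = df_rank.join(df_tmp.reindex(df_rank.index)[["x"]])
```

Both variants still load the whole of `df_tmp` before filtering, so they save no I/O; they only keep the final frame small.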

I've started to write up some code; once I have something to show, I'll edit the question and add it if desired. Please let me know if you need any more information.

EDIT: Here is a first version of my storage class; please adjust the paths at the bottom accordingly. Sorry for the length of the code. Comments welcome.

import pandas as pd
import numpy as np
import string

class LargeDFStorage():

    # TODO add index features to ensure correct indexes
    # index_names = ('date', 'id')

    def __init__(self, h5_path, groups_map):
        """

        Parameters
        ----------
        h5_path: str
            hdf5 storage path
        groups_map: dict
            where keys are group_names and values are dict, with at least key
            'columns' where the value is list of column names.
            A special group_name is reserved for group_name/key "query", which
            can be used as querying and conditioning table when getting data,
            see :meth:`.get`.
        """

        self.path = str(h5_path)
        self.groups_map = groups_map
        self.column_map = self._get_column_map()
        # if desired make part of arguments
        self.complib = 'blosc'
        self.complevel = 9

    def _get_column_map(self):
        """ Calc the inverse of the groups_map/ensures uniqueness of cols

        Returns
        -------
        dict: with cols as keys and group_names as values
        """
        column_map = dict()
        for g, value in self.groups_map.items():
            if len(set(column_map.keys()) & set(value['columns'])) > 0:
                raise ValueError('Columns have to be unique')
            for col in value['columns']:
                column_map[col] = g

        return column_map

    @staticmethod
    def group_col_names(store, group_name):
        """ Returns all column names of specific group

        Parameters
        ----------
        store: pd.HDFStore
        group_name: str

        Returns
        -------
        list:
            of all column names in the group
        """
        if group_name not in store:
            return []

        # hack to get column names, straightforward way!?
        return store.select(group_name, start=0, stop=0).columns.tolist()

    @staticmethod
    def stored_cols(store):
        """ Collects all columns stored in HDF5 store

        Parameters
        ----------
        store: pd.HDFStore

        Returns
        -------
        list:
            a list of all columns currently in the store
        """
        stored_cols = list()
        for x in store.items():
            group_name = x[0][1:]
            stored_cols += LargeDFStorage.group_col_names(store, group_name)

        return stored_cols

    def _find_groups(self, columns):
        """ Searches all groups required for covering columns

        Parameters
        ----------
        columns: list
            list of valid columns

        Returns
        -------
        list:
            of unique groups
        """
        groups = list()
        for column in columns:
            groups.append(self.column_map[column])

        return list(set(groups))

    def add_columns(self, df):
        """ Adds columns to storage for the first time. If columns should
        be updated, use :meth:`.update` instead.

        Parameters
        ----------
        df: pandas.DataFrame
            with new columns (not yet stored in any of the tables)

        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
                            complib=self.complib)

        # check if any column has been stored already
        if df.columns.isin(self.stored_cols(store)).any():
            store.close()
            raise ValueError('Some cols are already in the store')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)

        for group in groups:
            v = self.groups_map[group]

            # select columns of current group in df
            select_cols = df.columns[df.columns.isin(v['columns'])].tolist()
            tmp = df.reindex(columns=select_cols, copy=False)

            # set data_columns to True only in case of query data
            dc = None
            if group=='query':
                dc = True

            stored_cols = self.group_col_names(store,group)
            # no columns in group (group does not exists yet)
            if len(stored_cols)==0:
                store.append(group, tmp, data_columns=dc)
            else:
                # load current disk data to memory
                df_grp = store.get(group)
                # remove data from disk
                store.remove(group)
                # add new column(s) to df_disk
                df_grp = df_grp.merge(tmp, left_index=True, right_index=True,
                                      how='left')
                # save old data with new, additional columns
                store.append(group, df_grp, data_columns=dc)

        store.close()

    def _query_table(self, store, columns, where):
        """ Selects data from table 'query' and uses where expression

        Parameters
        ----------
        store: pd.HDFStore
        columns: list
            desired data columns
        where: str
            a valid select expression

        Returns
        -------

        """

        query_cols = self.group_col_names(store, 'query')
        if len(query_cols) == 0:
            store.close()
            raise ValueError('No data to query table')
        get_cols = list(set(query_cols) & set(columns))
        if len(get_cols) == 0:
            # load only one column to minimize memory usage
            df_query = store.select('query', columns=[query_cols[0]],
                                    where=where)
            add_query = False
        else:
            # load columns which are anyways needed already
            df_query = store.select('query', columns=get_cols, where=where)
            add_query = True

        return df_query, add_query

    def get(self, columns, where=None):
        """ Retrieve data from storage

        Parameters
        ----------
        columns: list/str
            list of columns to use, or use 'all' if all columns should be
            retrieved
        where: str
            a valid select statement

        Returns
        -------
        pandas.DataFrame
            with all requested columns and considering where
        """
        store = pd.HDFStore(str(self.path), mode='r')

        # get all columns stored in the HDFStore
        stored_cols = self.stored_cols(store)

        if columns == 'all':
            columns = stored_cols

        # check if all desired columns can be found in storage
        if len(set(columns) - set(stored_cols)) > 0:
            store.close()
            raise ValueError('Column(s): {}. not in storage'.format(
                set(columns)- set(stored_cols)))

        # get all relevant groups (where columns are taken from)
        groups = self._find_groups(columns)

        # if where query is defined retrieve data from storage, eventually
        # only index of df_query might be used
        if where is not None:
            df_query, add_df_query = self._query_table(store, columns, where)
        else:
            df_query, add_df_query = None, False

        # df collector
        df = list()
        for group in groups:
            # skip the query group if where was used; its columns are
            # taken from df_query instead
            if where is not None and group=='query':
                continue
            # all columns which are in group but also requested
            get_cols = list(
                set(self.group_col_names(store, group)) & set(columns))

            tmp_df = store.select(group, columns=get_cols)
            if df_query is None:
                df.append(tmp_df)
            else:
                # align query index with df index from storage
                df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0)
                df.append(tmp_df)

        store.close()

        # if any data of query should be added
        if add_df_query:
            df.append(df_query)

        # combine all columns
        df = pd.concat(df, axis=1)

        return df

    def update(self, df):
        """ Updates data in storage, all columns have to be stored already in
        order to be accepted for updating (use :meth:`.add_columns` instead)

        Parameters
        ----------
        df: pd.DataFrame
            with index as in storage, and column as desired


        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
                            complib=self.complib)

        # check if all column have been stored already
        # `.all()` returns a numpy bool, so `is False` would never be true
        if not df.columns.isin(self.stored_cols(store)).all():
            store.close()
            raise ValueError('Some cols have not been stored yet')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)
        for group in groups:
            dc = None
            if group=='query':
                dc = True
            # load current disk data to memory
            group_df = store.get(group)
            # remove data from disk
            store.remove(group)
            # update with new data
            group_df.update(df)
            # save updated df back to disk
            store.append(group, group_df, data_columns=dc)

        store.close()


class DataGenerator():
    np.random.seed(1282)

    @staticmethod
    def get_df(rows=100, cols=10, freq='M'):
        """ Simulate data frame
        """
        if cols < 26:
            col_name = list(string.ascii_lowercase[:cols])
        else:
            col_name = range(cols)
        if rows > 2000:
            freq = 'Min'
        index = pd.date_range('19870825', periods=rows, freq=freq)
        df = pd.DataFrame(np.random.standard_normal((rows, cols)),
                          columns=col_name, index=index)
        df.index.name = 'date'
        df.columns.name = 'ID'
        return df

    @staticmethod
    def get_panel(rows=1000, cols=500, items=10):
        """ simulate panel data
        """

        # item names depend on the number of items, not on cols
        if items < 26:
            item_names = list(string.ascii_lowercase[:items])
        else:
            item_names = range(items)
        panel_ = dict()

        for item in item_names:
            panel_[item] = DataGenerator.get_df(rows=rows, cols=cols)

        # NOTE: pd.Panel was removed in pandas 0.25; this needs an older pandas
        return pd.Panel(panel_)


def main():
    # Example of with DataFrame
    path = 'D:\\fc_storage.h5'
    groups_map = dict(
        a=dict(columns=['a', 'b', 'c', 'd', 'k']),
        query=dict(columns=['e', 'f', 'g', 'rank_a']),
    )
    storage = LargeDFStorage(path, groups_map=groups_map)
    df = DataGenerator.get_df(rows=200000, cols=15)
    storage.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage.update(df[['a']]*3)
    storage.add_columns(df[['d', 'g']])

    print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0'))

    # Example with panel and rank condition
    path2 = 'D:\\panel_storage.h5'
    storage_pnl = LargeDFStorage(path2, groups_map=groups_map)
    panel = DataGenerator.get_panel(rows=800, cols=2000, items=24)
    df = panel.to_frame()
    df['rank_a'] = df[['a']].groupby(level='date').rank()
    storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage_pnl.update(df[['a']]*3)
    storage_pnl.add_columns(df[['d', 'g', 'rank_a']])
    print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'],
                          where='f>0 & e>0 & rank_a <100'))


if __name__ == '__main__':
    main()
MMCM_

1 Answer


It's a bit difficult to answer those questions without particular examples...

Updating HDFTable, I suppose I have to delete the entire table in order to update a single column?

AFAIK yes unless you are storing single columns separately, but it will be done automatically, you just have to write your DF/Panel back to HDF Store.

When to use 'data_columns', or should I simply assign True in HDFStore.append()?

data_columns=True will index all your columns. IMO it's a waste of resources unless you are going to use all columns in the where parameter (i.e. if all columns should be indexed). I would specify only those columns that will often be used for searching in the where= clause. Think of those columns as indexed columns in a database table.
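As a rough sketch of that advice, the function below stores a small frame and marks only the column used for filtering as a data column. It requires PyTables to be installed; the function name `write_and_query`, the key `grp` and the column names are made up for illustration.

```python
import numpy as np
import pandas as pd


def write_and_query(path):
    """Store a frame with one data column and query on that column."""
    rng = np.random.default_rng(0)
    df = pd.DataFrame(rng.standard_normal((1000, 3)),
                      columns=["a", "b", "f"])
    with pd.HDFStore(path, mode="w") as store:
        # Only 'f' is stored as a data column, so only 'f' (and the index)
        # can be used in a `where` expression for this table.
        store.append("grp", df, data_columns=["f"])
        # Filter on the data column and read back just two columns.
        out = store.select("grp", where="f < 0", columns=["a", "f"])
    return df, out
```

Calling `write_and_query` with a scratch file path returns the original frame and the filtered selection. A `where` on `a` or `b` would fail for this table because they are not data columns.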

If I want to query based on condition of rank_var1 > 500 & rank_var1<1000, but I need columns from other groups. Can I enter the index received from the rank_var1 condition into the query to get other columns based on this index (the index is a multi-index with date and ID)?

I think we would need some reproducible sample data and examples of your queries in order to give a reasonable answer...

Copies of the storage might be used on win/ux systems. I assume it is perfectly compatible, anything to consider here?

Yes, it should be fully compatible

I plan to use pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc'). Any concerns regarding complevel or complib?

Test it with your data - results might depend on dtypes, number of unique values, etc. You may also want to consider the lzo complib - it might be faster in some use cases. Check this. Sometimes a high complevel doesn't give you a better compression ratio, but it will be slower (see the results of my old comparison).

MaxU - stand with Ukraine
  • So I added a lengthy code example (sorry for that). Maybe the storage_pnl example shows what I mean by querying data conditional on rank. If you are interested, look at the `.get()` method for the details. Using `get()` with `where` might not be the most efficient implementation in terms of memory management, but it performs reasonably fast on my hardware. – MMCM_ Oct 14 '16 at 18:41