There seem to be two main bottlenecks here:
1. Pandas DataFrames store their data in column-major format, meaning each column maps to one NumPy array, whereas the Redis stream data arrives row by row.
2. Pandas MultiIndex is made for categorical data, and converting raw arrays into the required levels/codes structure appears to be non-optimized.
Because of point 1, looping over all Redis stream entries is unavoidable. Assuming we know the length beforehand, we can pre-allocate NumPy arrays and fill them as we go, and with some tricks reuse these arrays as the DataFrame columns. If the overhead of looping in Python is still too much, rewriting the loop in Cython should be straightforward.
Since you didn't specify datatypes, this answer keeps everything as bytes using arrays with dtype=object; it should be reasonably obvious how to adapt this to a custom setting. The only reason to put all of the columns in the same array is to move an inner loop over the columns/fields from Python to C. It could instead be split up into e.g. one array per data type or one array per column.
from functools import partial, reduce
import numpy as np
import pandas as pd
data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
[b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]
colnames = data[0][1][0::2]
ncols = len(colnames)
nrows = len(data)
ts_seq = np.empty((2, nrows), dtype=np.int64)  # row 0: timestamps, row 1: sequence numbers
cols = np.empty((ncols, nrows), dtype=object)  # np.object was removed in NumPy 1.24; use object
for i, (entry_id, fields) in enumerate(data):  # entry_id avoids shadowing the builtin id
    ts, seq = entry_id.split(b"-", 1)
    ts_seq[:, i] = (int(ts), int(seq))
    cols[:, i] = fields[1::2]
colframes = [pd.DataFrame(cols[i:i+1, :].T) for i in range(ncols)]
merge = partial(pd.merge, left_index=True, right_index=True, copy=False)
df = reduce(merge, colframes[1:], colframes[0])
df.columns = colnames
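As a sketch of the per-dtype split mentioned above, suppose (hypothetically) that foo and bar are known to be integers while bla stays as bytes. One pre-allocated array per data type could then look like this; note that building the final DataFrame from a dict copies, so to keep the zero-copy property you would build per-column frames and merge them as above:

```python
from functools import partial, reduce
import numpy as np
import pandas as pd

data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
        [b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]

nrows = len(data)
int_cols = np.empty((2, nrows), dtype=np.int64)  # foo, bar (assumed integer)
str_col = np.empty(nrows, dtype=object)          # bla stays as bytes

for i, (_, fields) in enumerate(data):
    values = fields[1::2]
    int_cols[:, i] = (int(values[0]), int(values[1]))
    str_col[i] = values[2]

# Simple (copying) construction, for illustration only:
df = pd.DataFrame({"foo": int_cols[0], "bar": int_cols[1], "bla": str_col})
```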
For number 2, we can use numpy.unique to create the levels/codes structure needed by the Pandas MultiIndex. From the documentation it seems that numpy.unique also sorts the data; since our data is presumably already sorted, a possible future optimisation would be to skip the sorting step.
ts = ts_seq[0, :]
seq = ts_seq[1, :]
maxseq = np.max(seq)
ts_levels, ts_codes = np.unique(ts, return_inverse=True)
seq_levels = np.arange(maxseq+1)
seq_codes = seq
df.index = pd.MultiIndex(levels=[ts_levels, seq_levels], codes=[ts_codes, seq_codes], names=["Timestamp", "Seq"])
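A sketch of that optimisation, assuming ts is known to be non-decreasing: the levels are the values at the positions where the timestamp changes, and the codes are a running count of those change points, so no sort is needed:

```python
import numpy as np

# Example of an already-sorted timestamp column (hypothetical values).
ts = np.array([100, 100, 200, 300, 300], dtype=np.int64)

new_level = np.empty(len(ts), dtype=bool)
new_level[0] = True
new_level[1:] = ts[1:] != ts[:-1]    # True wherever a new timestamp starts

ts_levels = ts[new_level]            # unique values, already in order
ts_codes = np.cumsum(new_level) - 1  # index of each row's level
```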
Finally, we can verify that there was no copying involved by doing

cols[0, 0] = b'79'

and checking that the corresponding entry in df does indeed change.
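Putting the pieces together, the check might look like this. Whether the mutation is visible through df depends on your pandas version (newer versions with copy-on-write may copy despite copy=False), so the printed value is not guaranteed:

```python
from functools import partial, reduce
import numpy as np
import pandas as pd

# Rebuild the DataFrame as above, then mutate the backing array and
# see whether the change is visible through df (i.e. no copy was made).
data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
        [b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]

colnames = data[0][1][0::2]
ncols, nrows = len(colnames), len(data)
cols = np.empty((ncols, nrows), dtype=object)
for i, (_, fields) in enumerate(data):
    cols[:, i] = fields[1::2]

colframes = [pd.DataFrame(cols[i:i+1, :].T) for i in range(ncols)]
merge = partial(pd.merge, left_index=True, right_index=True, copy=False)
df = reduce(merge, colframes[1:], colframes[0])
df.columns = colnames

cols[0, 0] = b'79'
print(df.iloc[0, 0])  # b'79' if the column still shares memory with cols
```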