5

What would be the fastest way to convert Redis Stream output (aioredis client / hiredis parser) into a Pandas DataFrame, where the Redis Stream ID's timestamp and sequence number, as well as the values, end up as properly type-converted Pandas index columns?

Example Redis output:

[[b'1554900384437-0', [b'key', b'1']], 
[b'1554900414434-0', [b'key', b'1']]]
trbck
  • Do you intend to keep appending to the dataframe as and when a new entry comes from the stream? – Aditya Santoso Apr 11 '19 at 02:49
  • No. Download it once and create a DataFrame from the above structure as fast as possible. But the appending scenario also sounds interesting. – trbck Apr 11 '19 at 05:36
  • 1
    Maybe cache the stream in a simple data structure (like a list) and create a dataframe then? Appending to an existing dataframe is slow. – knh190 Apr 13 '19 at 00:48
  • The above output is already a list of unconverted byte strings, intercepted from here https://github.com/aio-libs/aioredis/blob/master/aioredis/commands/streams.py#L39 and coming directly from the Redis parser ("raw" output). Ignoring the appending topic, I want to find an efficient way to convert this result into a type-converted dataframe. One could argue that I should rephrase this question as "convert nested list of byte strings into a dataframe", but I still think that giving the bigger picture of the newly introduced Redis Streams data type makes sense. – trbck Apr 13 '19 at 10:11

3 Answers

4

There seem to be two main bottlenecks here:

  1. Pandas DataFrames store their data in column-major format, meaning each column maps to one numpy array, whereas the Redis stream data is row-by-row.

  2. Pandas MultiIndex is made for categorical data, and converting raw arrays to the required levels/codes structure seems to be non-optimized.

Because of number 1, looping over all Redis stream entries is unavoidable. Assuming we know the length beforehand, we can pre-allocate numpy arrays that we fill as we go along and, with some tricks, reuse these arrays as the DataFrame columns. If the overhead of looping in Python is still too much, rewriting it in Cython should be straightforward.

Since you didn't specify datatypes, the answer keeps everything in bytes using object-dtype numpy arrays; it should be reasonably obvious how to adapt this to a custom setting. The only reason to put all of the columns in the same array is to move an inner loop over the columns/fields from Python to C. It could be split up into e.g. one array per data type or one array per column (a typed variant is sketched after the code below).

from functools import partial, reduce
import numpy as np
import pandas as pd

data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
        [b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]

# field names are the even-indexed entries of the flat field/value list
colnames = data[0][1][0::2]
ncols = len(colnames)
nrows = len(data)

# pre-allocate one array for the timestamp/sequence pairs and one for all values
ts_seq = np.empty((2, nrows), dtype=np.int64)
cols = np.empty((ncols, nrows), dtype=object)  # np.object is deprecated; plain object works

for i, (entry_id, fields) in enumerate(data):
    ts, seq = entry_id.split(b"-", 1)
    ts_seq[:, i] = (int(ts), int(seq))
    cols[:, i] = fields[1::2]  # values are the odd-indexed entries

# wrap each row of `cols` as a single-column DataFrame (no copy), then merge on the index
colframes = [pd.DataFrame(cols[i:i+1, :].T) for i in range(ncols)]
merge = partial(pd.merge, left_index=True, right_index=True, copy=False)
df = reduce(merge, colframes[1:], colframes[0])
df.columns = colnames
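As mentioned above, the single object array can be split per data type if the field types are known. A minimal sketch of that variant, assuming (hypothetically) that foo and bar hold integers while bla stays as raw bytes; it reuses data, nrows, merge and reduce from the snippet above:

# hypothetical typed variant: one pre-allocated array per dtype group
int_names = [b'foo', b'bar']   # assumed integer fields
str_names = [b'bla']           # assumed byte-string fields

int_cols = np.empty((len(int_names), nrows), dtype=np.int64)
str_cols = np.empty((len(str_names), nrows), dtype=object)

for i, (entry_id, fields) in enumerate(data):
    values = dict(zip(fields[0::2], fields[1::2]))
    int_cols[:, i] = [int(values[name]) for name in int_names]
    str_cols[:, i] = [values[name] for name in str_names]

typed_frames = [pd.DataFrame(int_cols[j:j+1, :].T, columns=[name]) for j, name in enumerate(int_names)]
typed_frames += [pd.DataFrame(str_cols[j:j+1, :].T, columns=[name]) for j, name in enumerate(str_names)]
typed_df = reduce(merge, typed_frames[1:], typed_frames[0])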

For number 2. we can use numpy.unique to create the levels/codes structure needed by Pandas MultiIndex. From the documentation it seems that numpy.unique also sorts the data. Since our data is presumably already sorted, a possible future optimisation would be to try to skip the sorting step.

ts = ts_seq[0, :]
seq = ts_seq[1, :]
maxseq = np.max(seq)
# unique timestamps become the levels; the inverse mapping becomes the codes
ts_levels, ts_codes = np.unique(ts, return_inverse=True)
# sequence numbers are small non-negative ints, so they can serve as their own codes
seq_levels = np.arange(maxseq + 1)
seq_codes = seq
df.index = pd.MultiIndex(levels=[ts_levels, seq_levels], codes=[ts_codes, seq_codes], names=["Timestamp", "Seq"])
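The sort-skipping optimisation mentioned above could look roughly like the following sketch, assuming the timestamps arrive already sorted in ascending order (as Redis stream IDs do) and that there is at least one entry; it is not benchmarked here:

# mark rows whose timestamp differs from the previous row
is_new_level = np.empty(len(ts), dtype=bool)
is_new_level[0] = True
np.not_equal(ts[1:], ts[:-1], out=is_new_level[1:])
ts_levels = ts[is_new_level]            # first occurrence of each distinct timestamp
ts_codes = np.cumsum(is_new_level) - 1  # position of each row's timestamp within ts_levels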

Finally, we can verify that there was no copying involved by doing

cols[0, 0] = b'79'

and checking that the entries in df do indeed change.
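Concretely, the check could look like this (whether it holds depends on the pandas version the answer assumes; newer releases may consolidate blocks and copy):

assert df.iloc[0, 0] == b'79'  # df reflects the in-place mutation of `cols`, so no copy was made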

BookYourLuck
  • Nice! Thank you. – trbck Apr 14 '19 at 08:47
  • Another idea to skip MultiIndex operations (deviates from my original question) might be to skip the sequence field and just copy the timestamp as another column to the DataFrame, let pandas set an integer row-count index itself, and worry about duplicated timestamps later. – trbck Apr 14 '19 at 08:56
1

The quickest way is to process the data in batches:

  1. Do the IO in batches of N msgs (e.g. 100 messages per batch).

  2. Convert each batch into one DataFrame (using pd.DataFrame([...])).

  3. Apply a lambda or conversion function to the timestamp column converted to numpy (.values); the stream timestamp is in milliseconds, so divide it by 1000 (a fuller sketch follows after this list):

    df['time'] = [datetime.fromtimestamp(int(t.split(b'-')[0]) / 1000) for t in df['time'].values]
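A minimal sketch of that batch flow, assuming each batch arrives as the nested byte lists shown in the question (flattened field/value pairs per entry); the helper name batch_to_df is made up for illustration and values are kept as decoded strings:

from datetime import datetime
import pandas as pd

def batch_to_df(batch):
    # batch: [[b'<ms>-<seq>', [b'field', b'value', ...]], ...] as in the question
    rows = []
    for entry_id, fields in batch:
        ms, seq = entry_id.split(b'-')
        row = {'time': datetime.fromtimestamp(int(ms) / 1000), 'seq': int(seq)}
        row.update({k.decode(): v.decode() for k, v in zip(fields[0::2], fields[1::2])})
        rows.append(row)
    return pd.DataFrame(rows).set_index(['time', 'seq'])

batch = [[b'1554900384437-0', [b'key', b'1']],
         [b'1554900414434-0', [b'key', b'1']]]
df = batch_to_df(batch)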

Pavel Kovtun
  • Thanks, I think this can be done better right in the hiredis parser, see my question here: https://github.com/redis/hiredis-py/issues/85 - if I get a reply I will post the solution here. – trbck Apr 13 '19 at 17:30
0

You can use this:

pd.read_msgpack(redisConn.get("key"))
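For completeness, the set/get round trip this assumes would look roughly like the following sketch (hypothetical key name, synchronous redis-py client; note that to_msgpack/read_msgpack were deprecated in pandas 0.25 and removed in 1.0, so this only runs on older pandas):

import pandas as pd
import redis

redisConn = redis.Redis()
df = pd.DataFrame({'key': [1, 1]})
redisConn.set('key', df.to_msgpack(compress='zlib'))  # serialize the DataFrame and store it
restored = pd.read_msgpack(redisConn.get('key'))      # fetch and deserialize it again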
Ali Hallaji
  • This would only work if I used Redis SET-like commands and had already converted a DataFrame to msgpack before setting it in Redis. But the question relates to Redis Streams, so this wouldn't be applicable, I guess. – trbck Apr 10 '19 at 08:57
  • Please add some explanatory comments to your answer. [From Review](https://stackoverflow.com/review/low-quality-posts/22710333). – Wai Ha Lee Apr 10 '19 at 08:57
  • @WaiHaLee I see no need for this, as I am referring to Redis Streams, which yield this kind of output. – trbck Apr 13 '19 at 10:07
  • pd.read_msgpack: Deprecated since version 0.25.0. – Rishabh Gupta Aug 29 '23 at 09:12