pandas 1.5.1
numpy 1.23.4
numba 0.57.1
python 3.11.0
Rolling window aggregation
Without asking what exactly you are trying to achieve, I'll answer purely on execution technique.
So, let's prepare some data to work with. I'm going to assume that you have a sequence of symbols, e.g. capital Latin letters, indexed by timestamps with a predefined frequency, e.g. 3 hours. The last assumption will matter at the very end, when we get rid of the index to work exclusively in Numpy. As for the symbols, I assume they are categorical values which can be replaced by numbers in a fixed range.
import pandas as pd
import numpy as np
from numba import njit
from scipy.spatial.distance import euclidean
from string import ascii_uppercase
from timeit import Timer
from numpy.lib.stride_tricks import as_strided
from numpy.random import default_rng
rng = default_rng(seed=0)
total = 3*8*14*10 # 8x3hours(1day) * 14days * 10times
df = pd.DataFrame({
    'timestamp': pd.date_range('2023', periods=total, freq='3H')
    , 'label': rng.choice([*ascii_uppercase], size=total)
})
Also, I hope you will forgive me for using different names:
- get_label_array I call nfrequent;
- calculate_rolling_dist I call rolling_distance;
- top_N I call just n.
Pandas' rolling windows
We can simplify the first function by returning a limited frequency table with pandas.Series.value_counts; note the normalize=True parameter. Also, I delegate as many iterations as possible to Numpy and Pandas:
def nfrequent(seq, n):
    '''get n most frequent items in descending order
    with normalized frequency and all the rest at the end'''
    vc = seq.value_counts(normalize=True)  # ascending=False by default
    return pd.concat([
        vc.iloc[:n],
        pd.Series({'rest': vc.iloc[n:].sum()})
    ])

def rolling_distance(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    n = len(total_nfreq) - 1  # in case total_nfreq is short
    return seq.map(ord).rolling(window).agg(
        lambda x: (
            euclidean(total_nfreq, window_nfreq)
            if len((window_nfreq := nfrequent(x, n))) > n
            else np.nan
        )
    )
Notes on the code:
- A function should assume that all dates have already been converted to datetime, because in general we can't predict the format of a date passed as a string.
- We can likewise assume that the data passed to the second function is a Series of symbols with timestamps as the index.
- I think a default value of a parameter should be declared once, in the outer function, to avoid confusion. You define top_N=10 in both, which is a source of headaches in the future.
- I used seq.map(ord) to make data aggregation with a custom function work on rolling windows; otherwise we would need to iterate over the windows in an outer loop (see the short illustration after this list).
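A minimal illustration of that last point (the exact error type and message may vary between pandas versions): rolling aggregation in pandas works only on numeric data, so the string labels have to be mapped to numbers first.
seq = df.set_index('timestamp').squeeze()
# seq.rolling('14D').agg(lambda x: x.size)           # fails: object dtype is not numeric
seq.map(ord).rolling('14D').agg(lambda x: x.size)    # works: values are now integers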
Let's note the speed on the test data:
seq = df.set_index('timestamp').squeeze()
ans_pd = rolling_distance(seq) # use this to check with other results
print(Timer(lambda: rolling_distance(seq)).autorange())
On my hardware the task took 3.47 seconds to complete, which seems too slow. An explicit outer for-loop over the windows, roughly along the lines of the sketch below, is even less productive (3.97 sec), so let's look at other options.
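For reference, here is a hedged sketch of what such an outer loop might look like (rolling_distance_loop is a hypothetical name, and the inclusive slice seq.loc[ts - delta:ts] only approximates pandas' default right-closed time window):
def rolling_distance_loop(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    n = len(total_nfreq) - 1
    delta = pd.Timedelta(window)
    out = pd.Series(np.nan, index=seq.index, dtype=float)
    for ts in seq.index:
        # recompute the frequency table for every window explicitly
        window_nfreq = nfrequent(seq.loc[ts - delta:ts], n)
        if len(window_nfreq) > n:
            out.loc[ts] = euclidean(total_nfreq, window_nfreq)
    return out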
A little more Numpy
Let's try to minimize Pandas in nfrequent and get rid of the if-else statement in the rolling aggregation. Also, I decided that seq.map(ord) is better kept outside the rolling_distance function:
def nfrequent(seq, n, count_unique=26):  # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    counts = np.bincount(seq, minlength=count_unique)
    counts[::-1].sort()  # sort in place in reversed order
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

def rolling_distance(seq, window='14D', n=10):
    total_nfreq = nfrequent(seq, n)
    return seq.rolling(window).agg(
        # rolling may pass window values as floats; np.bincount needs integers, hence the cast
        lambda window: euclidean(total_nfreq, nfrequent(window.astype(np.int64), n))
    )
Check the performance and save the result:
seq = df.set_index('timestamp').squeeze().map(ord)
ans_np = rolling_distance(seq)
print(Timer(lambda: rolling_distance(seq)).autorange())
The performance is much better this time: 358 ms against 3.5 sec previously. Can Numba help here? Not yet: there would be problems with pandas.Series, and the hassle of converting the data to numpy.ndarray on every call would degrade performance. Let's figure out another way to create rolling windows, without Pandas.
Numpy tricks with slicing
What if we consider a sequence of labels as a two-dimensional array? Consider it as follows:
- the first index of this array is the window number;
- the second is the number of the element in the window.
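For example (using a throwaway array a just for illustration), a sequence of six items with a window of three and a step of one becomes a 4×3 array:
a = np.arange(6)
np.array([a[i:i + 3] for i in range(4)])  # the stride trick below builds the same 2-D view without copying
# array([[0, 1, 2],
#        [1, 2, 3],
#        [2, 3, 4],
#        [3, 4, 5]])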
We can accomplish this with numpy.lib.stride_tricks.as_strided. For this purpose we need data with a fixed frequency, so that all windows covering a given time interval contain the same number of labels. As an additional advantage, we can now use a step to reduce the number of windows to loop over:
def nfrequent(seq, n, count_unique=26):  # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    counts = np.bincount(seq, minlength=count_unique)
    counts[::-1].sort()  # sort in place in reversed order
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

def rolling_distance(arr, window, step=1, n=10):
    windows = strided_view(arr, window, step)
    total_nfreq = nfrequent(arr, n)
    distance = np.empty(windows.shape[0], dtype=float)
    for i in range(distance.size):
        distance[i] = euclidean(total_nfreq, nfrequent(windows[i], n))
    return distance

def strided_view(arr, window, step=1):
    n = arr.size
    itemsize = arr.itemsize
    ksteps = 1 + (n - window) // step
    return as_strided(
        arr
        , shape=(ksteps, window)
        , strides=(itemsize*step, itemsize)
        , writeable=False
    )
Now we prepare the data as a numpy array and the parameters of the sliding window as integers (i.e. we transform strings like '14D' into the number of labels that fit in one window):
# map the labels to numbers from 0 to 25
seq = df.set_index('timestamp').squeeze().map(ord) - ord('A')
# save the timestamps from the index to restore later if needed
index, seq = seq.index, seq.to_numpy()
freq = pd.Timedelta('3H')
interval = pd.Timedelta('14D')
window = interval // freq # number of items in a window
ans_strided = rolling_distance(seq, window) # save the data to compare
print(Timer(lambda: rolling_distance(seq, window)).autorange())
This time the performance is noticeably better: 120 ms against 358 ms and 3.5 sec in the previous attempts.
Numpy stride tricks and Numba
The code above can be adapted to run it with Numba:
@njit
def nfrequent(seq, n, count_unique=26):  # 26 is len(string.ascii_uppercase)
    '''get n most frequent items in descending order, n < count_unique
    with normalized frequency and all the rest at the end'''
    counts = np.bincount(seq, minlength=count_unique)
    counts[::-1].sort()  # sort in place in reversed order
    counts[n] = counts[n:].sum()
    return counts[:n+1] / counts[:n+1].sum()

@njit
def rolling_distance(windows, seq, n=10):  # pass strided windows and the full sequence as parameters
    total_nfreq = nfrequent(seq, n)
    distance = np.empty(windows.shape[0], dtype=float)
    for i in range(distance.size):
        delta = total_nfreq - nfrequent(windows[i], n)
        distance[i] = np.sqrt(delta @ delta)
    return distance

def strided_view(arr, width, step=1):
    n = arr.size
    itemsize = arr.itemsize
    ksteps = 1 + (n - width) // step
    return as_strided(
        arr
        , shape=(ksteps, width)
        , strides=(itemsize*step, itemsize)
        , writeable=False
    )
# prepare data, collect result, check performance
seq = df.set_index('timestamp').squeeze().map(ord) - ord('A')
index, seq = seq.index, seq.to_numpy()
# we have the test data with a frequency of 3 hours
# and the time interval of interest is 14 days
freq = pd.Timedelta('3H')
interval = pd.Timedelta('14D')
width = interval // freq
windows = strided_view(seq, width)
ans_numba_strided = rolling_distance(windows, seq)
print(Timer(lambda: rolling_distance(windows, seq)).autorange())
NOTE: I couldn't use euclidean with Numba in nopython mode, so I replaced it with a direct equivalent.
This time I got 4.76 ms on the test data, which is considerably better than all the other variants.
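As a quick sanity check (assuming ans_strided from the previous section is still in scope), the Numba result should match the plain-Numpy strided result, since np.sqrt(delta @ delta) is exactly the Euclidean distance:
# both versions loop over the same full windows, so the results should agree up to floating-point noise
print(np.allclose(ans_strided, ans_numba_strided))  # expected: True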
Uneven distribution of data over time
If the data are unevenly distributed over time, we might try to sort them into time bins, e.g.:
# transform seq into a sequence of numbers in range(26)
seq = df.set_index('timestamp').squeeze().map(ord) - ord('A')
timebin = pd.Timedelta('6H')
start, stop = seq.index[[0, -1]]
nbins = (stop - start) // timebin + 1  # enough bins to cover all the data
nlabels = len(ascii_uppercase)
data = np.zeros((nbins, nlabels), dtype=float)
for i in range(nbins):
    left = start + i * timebin
    right = start + (i + 1) * timebin
    # half-open interval [left, right): label slicing seq[left:right] is inclusive
    # on both ends and would count boundary items in two adjacent bins
    chunk = seq[(seq.index >= left) & (seq.index < right)]
    data[i, :] = np.bincount(chunk, minlength=nlabels)
Now we can slide along the bins, i.e. along the first index of data, with a fixed window of shape (bins_in_interval, num_of_letters) to collect the distribution of letters over any given period.
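A minimal sketch of that last step, assuming interval = pd.Timedelta('14D') as before (names like bins_in_interval, window_counts and window_freq are just for illustration):
interval = pd.Timedelta('14D')
bins_in_interval = interval // timebin            # number of bins covered by one window
nwindows = data.shape[0] - bins_in_interval + 1
window_views = as_strided(
    data
    , shape=(nwindows, bins_in_interval, nlabels)
    , strides=(data.strides[0], data.strides[0], data.strides[1])
    , writeable=False
)
window_counts = window_views.sum(axis=1)          # letter counts per rolling window
window_freq = window_counts / window_counts.sum(axis=1, keepdims=True)
From here, the nfrequent-style top-n selection and the distance calculation can be applied to each row of window_freq as before.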