While I didn't manage to find a rolling implementation of PCA, it is a relatively straightforward matter to use the packages and tools mentioned in the question to code a manual rolling PCA function. In addition, we will use numba to gain a small speed-up, as it supports numpy.linalg.svd and numpy.linalg.eig.
The code in this answer is inspired by the excellent explanations of PCA here and here
import numpy as np
from numpy.linalg import eig
from numba import njit
import numpy.typing as npt
@njit
def rolling_pca(
arr: npt.NDArray[np.float64],
n_components: int,
window: int,
min_periods: int
) -> npt.NDArray[np.float64]:
"""Perform PCA on the covariance matrix of arr.
Return the lower dimensional array.
Data is assumed to have non-zero mean, so will be demeaned
in the process.
Args:
arr: Input data. Shape (n_samples, n_variables).
n_components: Number of components to reduce data matrix to.
Must be less than arr.shape[1].
window: Sliding window size.
min_periods: Minimum number of observations required to perform calculation.
Returns:
Reduced data matrix. Shape (n_samples, n_components)
"""
# create a copy to ensure we don't change data in place
arr_copy = arr.copy()
n = arr_copy.shape[0]
# create an empty array which will be populated with the output
reduced_out = np.full((n, n_components), np.nan)
# iterate over each row (timestamp in a timeseries)
for i in range(min_periods, n + 1):
if i < window:
lookback = i
else:
lookback = window
start_idx = i - lookback
curr_arr = arr_copy[start_idx: i, :]
# demean returns
curr_arr = curr_arr - (np.sum(curr_arr, axis=0) / lookback)
# calculate the covariance matrix
cov = (curr_arr.T @ curr_arr) / (lookback - 1)
# get the eigenvectors
# sort eigvals to get top largest corresponding eigenvectors
evals, evecs = eig(cov)
idx = np.argsort(evals)[:n_components]
evecs = evecs[:, idx]
# multiply the top eigenvectors by the current array to get a reduced matrix
reduced = (evecs.T @ curr_arr.T).T
reduced_out[start_idx: i, :] = reduced
return reduced_out
After profiling the code, the two slowest parts are, as expected, the calls to eig()
and the matrix multiplication curr_arr.T @ curr_arr
. As the array curr_arr
is limited by the window size, a pure numpy (no numba) implementation of matrix multiplcation is faster than using numba. This is because the arrays used in the matrix multiplication are small, and are not contiguous (see this post for more details). I didn't get around to resolving this issue, but if anyone has any suggestions, it would speed up this function quite a bit more.
I've compared the average timings between 3 implementations to see the effect of the speedup that numba offers. The 3 implementations are:
- Manual for loop using numba, exactly as above
- Manual for loop without numba, but otherwise same as the code above
- Manual for loop using Sklearn instead of numpy
eig
, no numba (as numba does not support Sklearn)
Note that the following parameters are fixed so we get as fair a comparison as possible between implementations
- Window size = 120
- Minimum number of periods = 22
- Input data number of variables = 20
- Number of components to reduce to via PCA = 10
- Number of iterations to time function over so as to get an average timing per implementation = 10
Only the row size (number of samples) is allowed to vary so we can visualise how execution time varies with array length.

We can see for large arrays (100k rows), we can decrease the time from about 14.19s using Sklearn, to 5.36s using numba, about a 2.6X speedup.
PCA Reconstruction
We implement some code similar to what we used above to reconstruct the original data matrix, using only the top principle components. Namely, we use SVD to decompose the matrix X
into 3 matrices, U
, S
, and V^T
. With these matrices, we can calculate how much variance is kept cumulatively by the components, and only keep the top k
components that explain a desired amount of variance.
import numpy as np
from numpy.linalg import svd
from numba import njit
import numpy.typing as npt
@njit
def __get_num_top_components(
singular_values: npt.NDArray[np.float64],
threshold: float,
n: int,
) -> int:
"""Get the number of top eigen-components required by threshold.
Args:
singular_values: Singular values from SVD.
threshold: Minimum amount of explained variance to be kept.
n: Number of samples in data matrix.
Returns:
Required number of components to keep.
"""
evals = singular_values ** 2 / (n - 1)
evals = evals / np.sum(evals)
cumsum_evals = np.cumsum(evals)
top_k = np.argwhere(cumsum_evals > threshold).min()
return top_k
@njit
def rolling_pca_reconstruction(
arr: npt.NDArray[np.float64],
threshold: float,
window: int,
min_periods: int
) -> npt.NDArray[np.float64]:
"""Perform PCA on arr and return reconstructed matrix.
This method follows the logic succinctly outlined here:
https://stats.stackexchange.com/a/134283/178320
Args:
arr: Input data. Shape (n_samples, n_variables).
threshold: Minimum amount of explained variance to be kept.
Must be a number in (0., 1.).
window: Sliding window size.
min_periods: Minimum number of observations required to perform calculation.
Returns:
Reconstructed data matrix. Shape (n_samples, n_variables)
"""
arr_copy = arr.copy()
n = arr_copy.shape[0]
p = arr_copy.shape[1]
recon_out = np.full((n, p), np.nan)
for i in range(min_periods, n + 1):
if i < window:
lookback = i
else:
lookback = window
start_idx = i - lookback
curr_arr = arr_copy[start_idx: i, :]
# demean data
curr_arr = curr_arr - (np.sum(curr_arr, axis=0) / lookback)
# perform SVD on data, no need for full matrices, this is faster
u, s, vh = svd(curr_arr, full_matrices=False)
# calculate the number of components that explains threshold variance
top_k = __get_num_top_components(
singular_values=s,
threshold=threshold,
n=lookback,
)
# reconstruct the data matrix using the top_k components
tmp_recon = u[:, :top_k] @ np.diag(s)[:top_k, :top_k] @ vh[:, :top_k].T
recon_out[start_idx: i, :] = tmp_recon
return recon_out
The output of rolling_pca_reconstruction()
is the reconstructed data, of same dimension as the input data arr
. One useful modification that could be made to this code is to record top_k
at each iteration, to understand how many components are needed to explain treshold
variance over time.