I have a class that computes the running mean and standard deviation of a stream of numbers. I recently tried to make this class safe for parallel updates with both multithreading and multiprocessing.
import numpy as np
class RunningStatsVariable:
    """Running mean, variance and standard deviation of a stream of numbers.

    The statistics are maintained incrementally (Welford-style), so the
    individual samples never have to be stored.

    Args:
        ddof: Delta degrees of freedom; the variance divisor is ``n - ddof``.
        parallel: ``'threading'`` or ``'multiprocessing'`` to guard every
            update with a lock from that module, anything else for no lock.

    Attributes:
        mean: Current mean (``0`` before the first sample).
        var: Current variance (``0`` while ``n <= ddof``).
        std: Square root of ``var``.

    Note:
        A lock only serializes access to *this* object. When an instance is
        handed to another process it is pickled and the worker updates its
        own copy, so the parent's statistics are not changed; in addition a
        ``multiprocessing.Lock`` may only reach a child process through
        inheritance and cannot be pickled. For process-based parallelism,
        give each worker its own lock-free instance and combine the results
        with :meth:`merge`.
    """

    def __init__(self, ddof=0, parallel=None):
        self.mean = 0
        self.var = 0
        self.std = 0
        self._n = 0   # number of samples seen so far
        self._s = 0   # running sum of squared deviations from the mean
        self._ddof = ddof
        if parallel == 'multiprocessing':
            from multiprocessing import Lock
            self._lock = Lock()
            #from multiprocessing import Manager
            #self._lock = Manager().Lock()
        elif parallel == 'threading':
            from threading import Lock
            self._lock = Lock()
        else:
            self._lock = None

    def _guard(self):
        """Return a context manager that holds the lock, if there is one."""
        from contextlib import nullcontext
        return self._lock if self._lock is not None else nullcontext()

    def _store(self, count, mean, s):
        """Commit a fully computed state and refresh ``var`` and ``std``."""
        self._n = count
        self.mean = mean
        self._s = s
        self.var = s / (count - self._ddof) if count > self._ddof else 0
        self.std = np.sqrt(self.var)

    def update(self, values):
        """Fold a batch of samples into the running statistics.

        Args:
            values: A scalar or array-like of numbers. Every element counts
                as one sample, whatever the shape of the input.
        """
        values = np.array(values, ndmin=1)
        if values.size == 0:
            return  # nothing to add; also avoids dividing by a zero count
        with self._guard():
            # Compute everything first and commit last, so that a failure
            # (e.g. non-numeric input) leaves the object unchanged.
            count = self._n + values.size
            delta = values - self.mean
            mean = self.mean + (delta / count).sum()
            s = self._s + (delta * (values - mean)).sum()
            self._store(count, mean, s)

    def update_single(self, value):
        """Fold one scalar sample into the running statistics."""
        with self._guard():
            count = self._n + 1
            old_mean = self.mean
            mean = old_mean + (value - old_mean) / count
            s = self._s + (value - old_mean) * (value - mean)
            self._store(count, mean, s)

    def merge(self, other):
        """Fold the samples summarized by *other* into this object.

        Uses the pairwise combination formula for mean and sum of squared
        deviations, so the result matches feeding all samples to a single
        instance (up to floating-point rounding).

        Args:
            other: Another ``RunningStatsVariable``; it is not modified.

        Returns:
            ``self``, to allow chaining.
        """
        # Snapshot *other* first and release its lock before taking ours;
        # the two locks are never held at the same time.
        with other._guard():
            n_b, mean_b, s_b = other._n, other.mean, other._s
        if n_b == 0:
            return self
        with self._guard():
            n_a = self._n
            count = n_a + n_b
            delta = mean_b - self.mean
            mean = self.mean + delta * n_b / count
            s = self._s + s_b + delta * delta * n_a * n_b / count
            self._store(count, mean, s)
        return self

    def __str__(self):
        if self.std:
            return f"(μ ± σ): {self.mean} ± {self.std}"
        # ``name`` is never set by this class; use it only if a caller or a
        # subclass has provided it, otherwise fall back to the class name.
        label = getattr(self, "name", type(self).__name__)
        return f"{label}: {self.mean}"

    def __len__(self):
        """Return the number of samples seen so far."""
        return self._n
if __name__ == '__main__':
    # Demo: feed the same samples to three accumulators (sequential,
    # joblib default backend, joblib threads) and compare the statistics.
    samples = []
    for _ in range(1000):
        scalar = np.random.random()
        samples.append((scalar, np.random.rand(20)))

    def _update(var, value, arr):
        """Add one scalar and then one array of samples to *var*."""
        var.update_single(value)
        var.update(arr)

    # Sequential run, used as the reference in the comparison below.
    serial = RunningStatsVariable()
    for scalar, batch in samples:
        _update(serial, scalar, batch)

    from joblib import Parallel, delayed

    # Same updates dispatched through joblib with its default backend.
    in_processes = RunningStatsVariable(parallel='multiprocessing')
    process_jobs = (
        delayed(_update)(in_processes, scalar, batch)
        for scalar, batch in samples
    )
    Parallel(n_jobs=-1)(process_jobs)

    # Same updates dispatched through joblib with threads preferred.
    in_threads = RunningStatsVariable(parallel='threading')
    thread_jobs = (
        delayed(_update)(in_threads, scalar, batch)
        for scalar, batch in samples
    )
    Parallel(n_jobs=-1, prefer='threads')(thread_jobs)

    # One True/False line per statistic: do both parallel runs match?
    for attr in ('mean', 'var', 'std'):
        reference = getattr(serial, attr)
        candidates = [getattr(in_processes, attr), getattr(in_threads, attr)]
        print(np.allclose(reference, candidates))
This code runs fine on Replit, but when I try to run it locally I get the following error:
RuntimeError: Lock objects should only be shared between processes through inheritance
If I instead use the `Manager` solution (commented out in the above code), which I found in an SO thread about this error message, the code executes, but the multiprocessing variable isn't properly updated and just returns 0s at the end for `mean`, `std`, and `var`.
Why can't I run the code locally? Is it to do with the OS? I'm running Windows on my local machine and don't have access to my Linux machine at the moment.
And what is the correct way to make my class safe for parallel updates?