I have the following utility class:
import numpy as np

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0
        self._n = 0       # number of samples seen so far
        self._s = 0       # running sum of squared deviations (Welford's M2)
        self._ddof = ddof

    def update(self, value):
        # Welford's online update: one pass, constant memory
        self._n += 1
        old_mean = self.mean
        self.mean += (value - old_mean) / self._n
        self._s += (value - old_mean) * (value - self.mean)
        self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
        self.std = np.sqrt(self.var)
This calculates and stores the running mean and std of a (long) stream of numbers. It works fine but, since I'm putting the class in my personal library, I'd like to make it robust to concurrent execution. For instance, I'd like to be able to do the following:
from joblib.parallel import Parallel, delayed

def execute_and_update(var):
    a = do_stuff()
    var.update(a)
    b, c = do_more_stuff()
    var.update(b)
    var.update(c)

stat = RunningStatisticsVar()
Parallel()(delayed(execute_and_update)(stat) for _ in range(1000))
and have the update calls be thread-safe.
Googling for this has given me many ways of executing code concurrently, but I haven't found a way to make my class safe for concurrent execution. In Java, IIRC, this can be done with atomic methods/classes, but I don't think Python has an equivalent.
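To illustrate the kind of interleaving I'm worried about, here is a minimal, self-contained sketch (made-up names, not my library code) showing that a plain read-modify-write on shared state is not atomic and can lose updates when several threads run it concurrently:

import threading

counter = 0

def bump(n):
    global counter
    for _ in range(n):
        counter += 1  # load, add, store: three steps that threads can interleave

threads = [threading.Thread(target=bump, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # may print less than 400000, depending on timing and interpreter

My update method does the same kind of read-modify-write on self.mean and self._s, just spread over more statements, so I assume it has the same problem.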
UPDATE
Following the comment, I have updated my code; however, I'm getting an error when trying to call my method from Parallel:
from joblib.parallel import Parallel, delayed
import numpy as np
from threading import Lock
class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0
        self._n = 0
        self._s = 0
        self._ddof = ddof
        self._lock = Lock()

    def update(self, value):
        # Serialise concurrent updates with a lock
        with self._lock:
            self._n += 1
            old_mean = self.mean
            self.mean += (value - old_mean) / self._n
            self._s += (value - old_mean) * (value - self.mean)
            self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
            self.std = np.sqrt(self.var)
samples = np.random.uniform(0, 100, [1000])
s1 = RunningStatisticsVar()
s2 = RunningStatisticsVar()
for i in samples:
    s1.update(i)
Parallel(n_jobs=-1)(delayed(lambda x: s2.update(x))(i) for i in samples) #
print(s1.mean, s1.std)
print(s2.mean, s2.std)
Attempting to run the above code gives me the following error at the line marked with #:
TypeError: can't pickle _thread.lock objects
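In case it helps, here is a stripped-down reproduction (the Holder class is a hypothetical stand-in, unrelated to my library) that makes me suspect the real problem: joblib's default process-based backend pickles the arguments it ships to the workers, and an object holding a threading.Lock cannot be pickled:

import pickle
from threading import Lock

class Holder:
    def __init__(self):
        self._lock = Lock()  # any instance holding a lock becomes unpicklable

pickle.dumps(Holder())  # raises the same "can't pickle _thread.lock" TypeError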