
I have the following utility class:

import numpy as np

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0

        self._n = 0
        self._s = 0
        self._ddof = ddof

    def update(self, value):
        # Welford's online algorithm: update the count, the mean, and the
        # running sum of squared deviations in a single pass.
        self._n += 1

        old_mean = self.mean
        self.mean += (value - old_mean) / self._n

        self._s += (value - old_mean) * (value - self.mean)
        self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
        self.std = np.sqrt(self.var)

This calculates and stores the running mean and std of a (long) stream of numbers. It works fine but, since I'm putting the class in my personal library, I'd like to make it robust to concurrent execution. For instance, I'd like to be able to do the following:

from joblib.parallel import Parallel, delayed

def execute_and_update(var):
    a = do_stuff()
    var.update(a)
    b, c = do_more_stuff()
    var.update(b)
    var.update(c)

stat = RunningStatisticsVar()
Parallel()(delayed(execute_and_update)(stat) for _ in range(1000))

and have the update calls be thread-safe.

Googling for this has given me many ways of executing code concurrently, but I haven't found a way to make my class safe for concurrent execution. In Java, IIRC, this can be done with atomic methods/classes, but I don't think Python has an equivalent.

UPDATE

Following the comment, I have updated my code; however, I'm getting an error when trying to call my method from Parallel:

from joblib.parallel import Parallel, delayed
import numpy as np
from threading import Lock

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0

        self._n = 0
        self._s = 0
        self._ddof = ddof

        self._lock = Lock()  # guards all of the mutable state above

    def update(self, value):
        with self._lock:  # serialize concurrent updates
            self._n += 1

            old_mean = self.mean
            self.mean += (value - old_mean) / self._n

            self._s += (value - old_mean) * (value - self.mean)
            self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
            self.std = np.sqrt(self.var)

samples = np.random.uniform(0, 100, [1000])
s1 = RunningStatisticsVar()
s2 = RunningStatisticsVar()

for i in samples:
    s1.update(i)
Parallel(n_jobs=-1)(delayed(lambda x: s2.update(x))(i) for i in samples) #

print(s1.mean, s1.std)
print(s2.mean, s2.std)

Attempting to run the above code gives me the following error in the line marked with #:

TypeError: can't pickle _thread.lock objects
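
From what I've read since, the problem is that joblib's default loky backend runs its workers in separate processes and pickles every argument it sends them, and `threading.Lock` objects can't be pickled. One workaround, sketched below under the assumption that a thread-based backend is acceptable for this workload: with `backend="threading"` all workers run in the parent process and share the same instance, so nothing needs to be pickled and the lock actually guards the shared state.

from joblib.parallel import Parallel, delayed

# Thread-based backend: the workers share the single RunningStatisticsVar
# instance, so no pickling occurs and the Lock serializes the updates.
Parallel(n_jobs=-1, backend="threading")(delayed(s2.update)(i) for i in samples)

(Under the GIL the pure-Python updates are serialized anyway, so this buys safety rather than speed.)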

  • Use [lock-objects](https://docs.python.org/3/library/threading.html#lock-objects): on entering `update`, call `.acquire()`; on leaving, call `.release()`. – stovfl Oct 31 '19 at 13:21
  • Ah yes, that should work nicely, thank you. – Mate de Vita Oct 31 '19 at 14:59
  • I've tried to implement the solution but I'm getting a `TypeError`. I've updated the question with the new information and code. – Mate de Vita Oct 31 '19 at 15:40
  • I've changed the line to `from multiprocessing import Lock` and kept the rest the same, but now I get the error `RuntimeError: Lock objects should only be shared between processes through inheritance` – Mate de Vita Oct 31 '19 at 16:13
  • Read [how-to-create-a-synchronized-object-with-python-multiprocessing](https://stackoverflow.com/questions/5022493/how-to-create-a-synchronized-object-with-python-multiprocessing). Read also [multiprocessing-vs-threading-python](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python) – stovfl Oct 31 '19 at 16:16
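
UPDATE 2

Following the multiprocessing discussion in the comments, an alternative I'm considering is to share nothing at all: give each worker its own accumulator and merge the partial results afterwards. The `collect` and `merge_stats` helpers below are my own sketch, using the combination formula from Chan et al.; note that it assumes the original, lock-free version of the class, since the instances themselves must be picklable to cross the process boundary.

from functools import reduce
from joblib.parallel import Parallel, delayed
import numpy as np

def collect(chunk):
    # Each worker fills a private accumulator, so no locking is needed.
    s = RunningStatisticsVar()
    for x in chunk:
        s.update(x)
    return s

def merge_stats(a, b):
    # Hypothetical helper (not part of the class): combine two independent
    # accumulators with the Chan et al. parallel variance formula.
    merged = RunningStatisticsVar(ddof=a._ddof)
    n = a._n + b._n
    if n == 0:
        return merged
    delta = b.mean - a.mean
    merged._n = n
    merged.mean = a.mean + delta * b._n / n
    merged._s = a._s + b._s + delta ** 2 * a._n * b._n / n
    merged.var = merged._s / (n - merged._ddof) if n > merged._ddof else 0
    merged.std = np.sqrt(merged.var)
    return merged

partials = Parallel(n_jobs=-1)(delayed(collect)(c) for c in np.array_split(samples, 8))
total = reduce(merge_stats, partials)
print(total.mean, total.std)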
