0

I have a class for the computation of the running mean and standard deviation of a stream of numbers. Recently I made the effort to make this class safe for parallel updates both with multithreading and multiprocessing.

import numpy as np


class RunningStatsVariable:
  def __init__(self, ddof=0, parallel=None):
    self.mean = 0
    self.var = 0
    self.std = 0

    self._n = 0
    self._s = 0
    self._ddof = ddof

    if parallel == 'multiprocessing':
      from multiprocessing import Lock
      self._lock = Lock()
      #from multiprocessing import Manager
      #self._lock = Manager().Lock()
    elif parallel == 'threading':
      from threading import Lock
      self._lock = Lock()
    else:
      self._lock = None

  def update(self, values):
    if self._lock:
      self._lock.acquire()

    values = np.array(values, ndmin=1)
    n = len(values)

    self._n += n

    delta = values - self.mean
    self.mean += (delta / self._n).sum()

    self._s += (delta * (values - self.mean)).sum()
    self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
    self.std = np.sqrt(self.var)

    if self._lock:
      self._lock.release()

  def update_single(self, value):
    if self._lock:
      self._lock.acquire()

    self._n += 1

    old_mean = self.mean
    self.mean += (value - old_mean) / self._n

    self._s += (value - old_mean) * (value - self.mean)
    self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
    self.std = np.sqrt(self.var)

    if self._lock:
      self._lock.release()

  def __str__(self):
    if self.std:
      return f"(μ ± σ): {self.mean} ± {self.std}"
    else:
      return f"{self.name}: {self.mean}"

  def __len__(self):
    return self._n


if __name__ == '__main__':
  data = [(np.random.random(), np.random.rand(20)) for _ in range(1000)]

  def _update(var, value, arr):
    var.update_single(value)
    var.update(arr)

  v1 = RunningStatsVariable()
  for v, a in data:
    _update(v1, v, a)

  from joblib import Parallel, delayed

  v2 = RunningStatsVariable(parallel='multiprocessing')
  Parallel(n_jobs=-1)(
    delayed(_update)(v2, v, a)
    for v, a in data
  )

  v3 = RunningStatsVariable(parallel='threading')
  Parallel(n_jobs=-1, prefer='threads')(
    delayed(_update)(v3, v, a)
    for v, a in data
  )

  print(np.allclose(v1.mean, [v2.mean, v3.mean]))
  print(np.allclose(v1.var, [v2.var, v3.var]))
  print(np.allclose(v1.std, [v2.std, v3.std]))

This code runs fine when I run it on Replit but when I try to run it locally I get the error

RuntimeError: Lock objects should only be shared between processes through inheritance

If I instead use the Manager solution (commented out in the above code), which I found in a SO thread with this error message, the code executes but the multiprocessing variable isn't properly updated and just returns 0s at the end for mean, std, and var.

Why can't I run the code locally? Is it to do with the OS? I'm running Windows on my local machine and don't have access to my Linux machine at the moment.

And what is the correct way to make my class safe for parallel updates?

Mate de Vita
  • 1,102
  • 12
  • 32
  • What are the details of the Python setups on Replit and your local machine, such as the versions of Python and the modules used in each setup? – jjramsey Aug 06 '21 at 16:44
  • This is a "spawn" vs "Fork" error. I haven't combed through your code to find where, but what you describe very much suggests REPLIT is using a linux server to execute the code where "fork" will be the default startmethod and on your windows machine "spawn" is the only available startmethod – Aaron Aug 06 '21 at 17:49

0 Answers0