1

the following code works fine -

import time
from concurrent.futures import Future, as_completed
from concurrent.futures.process import ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class Progress(object):

    _target: int = 0
    _progress: int = 0

    def __init__(self):
        self._target = 0
        self._progress = 0

    def completed(self, n):
        self._progress += n

    def progress(self):
        return (self._progress/self._target) * 100

    def set_target(self, n):
        self._target = n


class ObjectManager(BaseManager):
    pass


def dummy_worker(progress: Progress, cancel: mp.Event = None):
    print("--> Worker started")
    for i in range(10):
        time.sleep(1)
        progress.completed(1)
    return 1


if __name__ == "__main__":
    ObjectManager.register('Progress', Progress)
    print('Starting manager')
    with ObjectManager() as manager:
        print('Manager started')
        progress = manager.Progress()
        progress.set_target(10)
        with ProcessPoolExecutor() as pool:
            f = pool.submit(dummy_worker, progress)

            futures = [f]
            for f in as_completed(futures):
                print(f.result())
                print(f'Progress: {progress.progress()}')

while the following gives me Condition objects should only be shared between processes through inheritance. i am not sure how to use Event in dummy_worker. the main goal is to pass some shared object and also have a way to cancel the worker. sorry for the long post, i wanted to give full code for clarity.

import time
from concurrent.futures import Future, as_completed
from concurrent.futures.process import ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class Progress(object):

    _target: int = 0
    _progress: int = 0

    def __init__(self):
        self._target = 0
        self._progress = 0

    def completed(self, n):
        self._progress += n

    def progress(self):
        return (self._progress/self._target) * 100

    def set_target(self, n):
        self._target = n


class ObjectManager(BaseManager):
    pass


def dummy_worker(progress: Progress, cancel: mp.Event = None):
    print("--> Worker started")
    for i in range(10):
        time.sleep(1)
        progress.completed(1)
    return 1


if __name__ == "__main__":
    ObjectManager.register('Progress', Progress)
    cancel = mp.Event()
    print('Starting manager')
    with ObjectManager() as manager:
        print('Manager started')
        progress = manager.Progress()
        progress.set_target(10)
        with ProcessPoolExecutor() as pool:
            f = pool.submit(dummy_worker, progress, cancel)

            futures = [f]
            for f in as_completed(futures):
                print(f.result())
                print(f'Progress: {progress.progress()}')

assume i am using python 3.5+.

  • Probably related to this: https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes - i.e. because they can't be pickled which is needed to be passed as a parameter. – DisappointedByUnaccountableMod Sep 07 '20 at 09:45

1 Answers1

2

Try the following changes:

from multiprocessing.managers import SyncManager


class ObjectManager(SyncManager):
    pass

# use an Event() created by ObjectManager instance: cancel = manager.Event()

if __name__ == "__main__":
    ObjectManager.register('Progress', Progress)
    #cancel = mp.Event() # not this
    print('Starting manager')
    with ObjectManager() as manager:
        print('Manager started')
        progress = manager.Progress()
        cancel = manager.Event() # but rather this
        progress.set_target(10)
        with ProcessPoolExecutor() as pool:
            f = pool.submit(dummy_worker, progress, cancel)

            futures = [f]
            for f in as_completed(futures):
                print(f.result())
                print(f'Progress: {progress.progress()}')

Prints:

Starting manager
Manager started
--> Worker started
1
Progress: 100.0

Also, get rid of/change references to mp.Event.

Booboo
  • 38,656
  • 3
  • 37
  • 60