I am aware that with the traditional multiprocessing library I can declare a value and share its state between processes.

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#sharing-state-between-processes

When using the newer concurrent.futures library how can I share state between my processes?

import concurrent.futures
import time

def get_user_object(batch):
    # do some work
    counter = counter + 1
    print(counter)

def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        threadingResult = executor.map(get_user_object, batches)

def run():
    data_pools = get_data()
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        processResult = executor.map(do_multithreading, data_pools)
    end = time.time()
    print("TIME TAKEN:", end - start)

if __name__ == '__main__':
    run()

I want to keep a synchronized value of this counter.

In the previous library I might have used multiprocessing.Value and a Lock.

mkrieger1
Kay

1 Answer

You can pass an initializer and initargs to ProcessPoolExecutor (both parameters were added in Python 3.7) just as you would to multiprocessing.Pool. Here's an example:

import concurrent.futures
import multiprocessing as mp


def get_user_object(batch):
    with _COUNTER.get_lock():
        _COUNTER.value += 1
        print(_COUNTER.value, end=' ')


def init_globals(counter):
    global _COUNTER
    _COUNTER = counter


def main():
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
        initializer=init_globals, initargs=(counter,)
    ) as executor:
        for _ in executor.map(get_user_object, range(10)):
            pass
    print()


if __name__ == "__main__":
    import sys
    sys.exit(main())

Use:

$ python3 glob_counter.py 
1 2 4 3 5 6 7 8 10 9 

Where:

  • for _ in executor.map(get_user_object, range(10)): lets you iterate over each result. In this case, get_user_object() returns None, so there is nothing to process; the loop body is just pass.
  • The last print() call adds a trailing newline, because the print() call inside get_user_object() suppresses it (end=' ').
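The same pattern should also cover the layout in the question, where each worker process starts its own thread pool. The following is a minimal sketch under that assumption, not a definitive implementation: the names run, data_pools, and do_multithreading follow the question, while the pool sizes, the processes parameter, and the sample data are placeholders chosen for illustration. The lock returned by get_lock() serializes threads as well as processes, so every increment is counted exactly once.

```python
import concurrent.futures
import multiprocessing as mp

_COUNTER = None  # set once per worker process by init_globals()


def init_globals(counter):
    # Runs at the start of every worker process and stores the shared value.
    global _COUNTER
    _COUNTER = counter


def get_user_object(item):
    # The lock guards the read-modify-write across threads and processes.
    with _COUNTER.get_lock():
        _COUNTER.value += 1


def do_multithreading(batch):
    # Threads started here live inside the worker process, so they see the
    # same module-level _COUNTER that init_globals() assigned.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Consuming the iterator re-raises any exception from a worker thread.
        list(executor.map(get_user_object, batch))
    return len(batch)


def run(data_pools, processes=2):
    counter = mp.Value("i", 0)
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=processes, initializer=init_globals, initargs=(counter,)
    ) as executor:
        handled = sum(executor.map(do_multithreading, data_pools))
    # The parent reads the final value after all workers have finished.
    return counter.value, handled


if __name__ == "__main__":
    # Placeholder data: three batches of five items each.
    total, handled = run([list(range(5)) for _ in range(3)])
    print(total, handled)
```

Here run() returns both the final counter value and the number of items handled, so the two can be compared; they should match when every item increments the counter once.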
Brad Solomon