
Question presentation

I'm facing a multiprocessing problem. Most multiprocessing questions on Stack Overflow are less complex than my situation and don't answer it. Some people voted to close this as a possible duplicate of another question, but mine is different: in my situation, the shared DICT is modified between the workers' jobs.

I have a program that follows this simplified life cycle:

A. Initialize the DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each worker (the workers massively read the DATA dict)
D. Wait until the workers' jobs are done
E. Modify the DATA dict content
F. Go to C

Performance is a very important aspect of the problem. I experimented with several solutions, each with pros and cons:

Simple global dict (not working)

At step B the DICT variable is forked into the subprocesses' environment, but after step E the subprocesses can't see the changes.
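
A minimal sketch of what I mean (illustrative only; with the default fork start method on Linux the child gets a copy-on-write copy of DATA):

import multiprocessing
import time

DATA = {'positions': [42, 165]}  # step A

def worker():
    time.sleep(1)
    # the worker only sees the copy of DATA it received when it was created (step B)
    print(DATA['positions'])  # still [42, 165]

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)  # step B
    p.start()
    DATA['positions'].append(322)  # step E: invisible to the worker
    p.join()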

Use multiprocessing.Manager dict

At step A the dict is created with multiprocessing.Manager (see "Server process" in the Python documentation).

  • Pros: Easy to use
  • Cons: multiprocessing.Manager uses a serialization layer (I don't know it well, but it can work with processes over a network), which is bad for performance (see the sketch below).
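
A minimal sketch of how this approach looks with a pool of 4 workers (names are illustrative); every access to the proxy is a round-trip to the manager process:

import multiprocessing

def worker(shared):
    # each read goes through the manager process (serialization + IPC)
    return sum(shared['positions'])

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared = manager.dict()                       # step A
    shared['positions'] = [42, 165]

    with multiprocessing.Pool(4) as pool:         # step B
        for _ in range(3):                        # step F
            pool.map(worker, [shared] * 4)        # steps C and D
            shared['positions'] = [42, 165, 322]  # step E: visible to the workers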

Use multiple multiprocessing.Value and multiprocessing.Array objects instead of a dict

multiprocessing.Value and multiprocessing.Array allow the use of shared memory. I tried to replace my dict with several multiprocessing.Value and multiprocessing.Array objects like this:

With dict:

manager = multiprocessing.Manager()
dict = manager.dict()
dict['positions'] = [42, 165]
dict['on_position_42'] = 1555897
dict['on_position_165'] = 1548792

Replaced dict with multiprocessing.Value and multiprocessing.Array:

positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)
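
These objects can be passed to the workers when they are created at step B (this is the "inheritance" the error below refers to); a minimal sketch with illustrative names:

import multiprocessing

def worker(positions, on_position_42, on_position_165):
    # reads go straight to shared memory, no serialization involved
    print(list(positions), on_position_42.value, on_position_165.value)

if __name__ == '__main__':
    positions = multiprocessing.Array('i', [42, 165])
    on_position_42 = multiprocessing.Value('i', 1555897)
    on_position_165 = multiprocessing.Value('i', 1548792)

    workers = [
        multiprocessing.Process(
            target=worker,
            args=(positions, on_position_42, on_position_165),  # shared "through inheritance"
        )
        for _ in range(4)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()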

But at step E I would need to create new multiprocessing.Value and multiprocessing.Array objects, for example:

# an Array cannot be resized, so a new one is needed for the extra position
positions = multiprocessing.Array('i', [42, 165, 322])
# create a new multiprocessing.Value for position 322
on_position_322 = multiprocessing.Value('i', 2258777)

Then at step C, on_position_322 will be unknown to the workers. If I try to send a multiprocessing.Value or multiprocessing.Array to the subprocesses through pipes, it results in a "Synchronized objects should only be shared between processes through inheritance" error.

  • Pros: Performance
  • Cons: How to "inform" the subprocesses about the existence of new multiprocessing.Value and multiprocessing.Array objects? (A sketch of the failing case follows this list.)
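
A minimal sketch of the failing case, assuming CPython's standard multiprocessing behaviour:

import multiprocessing

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    on_position_322 = multiprocessing.Value('i', 2258777)  # created at step E
    try:
        parent_conn.send(on_position_322)  # pickled outside of process creation
    except RuntimeError as error:
        # "Synchronized objects should only be shared between processes through inheritance"
        print(error)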

Use an in-memory database like memcache or redis

I know it's a possibility, but I would have to benchmark an in-memory database against the multiprocessing.Manager dict (a rough sketch follows the pros/cons below).

  • Pros: Pragmatic and working
  • Cons: Performance?
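
A rough sketch of what this option could look like, assuming a local Redis server and the third-party redis package (the key name and JSON serialization are illustrative choices, not requirements):

import json
import multiprocessing

import redis  # third-party package, assumed installed

def worker(_):
    r = redis.Redis()                    # assumes Redis on localhost:6379
    data = json.loads(r.get('DATA'))     # step C: workers massively read DATA
    return sum(data['positions'])

if __name__ == '__main__':
    r = redis.Redis()
    r.set('DATA', json.dumps({'positions': [42, 165]}))               # step A

    with multiprocessing.Pool(4) as pool:                             # step B
        for _ in range(3):                                            # step F
            pool.map(worker, range(4))                                # steps C and D
            r.set('DATA', json.dumps({'positions': [42, 165, 322]}))  # step E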

Question conclusion

Is there a way to use multiprocessing.Value and multiprocessing.Array in this life cycle, considering the creation of new multiprocessing.Value and multiprocessing.Array objects?

Or, more generally, what would be the best-performing strategy for this life cycle?

Note: I previously tried another strategy where step F was "Go to B" (re-creating new workers at each cycle), but forking the workers' environment took too long: the bigger the DICT, the longer the fork.

  • Are you using a Linux system? Do you join the previous processes? Do your dict keys change over time? or only its values? Note: It can be that the OS needs more work to apply the copy-on-write strategy when more memory is shared. – Roberto Trani Aug 30 '17 at 12:28
  • Hello @RobertoTrani. Yes, I use a Linux system (but being able to run it on Windows or OSX would be appreciated). The processes are joined; their job produces a result that is then used. Dict keys can change, yes. – bux Aug 30 '17 at 13:09
  • 1
    Possible duplicate of [Python multiprocessing: How do I share a dict among multiple processes?](https://stackoverflow.com/questions/6832554/python-multiprocessing-how-do-i-share-a-dict-among-multiple-processes) – stovfl Aug 30 '17 at 15:40
  • @stovfl The question linked as a duplicate doesn't address a dict modified by the main process between subprocess executions. It is a different problem (its accepted answer doesn't answer mine, for example). – bux Aug 30 '17 at 18:44

1 Answer


Since you're only reading from the dictionary in the workers and updating it in the main process, you can use a JoinableQueue to pass the dictionary to the workers and wait for them to finish. For example:

from multiprocessing import Process, JoinableQueue
import time

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        # consume dict snapshots until a None sentinel is received
        for item in iter(self.queue.get, None):
            print(item)
            time.sleep(2)
            print('done')
            self.queue.task_done()
        self.queue.task_done()  # acknowledge the sentinel as well

if __name__ == '__main__':
    request_queue = JoinableQueue()
    num_workers = 4
    workers = []
    d = {}  # A

    for _ in range(num_workers): 
        p = Worker(request_queue) # B
        workers.append(p)
        p.start()


    for i in range(5): # F
        for _ in range(num_workers):
            request_queue.put(d) # C
        request_queue.join()  # D
        d[i] = i  # E

    for w in workers:
        w.terminate()
        w.join()

Output:

{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done