
I have a multiprocessing program where

  • one process adds elements to a shared list (multiprocessing.Manager().list())
  • several other processes consume these elements from that list (and remove them); they run as long as there is something left to process in the list or the process above is still adding to it.

I implemented locking (via multiprocessing.Lock()) when adding to the list or removing from it. Since there is one "feeder" process and several (10-40) fast "consumer" processes all competing for the lock, the "feeder" process has a hard time acquiring it.

Is there a concept of "priority" when acquiring a lock? I would like the "feeder" process to acquire it with higher priority than the others.

Right now I mitigate the issue by having the "consumer" processes wait a random time before trying to acquire the lock while the "feeder" process is running (when it finishes, it sets a flag). This workaround works, but it is ugly and hardly effective: I have the processes wait random.random()*n seconds, where n is the number of processes. That n is a completely made-up number and probably wrong.

WoJ
  • How is the concept of a "queue" realised? Do you just append processes to a list when they come for the lock? – Ma0 Aug 31 '16 at 16:02
  • @Ev.Kounis: yes, this is a `multiprocessing.Manager().list()` as I mentioned in the question (this is not a `multiprocessing.Queue()`) – WoJ Aug 31 '16 at 16:06
  • I don't mean the list that elements are added to and retrieved from, I mean the structure (a list?) that tracks the priority of the processes. – Ma0 Aug 31 '16 at 16:08
  • Out of curiosity, did you try using `multiprocessing.Queue` instead of implementing it? – Ami Tavory Aug 31 '16 at 17:50
  • There shouldn't be a race condition if you only have one feeder. Just `put` to a queue or `append` to a list. The consumers should `get` from the queue or `pop(0)` from the list. As per the documentation, "Queues are thread and process safe," i.e. you don't need a lock on the feeder side, or on the consumer side either. You'd be better off using a `Queue` (see the first sketch after these comments). If you want to keep using a `Manager().list` instead, then because you are just popping from the list, not modifying values in it, I think you don't need locks either. Just let the `multiprocessing` module do its job. – chapelo Aug 31 '16 at 20:09
  • @chapelo: thanks. I was under the impression that **any** activity on the `list` should be locked, i.e. that it was not safe to append on one side and pop on the other. If this is the case it exactly solves my problem. Would you mind turning your comment into an answer so that I can accept it? – WoJ Sep 01 '16 at 07:10
  • @AmiTavory: I am reassessing this - initially I thought I would be modifying the list in an unpredictable way (so it would need to be locked) but with the comments I will try to make it more straightforward and have immutable elements added (in the sense - not changed once they are in the queue) – WoJ Sep 01 '16 at 07:13
  • You need a ["reader writer lock"](https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock). You can implement it using two locks and a counter (read the link; a sketch follows these comments). – Bakuriu Dec 06 '16 at 18:29
  • A simple but not always correct solution is to have readers acquire the lock only when the data counter is greater than 0. – Logman Jan 01 '17 at 22:45
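
A minimal sketch of the single-feeder/many-consumers pattern chapelo describes, using `multiprocessing.Queue` with sentinel values to shut the consumers down; the function names and the `SENTINEL` marker are illustrative, not from the comments:

import multiprocessing as mp

SENTINEL = None  # marker telling a consumer to stop

def feed(queue, n_items, n_consumers):
    # Single feeder: the Queue is process-safe, so no lock is needed
    for item in range(n_items):
        queue.put(item)
    for _ in range(n_consumers):  # one sentinel per consumer
        queue.put(SENTINEL)

def consume(queue):
    while True:
        item = queue.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        # ... process item ...

if __name__ == '__main__':
    queue = mp.Queue()
    n_consumers = 4
    feeder = mp.Process(target=feed, args=(queue, 100, n_consumers))
    consumers = [mp.Process(target=consume, args=(queue,))
                 for _ in range(n_consumers)]
    feeder.start()
    for c in consumers:
        c.start()
    feeder.join()
    for c in consumers:
        c.join()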
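And a minimal sketch of the two-locks-and-a-counter construction behind Bakuriu's reader-writer suggestion; the class and method names are mine, and it relies on `multiprocessing.Lock.release()` being callable from a process other than the acquirer, which the multiprocessing docs allow:

import multiprocessing as mp

class ReaderWriterLock:
    # First reader in takes the write lock, last reader out releases it,
    # so a writer waits until no readers are active (and vice versa)
    def __init__(self):
        self._readers = mp.Value('i', 0, lock=False)
        self._counter_lock = mp.Lock()
        self._write_lock = mp.Lock()

    def acquire_read(self):
        with self._counter_lock:
            self._readers.value += 1
            if self._readers.value == 1:
                self._write_lock.acquire()

    def release_read(self):
        with self._counter_lock:
            self._readers.value -= 1
            if self._readers.value == 0:
                self._write_lock.release()

    def acquire_write(self):
        self._write_lock.acquire()

    def release_write(self):
        self._write_lock.release()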

3 Answers


Make the feeder's acquisition of the lock blocking, and the consumers' non-blocking.
So for the feeder:

my_lock.acquire()  # a plain acquire() blocks by default
try:
    pass  # do stuff
finally:
    my_lock.release()

And the consumers:

import time

while True:
    # For multiprocessing.Lock the keyword is block, not blocking
    locked = my_lock.acquire(block=False)
    try:
        if locked:
            pass  # do stuff
    finally:
        if locked:
            my_lock.release()
    time.sleep(10)  # give the feeder a chance to take the lock
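
For context, one runnable wiring of this pattern could look like the following; the `done` event, the sleep interval, and the helper names are assumptions added here, not part of the answer:

import multiprocessing as mp
import time

def feeder(shared, my_lock, done):
    for i in range(100):
        with my_lock:  # blocking acquire; released on leaving the block
            shared.append(i)
    done.set()  # signal the consumers that feeding is finished

def consumer(shared, my_lock, done):
    while not (done.is_set() and len(shared) == 0):
        locked = my_lock.acquire(block=False)
        try:
            if locked and len(shared) > 0:
                item = shared.pop(0)
                # ... process item ...
        finally:
            if locked:
                my_lock.release()
        time.sleep(0.01)  # back off so the feeder can grab the lock

if __name__ == '__main__':
    manager = mp.Manager()
    shared = manager.list()
    my_lock = mp.Lock()
    done = mp.Event()
    f = mp.Process(target=feeder, args=(shared, my_lock, done))
    cs = [mp.Process(target=consumer, args=(shared, my_lock, done))
          for _ in range(4)]
    f.start()
    for c in cs:
        c.start()
    f.join()
    for c in cs:
        c.join()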
guest

It is not perfect, but it should work:

In "feeder":

feeder_lock_object.acquire()
consumer_lock_object.acquire()
try:
    ...
finally:
    feeder_lock_object.release()
    consumer_lock_object.release()

In "consumer":

while True:
    with consumer_lock_object:
        # multiprocessing.Lock has no is_locked attribute, so probe it
        # with a non-blocking acquire instead
        if not feeder_lock_object.acquire(block=False):
            continue  # the feeder is busy; try again later
        feeder_lock_object.release()
        ...

But I think it would be better to use a Queue.

If you use this method, be careful about how you create the lock objects. You should initialize the pool with an initializer function that stores these lock objects as globals in each worker, as in the sketch below. Refer to this.
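
A minimal sketch of that initializer pattern; the names are illustrative, and passing the locks through initargs avoids trying to pickle them as task arguments later:

import multiprocessing as mp

def init_locks(f_lock, c_lock):
    # Store the locks as globals in each worker process
    global feeder_lock_object, consumer_lock_object
    feeder_lock_object = f_lock
    consumer_lock_object = c_lock

if __name__ == '__main__':
    feeder_lock = mp.Lock()
    consumer_lock = mp.Lock()
    pool = mp.Pool(processes=4,
                   initializer=init_locks,
                   initargs=(feeder_lock, consumer_lock))
    # pool.apply_async(...) workers can now use the global locks
    pool.close()
    pool.join()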

Harshith Thota
ADR

Instead of trying to change the locking priority, try changing the process priority itself so that the feeder has a higher priority than the consumers. The workaround you used basically simulates this, but less efficiently.

To change process priority,

On Unix: use os.setpriority()

Refer to the docs.

On Windows: use the third-party module psutil.

Refer to this thread and the psutil docs.
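
A sketch of both options; the niceness value is illustrative, lower values mean higher priority on Unix, and raising a process's priority may require elevated permissions:

import os
import sys

def lower_my_priority():
    # Call this at the start of each consumer so the feeder,
    # left at the default priority, wins more CPU time
    if sys.platform == 'win32':
        import psutil  # third-party: pip install psutil
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
    else:
        os.setpriority(os.PRIO_PROCESS, 0, 5)  # 0 means the current process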

Harshith Thota