
A fairly common case for me is to have a periodic update of a value, say every 30 seconds. This value is available on, for instance, a website.

I want to take this value (using a reader), transform it (with a transformer) and publish the result, say on another website (with a publisher).

Both source and destination can be unavailable from time to time, and I'm only interested in new values and timeouts.

My current method is to use a queue for my values and another queue for my results. The reader, the transformer and the publisher are all separate 'threads' using multiprocessing.

This has the advantage that every step can be allowed to 'hang' for some time and the next step can use a get with a timeout to implement some default action in case there is no valid message in the queue.
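
To make that concrete, the middle step looks roughly like this (a simplified sketch, not my real code; the names and the 30 s timeout are made up):

import queue  # multiprocessing.Queue raises queue.Empty on timeout

def transform(value):
    return value  # stand-in for the real transformation

def transformer(values, results):
    # Middle step of the pipeline; 'values' and 'results' are
    # multiprocessing.Queue objects shared with the reader and the publisher.
    while True:
        try:
            value = values.get(timeout=30)  # block at most 30 s for a new value
        except queue.Empty:
            value = None  # default action while the reader is stalled
        results.put(transform(value))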

The drawback of this method is that I'm left with all previous values and results in my queues once the transformer or publisher stalls. In the worst case the publisher hits an unrecoverable error and the whole tool eventually runs out of memory.

A possible resolution to this problem is to limit the queue size to 1, use a non-blocking put, and handle the queue-full exception by throwing away the stale value and retrying the put with the new one. This is quite a lot of code for such a simple action and a clear indication that a queue is not the right tool for the job.
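
For illustration, that workaround looks roughly like this (again a sketch with made-up names, not my actual code):

import multiprocessing
import queue  # for the Full and Empty exceptions

def put_latest(q, value):
    # Keep only the newest value in a queue created with maxsize=1.
    while True:
        try:
            q.put_nowait(value)
            return
        except queue.Full:
            try:
                q.get_nowait()  # throw away the stale value
            except queue.Empty:
                pass            # the consumer took it in the meantime
            # loop and retry the put

if __name__ == '__main__':
    results = multiprocessing.Queue(maxsize=1)
    put_latest(results, 'old')
    put_latest(results, 'new')  # 'old' is discarded
    print(results.get())        # prints 'new'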

I can write my own class to get the behavior I want using multiprocessing primitives, but this is a very common situation for me, so I assume it is for others too, and I feel there should be a 'right' solution out there somewhere.

In short: is there a standard thread-safe class with the following interface?

class Updatable:
    def put(self, value):
        # store value, overwriting any existing one
        ...

    def get(self, timeout=None):
        # blocking; raises an exception when timeout is set and exceeded
        return value

edit: my current implementation using multiprocessing

import multiprocessing
from time import sleep

class Updatable:
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.ns = self.manager.Namespace()   # single shared slot for the value
        self.updated = self.manager.Event()  # signals that a new value was put

    def get(self, timeout=None):
        # Note: wait() returns False when the timeout expires, but this
        # implementation does not raise; it just returns the stored value
        # again (or raises AttributeError if nothing was ever put).
        self.updated.wait(timeout)
        self.updated.clear()
        return self.ns.x

    def put(self, item):
        self.ns.x = item    # overwrite the stored value
        self.updated.set()  # wake up a waiting get()


def consumer(updatable):
    print(updatable.get())  # expect 1
    sleep(1)
    print(updatable.get())  # expect "2"
    sleep(1)
    print(updatable.get())  # expect {3}, after 2 sec
    sleep(1)
    print(updatable.get())  # expect [4]
    sleep(2)
    print(updatable.get())  # expect 6
    sleep(1)

def producer():
    sleep(.5)  # make output more stable, by giving both sides 0.5 sec to process
    updatable.put(1)
    sleep(1)
    updatable.put("2")
    sleep(2)
    updatable.put({3})
    sleep(1)
    updatable.put([4])
    sleep(1)
    updatable.put(5)  # will never be consumed; 6 overwrites it before the consumer reads
    sleep(1)
    updatable.put(6)

if __name__ == '__main__':
    updatable = Updatable()
    p = multiprocessing.Process(target=consumer, args=(updatable,))
    p.start()
    producer()
Pelle
  • You say that you use threading with the multiprocessing module. Threading and multiprocessing are very different (in behavior) in Python, so which is it? – CoMartel Aug 17 '16 at 14:54
  • I use multiprocessing as the implementation of the abstract threading concept. – Pelle Aug 18 '16 at 07:19
  • I should note that using multiprocessing is not a requirement for me, so if there is a better solution using the threading module I'm also interested. – Pelle Aug 18 '16 at 07:45
  • If you don't need multiprocessing, don't use it. The `threading` module uses almost the same syntax and is much simpler to use. I don't really understand why you would need a Queue; can't you just store the last value locally and handle exceptions in case it's unavailable? Could you provide some example code to help me understand? – CoMartel Aug 18 '16 at 08:51
  • To be honest, the reason for me to use multiprocessing over threading is the implementation of its queue. Passing a dict over a queue.Queue passes a reference, while passing a dict over a multiprocessing.Queue passes a copy. Using such a reference in two threads has cost me more time than I'm willing to admit. – Pelle Aug 18 '16 at 14:24
  • Why do you need that Queue? Can't you have two threads reading and writing the variable? – CoMartel Aug 18 '16 at 14:46
  • Also, what kind of data do you need to pass? – CoMartel Aug 18 '16 at 14:59
  • Maybe I'm looking at this wrong and I should just be using a shared variable with a mutex and an event, and make sure I put values in, not references (roughly as sketched below these comments). It just feels weird that there is no 'standard' solution for this. The data is just a bunch of variables (in a dict) for this project. – Pelle Aug 19 '16 at 08:04
  • If you need a dict, you should see this: http://stackoverflow.com/questions/6832554/python-multiprocessing-how-do-i-share-a-dict-among-multiple-processes – CoMartel Aug 22 '16 at 07:02
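
A rough sketch of the "shared variable with a mutex and an event" idea from the comments above, using the threading module; raising TimeoutError on an expired timeout and the attribute names are my own choices, not something from a standard library class:

import threading

class Updatable:
    # Single-slot container: put() overwrites, get() blocks for a new value.
    def __init__(self):
        self._lock = threading.Lock()
        self._updated = threading.Event()
        self._value = None

    def put(self, value):
        # Threads share memory, so pass a copy (e.g. dict(value)) if the
        # producer keeps mutating the original object.
        with self._lock:
            self._value = value   # overwrite whatever is stored
            self._updated.set()   # wake up a waiting get()

    def get(self, timeout=None):
        if not self._updated.wait(timeout):
            raise TimeoutError("no new value within timeout")
        with self._lock:
            self._updated.clear()
            return self._value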
