A fairly common case for me is to have a periodic update of a value, say every 30 seconds. This value is available on, for instance, a website.
I want to take this value (using a reader), transform it (with a transformer) and publish the result, say on another website (with a publisher).
Both source and destination can be unavailable from time to time, and I'm only interested in new values and timeouts.
My current method is to use a queue for my values and another queue for my results. The reader, the transformer and the publisher are all separate 'threads' using multiprocessing.
This has the advantage that every step can be allowed to 'hang' for some time and the next step can use a get with a timeout to implement some default action in case there is no valid message in the queue.
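A minimal sketch of that get-with-timeout pattern, for illustration only. The helper name `get_or_default` and the fallback value are hypothetical; the only library behavior relied on is that `multiprocessing.Queue.get` raises `queue.Empty` when the timeout expires.

```python
import multiprocessing
import queue  # multiprocessing.Queue.get raises queue.Empty on timeout


def get_or_default(q, timeout, default=None):
    """Return the next item from q, or default if nothing arrives in time."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        # The previous step is hanging or has nothing new: take the default action.
        return default


if __name__ == "__main__":
    values = multiprocessing.Queue()
    values.put(42)
    print(get_or_default(values, timeout=1.0))                   # a value was waiting
    print(get_or_default(values, timeout=0.1, default="stale"))  # nothing arrived in time
```

Each stage can wrap its read this way, so a stalled upstream step only costs it one timeout period.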
The drawback of this method is that I'm left with all previous values and results in my queue once the transformer or publisher stalls. In the worst case the publisher has an unrecoverable error and the entire tool runs out of memory.
A possible fix is to limit the queue size to 1, use a non-blocking put, and handle the queue-full exception by discarding the stale value and putting the new one. That is quite a lot of code for such a simple action, and a clear indication that a queue is not the right tool for the job.
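The size-1 workaround could look roughly like the sketch below. It uses the thread-based `queue.Queue` rather than `multiprocessing.Queue` to keep the example deterministic, and it assumes a single producer; `put_latest` is a hypothetical helper name, not something from a library.

```python
import queue


def put_latest(q, value):
    """Put value on a maxsize=1 queue, discarding an unread stale item if needed.

    Assumes exactly one producer: with several, the retry could find the
    queue full again.
    """
    try:
        q.put_nowait(value)
    except queue.Full:
        try:
            q.get_nowait()  # throw away the stale value
        except queue.Empty:
            pass  # a consumer took it in the meantime
        q.put_nowait(value)


if __name__ == "__main__":
    latest = queue.Queue(maxsize=1)
    put_latest(latest, "old")
    put_latest(latest, "new")   # replaces "old", which was never read
    print(latest.get_nowait())
```

Even in this reduced form the put needs two nested exception handlers, which illustrates why it feels like the wrong tool.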
I can write my own class to get the behavior I want using multiprocessing primitives, but this is a very common situation for me, so I assume it is for others too, and I feel there should be a 'right' solution out there somewhere.
In short: is there a standard thread-safe class with the following interface?
    class Updatable:
        def put(self, value):
            """Store value, overwriting any existing one."""

        def get(self, timeout=None):
            """Block until a value is available, then return it.

            Raises an exception when timeout is set and exceeded.
            """
Edit: my current implementation using multiprocessing:
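    import multiprocessing
    from time import sleep


    class Updatable:
        """Holds only the most recent value; get() blocks until a new one arrives."""

        def __init__(self):
            self.manager = multiprocessing.Manager()
            self.ns = self.manager.Namespace()
            self.updated = self.manager.Event()

        def get(self, timeout=None):
            # Event.wait() returns False when the timeout expires without a put().
            if not self.updated.wait(timeout):
                raise TimeoutError("no new value within %s seconds" % timeout)
            self.updated.clear()
            return self.ns.x

        def put(self, item):
            # Overwrite any unread value, then wake a waiting get().
            self.ns.x = item
            self.updated.set()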
    def consumer(updatable):
        print(updatable.get())  # expect 1
        sleep(1)
        print(updatable.get())  # expect "2"
        sleep(1)
        print(updatable.get())  # expect {3}, after 2 sec
        sleep(1)
        print(updatable.get())  # expect [4]
        sleep(2)
        print(updatable.get())  # expect 6
        sleep(1)


    def producer(updatable):
        sleep(.5)  # make output more stable, by giving both sides 0.5 sec to process
        updatable.put(1)
        sleep(1)
        updatable.put("2")
        sleep(2)
        updatable.put({3})
        sleep(1)
        updatable.put([4])
        sleep(1)
        updatable.put((5,))  # will never be consumed
        sleep(1)
        updatable.put(6)


    if __name__ == '__main__':
        updatable = Updatable()
        p = multiprocessing.Process(target=consumer, args=(updatable,))
        p.start()
        producer(updatable)
        p.join()  # wait for the consumer before the main process exits