Let's say you have a hardware device that sends you updates at more or less random intervals, and a client wants to receive an event every time this happens. How would you write a PyTango (the Python wrapper for the Tango controls library) PyTango.server.Device class that simulates this, using a thread that pushes new attribute values?

NeilenMarais
1 Answer
The answer seems to be:
- Call the PyTango.server.Device method set_change_event() to tell Tango that the device takes care of pushing its own change events, so no polling loop is needed.
- Call the PyTango.server.Device method push_change_event() from the update thread. This seems thread-safe so far; I have not seen any malarkey even at very high update rates. It would be nice if someone could confirm that.
- Also cache the updates so that clients polling the attribute get the correct value and timestamp. This seems a bit redundant; is there a better way?
- Do not set up polling on the attribute, since it can cause client polling to get stale values (Tango seems to cache attribute values from polls in a way that I can't quite figure out).
Herewith an example (Python 2.7) server using the externally updated randomnumber attribute:
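import logging
import time
import threading
import random
from PyTango.server import server_run
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command
from PyTango import AttrQuality

# The update loop logs exceptions through this logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)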
class ExternallyUpdated(Device):
    __metaclass__ = DeviceMeta

    def __init__(self, *args, **kwargs):
        super(ExternallyUpdated, self).__init__(*args, **kwargs)
        self._randomnumber = (0, 0, AttrQuality.ATTR_VALID)
        # Tell Tango that we don't need a polling loop, we'll
        # push change events explicitly
        self.set_change_event('randomnumber', True)

    @attribute(label="Random Number", dtype=int,
               # Enables update events for absolute changes >= 1
               abs_change=1)
    def randomnumber(self):
        return self._randomnumber

    def init_device(self):
        super(ExternallyUpdated, self).init_device()
        self.t = threading.Thread(target=self.update_loop)
        self.t.setDaemon(True)
        self.t.start()

    def update_loop(self):
        while True:
            try:
                new_number = random.randint(0, 10000)
                ts = time.time()
                sleeptime = random.random()*10
                print ('Timestamp: {:.5f} New value: {} sleeptime: {}'
                       .format(ts, new_number, sleeptime))
                # Need to cache the value so that clients can poll the attribute
                self._randomnumber = (new_number, ts, AttrQuality.ATTR_VALID)
                self.push_change_event(
                    'randomnumber', new_number, ts, AttrQuality.ATTR_VALID)
                time.sleep(sleeptime)
            except Exception:
                logger.exception('Exception in update loop')
                time.sleep(1)


if __name__ == "__main__":
    server_run([ExternallyUpdated])
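The "cache, then push" step in the server above could be factored out so that the polled value and the pushed event always carry the same triple. The following is only a sketch: CachedReading and publish are hypothetical names, not part of PyTango, and the push argument stands in for a call to push_change_event(). Rebinding a tuple attribute is already atomic in CPython, so the lock is a conservative choice rather than a requirement.

```python
import threading
import time


class CachedReading(object):
    """Hypothetical helper holding the latest (value, timestamp, quality)."""

    def __init__(self, value=0, quality='VALID'):
        self._lock = threading.Lock()
        self._reading = (value, 0.0, quality)

    def update(self, value, timestamp=None, quality='VALID'):
        # Called from the update thread; swaps the whole triple at once.
        if timestamp is None:
            timestamp = time.time()
        reading = (value, timestamp, quality)
        with self._lock:
            self._reading = reading
        return reading

    def read(self):
        # Called from the attribute read method when a client polls.
        with self._lock:
            return self._reading


def publish(cache, push, value, timestamp=None):
    """Cache a new reading, then push the very same triple as an event."""
    reading = cache.update(value, timestamp)
    push(*reading)
    return reading


# Stand-alone demonstration: a list plays the role of the event channel.
pushed = []
cache = CachedReading()
publish(cache, lambda v, ts, q: pushed.append((v, ts, q)), 42, 1.5)
print(cache.read())
print(pushed)
```

In the device, the read method would return cache.read(), and push would presumably be something like lambda v, ts, q: self.push_change_event('randomnumber', v, ts, q), with AttrQuality.ATTR_VALID in place of the 'VALID' placeholder string.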
And an example client below, assuming you exported the device as so_example/external/1. It should print a message every time randomnumber is updated.
import time
import logging
import PyTango

logger = logging.getLogger()
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(module)s - '
    '%(pathname)s : %(lineno)d - %(message)s',
    level=logging.INFO)

device_name = 'so_example/external/1'
td = PyTango.DeviceProxy(device_name)
attr_name = 'randomnumber'


# Set up a listener
def printer(event_data):
    try:
        print(event_data)  # A PyTango.EventData instance
    except Exception:
        # Not handling exceptions seems to break the event update loop. Or I was
        # doing something else stupid, must still investigate :)
        logger.exception('Exception while handling event, event_data: {}'
                         .format(event_data))


poll_event_id = td.subscribe_event(
    attr_name, PyTango.EventType.CHANGE_EVENT, printer)

# Do something that blocks here, or just run in an interactive session
# time.sleep(60)

# This is how you would unsubscribe from the events.
# td.unsubscribe_event(poll_event_id)
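Instead of printing the whole event, the callback could pull out the fields it cares about. A rough sketch follows, relying on the err, errors, attr_name and attr_value fields of PyTango.EventData. Stub and describe_event are hypothetical names, and Stub only exists so that the example runs without a Tango system.

```python
class Stub(object):
    """Hypothetical stand-in for PyTango event objects, for offline runs."""

    def __init__(self, **fields):
        self.__dict__.update(fields)


def describe_event(event_data):
    """Return a one-line summary of a change event, or of its error."""
    if event_data.err:
        # On failure attr_value is not usable; report the error reasons.
        reasons = ', '.join(str(e.reason) for e in event_data.errors)
        return 'event error: {}'.format(reasons)
    attr = event_data.attr_value
    return '{} = {} (quality {})'.format(
        event_data.attr_name, attr.value, attr.quality)


# Offline demonstration with stand-in objects instead of real events.
good = Stub(err=False, errors=(), attr_name='randomnumber',
            attr_value=Stub(value=7, quality='ATTR_VALID'))
bad = Stub(err=True, errors=(Stub(reason='API_EventTimeout'),),
           attr_name='randomnumber', attr_value=None)
print(describe_event(good))
print(describe_event(bad))
```

With a real subscription, printer would call describe_event(event_data) inside its try block, so that error events (for example when the server goes away) are reported rather than raising inside the callback.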

NeilenMarais
A useful description of the high level Python server API is here: http://www.esrf.eu/computing/cs/tango/tango_doc/kernel_doc/pytango/latest/tep/tep-0001.html – NeilenMarais Feb 17 '16 at 16:17