I've been struggling with the same problem for some time now, and I'm finding it increasingly difficult to make any meaningful progress. By writing this question I hope to find a potential solution, or to collect some new ideas so that I can keep trying myself.
At a high level, this is what I'd like to achieve:
I'd like to simulate a small-scale IoT deployment of roughly 50 devices. I have access to sensor readings through a websocket-based streaming API, arriving at a frequency of approximately one reading per sensor per minute.
In addition to handling this data, each sensor should communicate (full-duplex) with a centralised coordinator node, whose job it is to aggregate data and distribute results.
Hopefully this description is good enough to give some indication of what I am trying to achieve.
For ease of testing, I would like to model each component as an object with a simple interface encapsulating the operations: Send / Receive / Broadcast.
class Sensor(object):
    Receive   : Receive sensor readings from the streaming API
    Send      : Send data to the coordinator
    Receive   : Receive data from the coordinator

class Coordinator(object):
    Send      : Send data to a specific sensor
    Broadcast : Send data to all connected sensors
    Receive   : Receive data from a connected sensor
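
To make the interface above concrete, here is a minimal in-process sketch. It assumes that `asyncio.Queue` objects stand in for the network connections, so no websockets are involved; the names `register`, `inbox` and `demo` are hypothetical and only serve the illustration.

```python
import asyncio


class Coordinator:
    """Hypothetical in-process coordinator; queues stand in for sockets."""

    def __init__(self):
        self.inbox = asyncio.Queue()  # messages arriving from sensors
        self.sensors = {}             # sensor_id -> that sensor's inbox

    def register(self, sensor_id, sensor_inbox):
        self.sensors[sensor_id] = sensor_inbox

    async def send(self, sensor_id, data):
        # Send data to one specific sensor.
        await self.sensors[sensor_id].put(data)

    async def broadcast(self, data):
        # Send the same data to every registered sensor.
        for sensor_inbox in self.sensors.values():
            await sensor_inbox.put(data)

    async def receive(self):
        # Wait for the next (sensor_id, data) pair from any sensor.
        return await self.inbox.get()


class Sensor:
    """Hypothetical sensor that talks to the coordinator through queues."""

    def __init__(self, sensor_id, coordinator):
        self.sensor_id = sensor_id
        self.coordinator = coordinator
        self.inbox = asyncio.Queue()  # messages arriving from the coordinator
        coordinator.register(sensor_id, self.inbox)

    async def send(self, data):
        await self.coordinator.inbox.put((self.sensor_id, data))

    async def receive(self):
        return await self.inbox.get()


async def demo():
    coordinator = Coordinator()
    sensors = [Sensor(i, coordinator) for i in range(3)]
    await sensors[0].send({"temperature": 21.5})
    sender, payload = await coordinator.receive()
    await coordinator.broadcast({"ack_for": sender})
    replies = [await s.receive() for s in sensors]
    return sender, payload, replies


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because each direction has its own queue, a sensor can send and receive independently, which approximates the full-duplex requirement. The queue operations could later be replaced by real socket reads and writes without changing the Send / Receive / Broadcast interface.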
So far most of my efforts have used the asyncio and websockets libraries.
The latest approach
I recently spent some more time reading about the asyncio framework, and attempted the following. The idea is that by injecting a shared event loop (scheduler) into each object, I can achieve the abstraction I'm after.
Sensor
import asyncio

from Client import Client
from Readings import Reading


class Sensor(object):
    def __init__(self):
        self.scheduler = asyncio.get_event_loop()
        self.urban_api = None  # Websocket streaming API
        self.readings = Reading(self.scheduler, self.urban_api)
        self.coordinator = Client(self.scheduler, host='localhost', port=8080)
        self.scheduler.create_task(self.repetitive_message())
        try:
            self.scheduler.run_forever()
        except KeyboardInterrupt:
            pass

    async def called_on_new_reading(self):
        # Called on receipt of new reading.
        pass  # not implemented yet

    @staticmethod
    async def repetitive_message():
        while True:
            print('Sensor performing work')
            await asyncio.sleep(2)


if __name__ == '__main__':
    sensor = Sensor()
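
Note that `run_forever()` blocks until the loop is stopped, so with the constructor above a second `Sensor()` in the same process would never be created. A minimal sketch of one alternative, assuming construction and running are kept separate and a single `asyncio.run()` call drives all sensors; the `sensor_id`, `ticks`, `interval` and `repeats` names are hypothetical, and the loop is bounded only so the example terminates:

```python
import asyncio


class Sensor:
    def __init__(self, sensor_id):
        # The constructor only stores state; it never starts the event loop.
        self.sensor_id = sensor_id
        self.ticks = 0

    async def repetitive_message(self, interval, repeats):
        # Bounded version of the periodic task, so the demo can finish.
        for _ in range(repeats):
            self.ticks += 1
            await asyncio.sleep(interval)


async def main(n_sensors=50):
    sensors = [Sensor(i) for i in range(n_sensors)]
    # All sensors share the single loop started by asyncio.run().
    await asyncio.gather(*(s.repetitive_message(0.01, 3) for s in sensors))
    return sensors


if __name__ == "__main__":
    finished = asyncio.run(main())
    print(len(finished), "sensors finished")
```

With this layout the event loop no longer has to be injected into each object: every coroutine awaited under `main()` already runs on the same loop.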
Reading - Handle streaming API
from websockets import connect


class Reading(object):
    def __init__(self, scheduler, urban_api):
        self.scheduler = scheduler
        self.urban_api = urban_api
        self.scheduler.create_task(self.receive())

    async def receive(self):
        async with connect(self.urban_api) as websocket:
            async for message in websocket:
                print(message)
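
One way the readings could be handed back to the sensor is to pass a callback into `Reading` instead of printing each message. The sketch below is only an illustration under stated assumptions: the `source` and `on_reading` parameters are hypothetical, and `fake_stream` is a stand-in for the streaming API so the example runs without a network connection. In the real class, `source` would be the `websocket` object obtained from `connect(...)`.

```python
import asyncio


class Reading:
    """Forwards each message from an async source to a callback."""

    def __init__(self, source, on_reading):
        self.source = source          # any async iterable of messages
        self.on_reading = on_reading  # coroutine function called per message

    async def receive(self):
        async for message in self.source:
            await self.on_reading(message)


class Sensor:
    def __init__(self, source):
        self.received = []
        # The sensor registers its own method as the callback.
        self.readings = Reading(source, self.called_on_new_reading)

    async def called_on_new_reading(self, message):
        # Called on receipt of a new reading.
        self.received.append(message)

    async def run(self):
        await self.readings.receive()


async def fake_stream():
    # Hypothetical stand-in for the websocket streaming API.
    for value in ("reading-1", "reading-2"):
        await asyncio.sleep(0)
        yield value


async def demo():
    sensor = Sensor(fake_stream())
    await sensor.run()
    return sensor.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An `asyncio.Queue` shared between `Reading` and `Sensor` would be another option if the sensor should pull readings at its own pace rather than be called for each one.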
I've not made any progress with implementing the coordinator since switching to websockets, but I hope that the intention is clear.
I have a few questions:
- Is this the right strategy to achieve the simulation I described?
- Are there any libraries that can simplify this further?
- How can I receive readings from the websocket in my sensor class?
I have not been able to find any OOP asyncio examples. Perhaps I am missing something?
I appreciate that this post is quite lengthy, and lacking in parts, but any feedback or pointers would be greatly appreciated.