
I've been struggling with the same problem for some time now, and I'm finding it increasingly difficult to make any meaningful progress. By writing this question I hope to find a potential solution, or to collect some new ideas so I can keep trying myself.

At a high level, this is what I'd like to achieve:

I'd like to simulate a small-scale IoT deployment comprising roughly 50 devices. I have access to sensor readings through a websocket-based streaming API, arriving at approximately one reading per sensor per minute.

In addition to handling this data, each sensor should communicate (full-duplex) with a centralised coordinator node, whose job it is to aggregate data and distribute results.

Hopefully this description is good enough to give some indication of what I am trying to achieve.

For ease of testing, I would like to model each component as an object with a simple interface encapsulating the operations: Send / Receive / Broadcast.

class Sensor(object):
   Receive : Receive sensor readings from the streaming API

   Send : Send data to the coordinator
   Receive : Receive data from the coordinator


class Coordinator(object):
   Send : Send data to a specific sensor
   Broadcast : Send data to all connected sensors
   Receive : Receive data from a connected sensor
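
To make the interface above concrete, here is a minimal in-memory sketch of how I imagine it. It assumes asyncio.Queue objects stand in for the network connections, so there are no websockets involved. The names register, inbox, sensor_id and demo are hypothetical and only there for illustration.

```python
import asyncio


class Coordinator:
    """Hypothetical in-memory coordinator; queues stand in for sockets."""

    def __init__(self):
        self.inbox = asyncio.Queue()  # messages arriving from sensors
        self.sensors = {}             # sensor_id -> that sensor's inbox

    def register(self, sensor):
        self.sensors[sensor.sensor_id] = sensor.inbox

    async def send(self, sensor_id, data):
        # Send data to one specific sensor.
        await self.sensors[sensor_id].put(data)

    async def broadcast(self, data):
        # Send the same data to every registered sensor.
        for inbox in self.sensors.values():
            await inbox.put(data)

    async def receive(self):
        # Wait for the next (sensor_id, data) pair from any sensor.
        return await self.inbox.get()


class Sensor:
    def __init__(self, sensor_id, coordinator):
        self.sensor_id = sensor_id
        self.inbox = asyncio.Queue()  # messages from the coordinator
        self.coordinator = coordinator
        coordinator.register(self)

    async def send(self, data):
        await self.coordinator.inbox.put((self.sensor_id, data))

    async def receive(self):
        return await self.inbox.get()


async def demo():
    coordinator = Coordinator()
    sensors = [Sensor(i, coordinator) for i in range(3)]
    await sensors[0].send(21.5)
    sender, value = await coordinator.receive()
    await coordinator.broadcast({"mean": value})
    replies = [await s.receive() for s in sensors]
    return sender, value, replies
```

Calling asyncio.run(demo()) runs one round trip: sensor 0 sends a value, the coordinator receives it and broadcasts a result back to all three sensors. If this shape is reasonable, the queues could presumably be swapped for real connections later without changing the interface.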

So far most of my efforts have used the asyncio and websockets libraries.

The latest approach

I recently spent some more time reading about the asyncio framework, and attempted the following. The idea is that by injecting a shared event loop (scheduler) into each object, I can achieve the abstraction I'm after.

Sensor

import asyncio

from Client import Client
from Readings import Reading


class Sensor(object):

    def __init__(self):
        self.scheduler = asyncio.get_event_loop()
        self.urban_api = None  # Websocket streaming API 

        self.readings = Reading(self.scheduler, self.urban_api)
        self.coordinator = Client(self.scheduler, host='localhost', port=8080)

        self.scheduler.create_task(self.repetitive_message())

        try:
            self.scheduler.run_forever()
        except KeyboardInterrupt:
            pass


    async def called_on_new_reading(self):
        # Called on receipt of new reading.
        pass

    @staticmethod
    async def repetitive_message():
        while True:
            print('Sensor performing work')
            await asyncio.sleep(2)


if __name__ == '__main__':
    sensor = Sensor()
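
One thing I noticed about the listing above: run_forever() blocks inside __init__, so a second Sensor could never be constructed in the same process. A rough sketch of an alternative, assuming each component exposes a run() coroutine and a single entry point starts them all, is shown here. The names run, main, ticks, interval and iterations are hypothetical, and the loop is bounded only so that the example terminates.

```python
import asyncio


class Sensor:
    def __init__(self, name, interval=0.01):
        # The constructor only stores state; it never touches the event loop.
        self.name = name
        self.interval = interval
        self.ticks = 0

    async def run(self, iterations):
        # Stand-in for the sensor's periodic work.
        for _ in range(iterations):
            self.ticks += 1
            await asyncio.sleep(self.interval)


async def main(n_sensors=50, iterations=3):
    sensors = [Sensor(f"sensor-{i}") for i in range(n_sensors)]
    # All sensors run concurrently on the one loop created by asyncio.run().
    await asyncio.gather(*(s.run(iterations) for s in sensors))
    return sensors
```

With this layout nothing needs to be injected: asyncio.run(main()) creates the loop, and every coroutine awaited under main() shares it.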

Reading - Handle streaming API

from websockets import connect


class Reading(object):

    def __init__(self, scheduler, urban_api):
        self.scheduler = scheduler
        self.urban_api = urban_api

        self.scheduler.create_task(self.receive())

    async def receive(self):
        async with connect(self.urban_api) as websocket:
            async for message in websocket:
                print(message)

I've not made any progress with implementing the coordinator since switching to websockets, but I hope that the intention is clear.

I have a few questions:

  1. Is this the right strategy to achieve the simulation I described?
  2. Are there any libraries that can simplify this further?
  3. How can I receive readings from the websocket in my sensor class?
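
For question 3, one option I can think of is handing the Reading object a callback, so that each message is forwarded to the sensor. Below is a minimal sketch under that assumption. The on_reading parameter, the run() method and fake_stream (a stand-in for the websocket connection, which is also consumed with async for) are hypothetical.

```python
import asyncio


class Reading:
    """Consumes any async-iterable source and forwards each message."""

    def __init__(self, source, on_reading):
        self.source = source
        self.on_reading = on_reading  # coroutine function supplied by the owner

    async def receive(self):
        async for message in self.source:
            await self.on_reading(message)


class Sensor:
    def __init__(self, source):
        self.received = []
        self.readings = Reading(source, self.called_on_new_reading)

    async def called_on_new_reading(self, message):
        # Called on receipt of a new reading.
        self.received.append(message)

    async def run(self):
        await self.readings.receive()


async def fake_stream(values):
    # Hypothetical stand-in for the websocket streaming API.
    for value in values:
        await asyncio.sleep(0)
        yield value


async def main():
    sensor = Sensor(fake_stream([20.1, 20.4, 19.8]))
    await sensor.run()
    return sensor.received
```

Here asyncio.run(main()) returns the three fake readings in order. An asyncio.Queue shared between Reading and Sensor would be another way to decouple the two, but I have not tried either against the real API.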

I have not been able to find any OOP asyncio examples. Perhaps I am missing something?

I appreciate that this post is quite lengthy, and lacking in parts, but any feedback or pointers would be greatly appreciated.
