1

I am new to asyncio ( used with python3.4 ) and I am not sure I use it as one should. I have seen in this thread that it can be use to execute a function every n seconds (in my case ms) without having to dive into threading.

I use it to get data from laser sensors through a basic serial protocol every n ms until I get m samples.

Here is the definition of my functions :

def countDown( self, 
               loop, 
               funcToDo, 
               *args, 
               counter = [ 1 ],
               **kwargs ):
    """ At every call, it executes funcToDo ( pass it args and kwargs )
        and count down from counter to 0. Then, it stop loop """
    if counter[ 0 ] == 0:
        loop.stop() 
    else:
        funcToDo( *args, **kwargs )
        counter[ 0 ] -= 1


def _frangeGen( self, start = 0, stop = None, step = 1 ):
    """ use to generate a time frange from start to stop by step step """
    while stop is None or start < stop:
        yield start
        start += step

def callEvery( self, 
               loop, 
               interval, 
               funcToCall, 
               *args, 
               now = True, 
               **kwargs ):
    """ repeat funcToCall every interval sec in loop object """
    nb = kwargs.get( 'counter', [ 1000 ] )
    def repeat( now = True,
                times = self._frangeGen( start = loop.time(),
                                         stop=loop.time()+nb[0]*interval,
                                         step = interval ) ):
        if now:
            funcToCall( *args, **kwargs )
        loop.call_at( next( times ), repeat )

    repeat( now = now )

And this is how I use it (getAllData is the function that manage serial communication) :

ts = 0.01
nbOfSamples = 1000
loop = asyncio.get_event_loop()
callEvery( loop, ts, countDown, loop, getAllData, counter = [nbOfSamples] )  
loop.run_forever()

I want to put that bloc into a function and call it as often as I want, something like this :

for i in range( nbOfMeasures ):
    myFunction()
    processData() 

But the second test does not call getAllData 1000 times, only twice, sometimes thrice. The interesting fact is one time in two I get as much data as I want. I don't really understand, and I can't find anything in the docs, so I am asking for your help. Any explanation or an easier way to do it is gladly welcome :)

Community
  • 1
  • 1
Nessy W.
  • 151
  • 2
  • 10
  • I haven't studied your code in depth, however, would it be possibly in your case to simply call funcToCall by running it inside a asyncio.async() task? You can use an asyncio.sleep() function inside a while True loop inside your funcToCall in order to let it 'sleep' for a set duration. Simply create as many of these tasks as you need and they will effectively run concurrently on the same loop. – songololo Sep 29 '15 at 20:23

2 Answers2

1

You are complicating things too much and, generally speaking, doing recursion when you have an event loop is bad design. asyncio is fun only when you make use of coroutines. Here's one way of doing it:

import asyncio as aio

def get_laser_data():
  """
  get data from the laser using blocking IO
  """
  ...


@aio.coroutine  
def get_samples(loop, m, n):
  """
  loop = asyncio event loop
  m = number of samples
  n = time between samples
  """
  samples = []
  while len(samples) < m:
    sample = yield from loop.run_in_executor(None, get_laser_data)
    samples.append(sample)
    yield from aio.sleep(n)

  return samples

@aio.coroutine
def main(loop):
  for i in range(nbOfMeasures):
    samples = yield from get_samples(loop, 1000, 0.01)
    ...

loop = aio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

If you are completely confused by this, consider reading some tutorials/documentation about asyncio.

But I would like to point out that you must use a thread to get the data from the laser sensor. Doing any blocking IO in the same thread that the event loop is running will block the loop and throw off aio.sleep. This is what yield from loop.run_in_executor(None, get_laser_data) is doing. It's running the get_laser_data function in a separate thread.

Jashandeep Sohi
  • 4,903
  • 2
  • 23
  • 25
  • Thanks for the answer. I am slowly understanding how things work, but coroutines still resist me. Anyway, your solution does not fit my need since it sleep **after** the execution of the `get_laser_data` function, thus it will be called every n sec + the time to execute `get_laser_data`. – Nessy W. Sep 30 '15 at 12:54
  • You could call `loop.time()` before and after a call to `get_laser_data`, and then remove the difference when sleeping: `aio.sleep(n-(after-before))`. But, you will always have some error because of the resolution of the `asycnio` clock. Internally, it's using the `monotonic` clock from the `time` module. If you need more precise timing, I would recommend using the `perf_counter` from `time` & instead of using `asyncio` write a simple loop to do the sleeping. – Jashandeep Sohi Sep 30 '15 at 17:37
  • I have try a simple while loop with a sleep function (that was my first try actually). I am not precise enough, even with `time.perf_counter()`, but I think it is due to the precision of the `time.sleep()` function. I have come to an ugly solution using loop, so for now I am trying the threading.Timer class to find a cleaner way. – Nessy W. Oct 01 '15 at 09:08
  • 1
    On my system `perf_counter` has a resolution of `1e-09`. If you need to be more precise than that, then Python is not the way to go. Plus, I don't see how using a Thread will be any better. If anything, you will loose precision. Internally, Python only runs one thread at a time, switching them every x instructions. To get really precise you need as little abstraction as possible between you & the hardware clocks. – Jashandeep Sohi Oct 01 '15 at 09:52
  • `perf_counter` is precise enough, but not `time.sleep`. My function execution time is between 7 and 14 ms. If I want to do it every 20ms, than I need to time.sleep 6ms in the worst case. When I try to do it 100 times, my code is run in a averaging 2,04 seconds. And you are right, using a thread is not better, just _cleaner_. (I did not manage to make it work actually) – Nessy W. Oct 01 '15 at 10:06
  • Have you tried a busy sleep? Basically, a while loop inside the while loop, looping until some time is reached. Also try messing with the `sys.setswitchinterval` setting. – Jashandeep Sohi Oct 01 '15 at 10:45
0

In python 3.5, you can make use of the async for syntax and create an asynchronous iterator to control your time frames. It has to implement the __aiter__ and __anext__ methods:

class timeframes(collections.AsyncIterator):

    def __init__(self, steps, delay=1.0, *, loop=None):
        self.loop = asyncio.get_event_loop() if loop is None else loop
        self.ref = self.loop.time()
        self.delay = delay
        self.steps = steps
        self.iter = iter(range(steps))

    async def __anext__(self):
        try:
            when = self.ref + next(self.iter) * self.delay
        except StopIteration:
            raise StopAsyncIteration
        else:
            future = asyncio.Future()
            self.loop.call_at(when, future.set_result, None)
            await future
            return self.loop.time()

    async def __aiter__(self):
        return self

Here's a coroutine that simulates an execution:

async def simulate(steps, delay, execution):
    # Prepare timing
    start = loop.time()
    expected = steps * delay - delay + execution
    # Run simulation
    async for t in timeframes(steps, delay):
        await loop.run_in_executor(None, time.sleep, execution)
    # Return error
    result = loop.time() - start
    return result - expected

And this is the kind of result you'll get on a linux OS:

>>> loop = asyncio.get_event_loop()
>>> simulation = simulate(steps=1000, delay=0.020, execution=0.014)
>>> error = loop.run_until_complete(simulation)
>>> print("Overall error = {:.3f} ms".format(error * 1000))
Overall error = 1.199 ms

It is different on a windows OS (see this answer) but the event loop will catch up and the overall error should never exceed 15 ms.

Community
  • 1
  • 1
Vincent
  • 12,919
  • 1
  • 42
  • 64