10

I am currently running some endless tasks using asyncio.wait

I need a special function to run when all the others are on await

import asyncio 

async def special_function():
    while True:
        # does some work, 
        # Passes control back to controller to run main_tasks
        # if they are no longer waiting.
        await asyncio.sleep(0)

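async def handler():
    # main_tasks is a list of coroutine functions defined elsewhere.
    # asyncio.wait() expects tasks/futures, so wrap each coroutine explicitly.
    tasks = [asyncio.ensure_future(task()) for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(asyncio.ensure_future(special_function()))

    await asyncio.wait(tasks)

asyncio.run(handler())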

How can I get the special_function to only be run when all main_tasks are on await?


Edit:

What I mean by "all main_tasks are on await": all main_tasks are not ready to continue, e.g. are in asyncio.sleep(100) or I/O bound and still waiting for data.

Therefore the main_tasks cannot continue and the event loop runs the special_function while the tasks are in this state, NOT every iteration of the event loop.


Edit 2:

My use case:

The main_tasks are updating a data structure with new data from web-sockets.

The special_function transfers that data to another process upon an update signal from that process. (multiprocessing with shared variables and data structures)

It needs to be the most up to date data it can be when it transfers, there cannot be pending updates from main_tasks.

This is why I only want to run special_function when there are no main_tasks with new data available to be processed. (i.e. all waiting on await)

Zak Stucke
  • `special_function` is syntactically incorrect and it's also missing an `await` in front of `asyncio.sleep(0)`. Once you fix that, it will run the "does some work" part at every iteration of the event loop. Given that asyncio is single-threaded, you can rest assured that, when it runs, **all** other tasks are awaiting something. You don't need to do anything special to arrange that to happen, it's how asyncio works - if a task isn't awaiting something, it means it's running (or has completed). – user4815162342 May 26 '19 at 09:15
  • @user4815162342 thanks for spotting the error! Fixed. I've also added to my description above to explain the goal better: I do not want to run the function on every iteration of the event loop, only when all main_tasks are awaiting and cannot continue at the current moment in time. Thanks! – Zak Stucke May 26 '19 at 10:56
  • My two cents: it seems to be a concurrency problem that should be solved with threads instead of asyncio. – Damián Montenegro May 31 '19 at 21:13
  • Let me explain better my point, you can use priority lock (https://stackoverflow.com/questions/39254040/is-it-possible-to-prioritise-a-lock) in such a way that main_tasks have priority, so special_function will run *only* when everyone else is not waiting for the lock. – Damián Montenegro May 31 '19 at 21:24

5 Answers

7

I tried to write a test for the 'task not ready to run' condition. I think asyncio does not expose details from the scheduler. The developers have clearly stated they want to keep freedom for changing asyncio internals without breaking backward compatibility.

In asyncio.Task there is this comment (note: _step() runs the task coroutine till the next await):

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

But that internal variable is not in the API, of course.

You can get some limited access to _fut_waiter by reading the output of repr(task), but the format does not seem to be reliable either, so I would not depend on something like this:

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

Anyway, I think you are trying to be too perfect. You want to know if there is new data in other tasks. What if the data is in asyncio buffers? Kernel buffer? Network card receive buffer? ... You could never know if new data arrives the next millisecond.

My suggestion: write all updates to a single queue. Check that queue as the only source of updates. If the queue is empty, publish the last state.
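The single-queue suggestion could be sketched roughly as follows. This is only an illustration under assumptions: `producer` stands in for the websocket readers, `publisher` stands in for special_function, and the `None` sentinel and the `published` list exist only so the demo terminates and can be inspected. The point is that the publisher applies every update already sitting in the queue (with `get_nowait()`) before it publishes anything.

```python
import asyncio

async def producer(queue, key, values):
    # Hypothetical stand-in for a websocket reader: pushes (key, value) updates.
    for value in values:
        await queue.put((key, value))
        await asyncio.sleep(0)  # yield, as a real reader would while waiting for I/O

async def publisher(queue, published):
    # Hypothetical stand-in for special_function.
    state = {}
    while True:
        item = await queue.get()  # wait until at least one update arrives
        stop = False
        while True:
            if item is None:      # sentinel: the producers have finished
                stop = True
            else:
                key, value = item
                state[key] = value
            try:
                item = queue.get_nowait()  # take anything else already queued
            except asyncio.QueueEmpty:
                break
        # The queue is empty here, so this snapshot has no pending updates.
        published.append(dict(state))
        if stop:
            return

async def main():
    queue = asyncio.Queue()
    published = []
    pub = asyncio.ensure_future(publisher(queue, published))
    await asyncio.gather(
        producer(queue, "a", [1, 2, 3]),
        producer(queue, "b", [10, 20]),
    )
    await queue.put(None)
    await pub
    return published

published = asyncio.run(main())
print(published[-1])
```

As the answer notes, this only guarantees that nothing is pending in the queue itself; data still sitting in socket or kernel buffers is invisible to it.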

VPfB
  • After having a quick look this does seem to work, but as you said yourself, relying on repr is not ideal. I will use this to keep looking for a more concrete solution! Thank you! I also see your point about the data possibly being in other buffer locations as well and my goal maybe is to be too perfect, you may be right! – Zak Stucke May 31 '19 at 18:49
  • The last paragraph of this answer probably comes closest to the way such problems are **intended** to be solved with asyncio. – user4815162342 May 31 '19 at 20:10
2

This is what I'd do:

  1. I'd not use your special function.

  2. Each data update needs a separate generation ID (4 byte integer), and I'd only put in the ID in shared memory.

Both processes are running independently, I assume.

  1. The subscriber keeps its own local copy of the generation ID. When it notices that the generation ID in shared memory has changed, it reads the new data from the file.

  2. Data is stored on tmpfs (/tmp), so it lives in memory. You can create your own tmpfs if that suits you better. It's fast.

Here is why:

  • To make sure the subscriber doesn't fetch half-baked data from shared memory, the data has to be protected by a semaphore. It's a PITA.
  • By using a file, you can carry variable-size data. This may not apply to you. One of the hard things about shared memory is having enough space without wasting space. Using a file solves this problem.
  • By using a 4-byte int generation ID, updating the ID is atomic. This is a huge advantage.

So, when one of your tasks receives new data, it opens a file, writes to it, and, after closing the file descriptor, writes the new generation ID to shared memory. Before updating the generation ID, you can safely delete the file. If the subscriber has already opened the file, it will finish reading it; if it tries to open it afterwards, the open fails and it has to wait for the next generation anyway. If the machine crashes, /tmp is gone, so you don't need to worry about cleaning up files. If you like, you can even write a new task whose sole job is to delete files in /tmp that belong to older generations.
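A minimal single-process sketch of this scheme, under several assumptions: `publish`, `poll`, `path_for` and the `gen_<n>.bin` file naming are hypothetical, a directory from `tempfile.mkdtemp()` stands in for a tmpfs mount, and the file deleted before the ID is advanced is taken to be the previous generation's. The shared ID is a `multiprocessing.Value` of type `"I"` (unsigned int, typically 4 bytes) created without a lock; in a real setup it would be handed to the subscriber process.

```python
import multiprocessing
import os
import tempfile

def path_for(directory, generation_id):
    # Hypothetical naming scheme: one file per generation.
    return os.path.join(directory, f"gen_{generation_id}.bin")

def publish(directory, generation, payload):
    """Writer side: write and close the file, then advance the shared ID."""
    new_id = generation.value + 1
    with open(path_for(directory, new_id), "wb") as f:
        f.write(payload)
    previous = path_for(directory, new_id - 1)
    if os.path.exists(previous):
        os.remove(previous)      # drop the previous generation's file
    generation.value = new_id    # publish only after the file is closed

def poll(directory, generation, last_seen):
    """Reader side: return (id, payload), or (last_seen, None) if nothing new."""
    current = generation.value
    if current == last_seen:
        return last_seen, None
    try:
        with open(path_for(directory, current), "rb") as f:
            return current, f.read()
    except FileNotFoundError:
        # The file was already replaced; wait for the next generation.
        return last_seen, None

directory = tempfile.mkdtemp()   # stand-in for a tmpfs directory
generation = multiprocessing.Value("I", 0, lock=False)

seen, data = poll(directory, generation, 0)     # nothing published yet
publish(directory, generation, b"first")
publish(directory, generation, b"second")
seen, data = poll(directory, generation, seen)  # picks up the latest generation
print(seen, data)
```

The reader never blocks the writer here: it either gets a complete, closed file or nothing at all, which is the property the answer is after.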

Naoyuki Tai
  • Although not solving my problem directly, this is another really interesting approach which I will definitely look into! I do have the problem of static sized shared data so this would solve that. Thank you! – Zak Stucke Jun 03 '19 at 23:12
  • I have a lot of experience using shared memory in my job, and carrying the payload in shared memory is something to avoid. Once you use a semaphore to guarantee the correctness of the content, managing that semaphore as IPC is not easy. Sometimes it is left locked and you end up rebooting the machine to clean it up. The only way to make sure this works is to keep multiple copies of the payload so the new one is not accessed while it's being written. That increases complexity and the shared memory size by at least 2x, if not more. – Naoyuki Tai Jun 06 '19 at 17:38
1

When the event loop runs a task, that task executes until it returns control to the event loop. There's usually only one reason a task returns control to the event loop: the task is facing a blocking operation (and thus is "not ready to continue").

It means that "every iteration of the event loop" is usually equivalent to "all main_tasks are on await". The code you already have will (mostly) work as you want. The only thing you should do is make special_function() a task.

There's some chance that a task returns control to the event loop before it faces a "real" blocking call; this usually looks like await asyncio.sleep(0) (as you do in special_function). It means the task wants to let all other tasks run before it continues, and you probably want to respect that.
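A small sketch of that behaviour, with made-up names (`main_task`, the `log` list and the `stop` event are only there for the demo): special_function is wrapped in a task and yields with `asyncio.sleep(0)`, so it gets a turn on each pass of the event loop while the main tasks are suspended in their sleeps.

```python
import asyncio

async def main_task(log, name):
    for i in range(2):
        log.append(f"{name} step {i}")
        await asyncio.sleep(0.01)  # "not ready to continue" for about 10 ms

async def special_function(log, stop):
    while not stop.is_set():
        log.append("special")      # does some work
        await asyncio.sleep(0)     # hand control back to the event loop

async def handler():
    log = []
    stop = asyncio.Event()
    # Make special_function a task so it is scheduled alongside the others.
    special = asyncio.create_task(special_function(log, stop))
    await asyncio.gather(main_task(log, "A"), main_task(log, "B"))
    stop.set()
    await special
    return log

log = asyncio.run(handler())
print(log.count("special"), "special runs while A and B were sleeping")
```

Note that "special" is logged many times per sleep, which is exactly the every-iteration behaviour this answer describes, not the stricter condition asked for in the question.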

Mikhail Gerasimov
  • Hi Mikhail, thanks for your answer! Sadly this does not solve it. If a task is blocked and passes back to controller, but is ready to continue again before the event loop reaches the special function, then I need it NOT to run the special function, and instead continue with the `main_tasks`. I have updated my question with my use case for this functionality. – Zak Stucke May 26 '19 at 19:25
0

Why not use a semaphore?

async def do_stuff(semaphore):
    async with semaphore:
        await getting_stuff_done()

async def wait_til_everyone_is_busy(semaphore):
    # Poll until every slot of the semaphore is taken.
    while not semaphore.locked():
        await asyncio.sleep(1)
    do_other_stuff()

To better illustrate my point, take this trivial example:

import asyncio
import time

async def process(semaphore, i):
    while True:
        print(f"{i} I'm gonna await")
        await asyncio.sleep(1)
        async with semaphore:
            print(f'{i} sleeping')
            await asyncio.sleep(3)
        print(f'{i} done sleeping')
        print(f"{i} I'm gonna await again")
        await asyncio.sleep(1)

async def other_process(semaphore):
    while True:
        while not semaphore.locked():
            print("Everyone is awaiting... but I'm not startingr")
            await asyncio.sleep(1)
        print("Everyone is busy, let's do this!")
        time.sleep(5)
        print('5 seconds are up, let everyone else play again')
        await asyncio.sleep(1)

semaphore = asyncio.Semaphore(10)
dataset = [i for i in range(10)]
loop = asyncio.new_event_loop()
tasks = [loop.create_task(process(semaphore, i)) for i in dataset]
tasks.append(loop.create_task(other_process(semaphore)))
loop.run_until_complete(asyncio.wait(tasks))

We create 10 tasks that run the "process" function and one that runs "other_process". The "other_process" task only proceeds once all the others are holding the semaphore, and because of the way asyncio context switching works, only "other_process" runs while the others are in an await state, up until "other_process" hits its own "await".

$ python3 tmp
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!
5 seconds are up, let everyone else play again
0 done sleeping
0 I'm gonna await again
1 done sleeping
1 I'm gonna await again
2 done sleeping
2 I'm gonna await again
3 done sleeping
3 I'm gonna await again
4 done sleeping
4 I'm gonna await again
5 done sleeping
5 I'm gonna await again
6 done sleeping
6 I'm gonna await again
7 done sleeping
7 I'm gonna await again
8 done sleeping
8 I'm gonna await again
9 done sleeping
9 I'm gonna await again
Everyone is awaiting... but I'm not startingr
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!
  • Hi Jason, thanks for the answer! After researching Semaphores a little it seems their main purpose is to prevent overusing resources, which is a little different to my problem. As my functions are working in the same event loop, I can trust that all the functions will run concurrently (not at the same time but one after another). My main question is to only run the `special_task` when all of the functions are on `await` BUT are not ready to continue, i.e. they still need to await for longer before they can continue (for example in a long `asyncio.sleep()` or I/O bound which is the case for me) – Zak Stucke May 29 '19 at 21:55
  • Sure that's their "main" purpose, but, it does offer a solution to your problem. Basically, the semaphore would only be "locked" when all your contexts are in an await status, therefore, the thread would only run when all your contexts are 'await'-ing. You would of course need to set up the semaphore to have exactly the number of locks as there are worker threads (except the one that is waiting for everyone else to be busy). – Jason Shaffner May 29 '19 at 22:09
  • I think you're missing a key part of my problem. I don't just need them to be on `await`, this is automatically the case due to the way the asyncio event loop works. I need all of the main_tasks to be UNABLE to continue because the thing being awaited has not yet finished, e.g. the end of `asyncio.sleep(100)` has not been reached, or external data has not yet been received. A semaphore will lock straight away when the function starts to `await`, and not unlock until the function moves on from the await, rather than when the function is ready to continue (i.e. sleep period over / data received etc). – Zak Stucke May 29 '19 at 22:22
  • See my clarification... because of the way the Asyncio event loop works, unless you have an "await" somewhere in "other_process" or your "special_task", then it will continue to run only that "other_process" or your "special_task" until that context gives up control (with keyword "await"). The semaphore is only a way of synchronizing for when "other_process" will start, it doesn't "lock" the "process" contexts unless ALL of them are in an await status, and at that point each and every context will have its own hold on the semaphore. – Jason Shaffner May 29 '19 at 22:46
  • Yes, they are all in the await status, BUT, depending on where asyncio is in the event loop, it may run your `other_process` even though one of your `process` workers has reached the end of its `asyncio.sleep()` and could be continued before `other_process` is run. The worker is not run because `other_process` was closer in the iteration of the asyncio event loop, and this is what I want to solve! In your answer you are solving a slightly different problem. Does this make sense? – Zak Stucke May 29 '19 at 22:54
  • Ha, I'm re-reading my answer and I see where you're coming from now. My first sentence in the answer is off, what I mean is: use the semaphore in cases where you know there are long awaits or where it makes sense, not necessarily on every context switch. I don't believe there's a way to see if an awaitable "could continue" unless you start running that particular context again... – Jason Shaffner May 29 '19 at 23:09
  • Yep I'm hoping there's a simple way, at the worst some hacking of the internal asyncio event loop may be needed! It has to be possible, as asyncio must use some sort of check every iteration of the workers to see if they can be continued or not. – Zak Stucke May 29 '19 at 23:18
0

Push both the input and output requests onto a PriorityQueue with the input prioritized over the output. Then just process tasks from the queue normally and it will always fulfill all outstanding input requests before any output ones.

So your main loop would consist of something like the following:

  • InputListener1 (queues each InputTask1 received at priority 0)
  • InputListener2 (queues each InputTask2 received at priority 0)
  • InputListener3 (queues each InputTask3 received at priority 0)
  • OutputListener (queues each OutputTask received at priority 1)
  • QueueWorker (processes the next task from the queue)

This likely means you'll have to split the logic for all of your existing tasks into separate socket listeners and the actual task processing, but that's not necessarily a bad thing.
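A rough sketch of that layout using `asyncio.PriorityQueue`. The names `on_input`, `on_output_request` and `queue_worker` are hypothetical, and the listeners are reduced to plain function calls so the example stays short. Each queue entry is a `(priority, sequence, payload)` tuple; the sequence number is an added tie-breaker so entries of equal priority keep their arrival order and payloads are never compared.

```python
import asyncio
import itertools

INPUT, OUTPUT = 0, 1         # lower number is served first
counter = itertools.count()  # tie-breaker: FIFO order within one priority

async def main():
    queue = asyncio.PriorityQueue()
    state = {}
    sent = []

    def on_input(key, value):
        # What an InputListener would do for each message it receives.
        queue.put_nowait((INPUT, next(counter), (key, value)))

    def on_output_request():
        # What the OutputListener would do for each update signal.
        queue.put_nowait((OUTPUT, next(counter), None))

    async def queue_worker(n_items):
        for _ in range(n_items):
            priority, _seq, payload = await queue.get()
            if priority == INPUT:
                key, value = payload
                state[key] = value
            else:
                sent.append(dict(state))  # stand-in for the transfer
            queue.task_done()

    # The output request arrives first, but the queued inputs are applied before it.
    on_output_request()
    on_input("a", 1)
    on_input("b", 2)
    await queue_worker(3)
    return sent

sent = asyncio.run(main())
print(sent)
```

The ordering guarantee only covers items that are already in the queue when the worker picks the next one.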

Nick Hilt
  • This would not work for my particular case as it could still run the QueueWorker if it is closer in the asyncio event loop even if an InputListener was ready to add another task to the queue! – Zak Stucke May 31 '19 at 18:50
  • Then you have to put everything in a super-task with special_function last that breaks whenever one of the sub-tasks actually does something. Nothing can guarantee an update hasn't been requested prior to the output being issued though, which is really only different as a matter of timing. – Nick Hilt May 31 '19 at 19:06