
I have built a RESTful microservice with Python asyncio and aiohttp that accepts POST requests to collect real-time events from various feeders.

It then builds an in-memory structure to cache the last 24h of events in a nested defaultdict/deque structure.

Now I would like to periodically checkpoint that structure to disk, preferably using pickle.

Since the memory structure can be >100MB I would like to avoid holding up my incoming event processing for the time it takes to checkpoint the structure.

I'd rather create a snapshot copy (e.g. deepcopy) of the structure and then take my time to write it to disk and repeat on a preset time interval.

I have been searching for examples of how to combine threads and asyncio for that purpose, but could not find anything that helped. Is a thread even the best solution for this?

Any pointers to get started are much appreciated!

fxstein
  • I have used dano's suggestions and built a very simple multithreaded setup that checkpoints the in-memory event store to disk every 60 seconds. Here is a link to the git repo file that contains the entire logic: https://github.com/fxstein/SentientHome/blob/master/engine/event.engine.py – fxstein Feb 13 '15 at 15:32

3 Answers


It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # Stand-in for some operation that is CPU-bound

async def main():
    loop = asyncio.get_running_loop()
    # Create a process pool with 2 worker processes
    with ProcessPoolExecutor(2) as pool:
        # Run cpu_bound_operation in the ProcessPoolExecutor.
        # This suspends this coroutine until the call finishes, but does
        # not block the event loop; other coroutines can run in the meantime.
        await loop.run_in_executor(pool, cpu_bound_operation, 5)

# The guard is needed because worker processes may re-import this module
if __name__ == "__main__":
    asyncio.run(main())

As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.

That said, it's very simple to test both ways and find out for sure, so you might as well do that.
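
For the checkpointing case in the question, the ThreadPoolExecutor variant might look roughly like the sketch below. The names `new_store`, `write_checkpoint`, `checkpoint_once`, and `checkpoint_forever` are made up for illustration, and the store layout is only a guess at the "nested defaultdict/deque" structure described in the question. Note that the `deepcopy` still runs on the event loop thread, so only the pickling and disk write are moved off the loop.

```python
import asyncio
import copy
import os
import pickle
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def new_store():
    # Hypothetical stand-in for the event cache. partial() is used instead
    # of a lambda because pickle cannot serialize lambdas.
    return defaultdict(partial(defaultdict, deque))


def write_checkpoint(snapshot, path):
    # Blocking work: pickle to a temporary file, then swap it into place
    # so a reader never sees a half-written checkpoint.
    tmp_path = path + ".tmp"
    with open(tmp_path, "wb") as f:
        pickle.dump(snapshot, f, protocol=pickle.HIGHEST_PROTOCOL)
    os.replace(tmp_path, path)


async def checkpoint_once(store, path, executor):
    # The copy is taken on the event loop thread, so no handler can
    # modify the store halfway through it.
    snapshot = copy.deepcopy(store)
    loop = asyncio.get_running_loop()
    # Pickling and writing happen in a worker thread.
    await loop.run_in_executor(executor, write_checkpoint, snapshot, path)


async def checkpoint_forever(store, path, interval, executor):
    # Run this as a background task next to the request handlers.
    while True:
        await asyncio.sleep(interval)
        await checkpoint_once(store, path, executor)
```

One way to start it would be `asyncio.create_task(checkpoint_forever(store, "events.pickle", 60, ThreadPoolExecutor(1)))` from inside a running coroutine; the file name and the 60-second interval are placeholders.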

Casey Jones
dano
  • Thank you dano! This was a lot easier after all. You are correct: I took the path of using ThreadPoolExecutor and it works fine. Writing checkpoints every 60 sec now without holding up any of the event processing. – fxstein Feb 13 '15 at 15:05
  • Too little too late on my part, I suspect, but when I hear "asyncio + parallelism" I am reminded of this exceptional article: shorturl.at/aOVZ2. I am also reminded of the saying *"thou shalt not judge others' comments, or bear witness to your own judgement"* (so if none of this is relevant, tough). – JB-007 Aug 21 '22 at 10:44

I also used run_in_executor, but I found this function kinda gross under most circumstances, since it requires partial() for keyword args and I'm never calling it with anything other than a single executor and the default event loop. So I made a convenience wrapper around it with sensible defaults and automatic keyword argument handling.

from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep
import asyncio as aio

# Create the loop explicitly; recent Python versions no longer create one
# implicitly when get_event_loop() is called outside a running loop.
loop = aio.new_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""
    def __init__(self, loop=loop, nthreads=1):
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop
    def __call__(self, f, *args, **kw):
        # run_in_executor takes positional args only, so bind everything
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())
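
On Python 3.9 and later, the standard library's `asyncio.to_thread` covers the same common case: it runs the call in the loop's default thread pool and forwards keyword arguments without `partial()`. A minimal sketch, reusing the `cpu_bound_operation` name from above with a shorter sleep; it only applies to threads, not to a process pool or a custom executor:

```python
import asyncio
import time


def cpu_bound_operation(t, alpha=30):
    time.sleep(t)  # stand-in for blocking work
    return 20 * alpha


async def main():
    # Positional and keyword arguments are passed straight through.
    return await asyncio.to_thread(cpu_bound_operation, 0.1, alpha=-2)


if __name__ == "__main__":
    print(asyncio.run(main()))
```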
enigmaticPhysicist

Another alternative is to use loop.call_soon_threadsafe along with an asyncio.Queue as the intermediate channel of communication.

The current Python 3 documentation also covers this in its section "Developing with asyncio: Concurrency and Multithreading". The following example shows the queue approach:

import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))

    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())

How to send asyncio tasks to loop running in other thread may also help you.
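
For that direction (a thread submitting work to the loop and waiting for the result), the standard library provides `asyncio.run_coroutine_threadsafe`. A minimal sketch, where `double` and `worker` are made-up names for illustration:

```python
import asyncio


async def double(x):
    await asyncio.sleep(0.01)  # stand-in for real async work
    return x * 2


def worker(loop, results):
    # Runs in a plain thread. The coroutine is scheduled on the loop and
    # this thread (not the loop) blocks until the result is available.
    future = asyncio.run_coroutine_threadsafe(double(7), loop)
    results.append(future.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    results = []
    # Run the threaded code in the default executor; the loop stays free
    # to execute double() while the worker thread waits for it.
    await loop.run_in_executor(None, worker, loop, results)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```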

If you need a more "powerful" example, check out my Wrapper to launch async tasks from threaded code. It will handle the thread safety part for you (for the most part) and let you do things like this:

# See https://gist.github.com/Lonami/3f79ed774d2e0100ded5b171a47f2caf for the full example

async def async_main(queue):
    # your async code can go here
    while True:
        command = await queue.get()
        if command.id == 'print':
            print('Hello from async!')
        elif command.id == 'double':
            await queue.put(command.data * 2)

with LaunchAsync(async_main) as queue:
    # your threaded code can go here
    queue.put(Command('print'))
    queue.put(Command('double', 7))
    response = queue.get(timeout=1)
    print('The result of doubling 7 is', response)
Lonami
  • The only way you interact with `queue` from another thread is through [`loop.call_soon_threadsafe`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe), which "Must be used to schedule callbacks *from another thread*." As I understand it, this is the safe way to do it, but I may be misunderstanding the documentation. – Lonami Jul 27 '21 at 20:10
  • Could someone please explain why if we comment out all three `await asyncio.sleep()` function executions in `async def nonblocking(queue)`, the code stops working as expected? – ievgenii Jun 16 '22 at 17:12
  • If there are no `await`, the function effectively becomes blocking. `await` is needed so that the `asyncio` event loop can resume control (the function is paused in the `await` points, letting it do other work, before getting back to it; it literally "returns" at that point but gets back to it later). – Lonami Jun 18 '22 at 19:58
  • Thank you, I am aware of that. However, I have two follow up questions: 1. Isn't `queue.put_nowait('Non-blocking A')` supposed to be non-blocking, since it's a "nowait" version of `put`? 2. I tried doing `await queue.put_nowait('Non-blocking A')` but that didn't make a difference. – ievgenii Jun 18 '22 at 21:08
  • `put` is a coroutine (needs to be `await`-ed) which suspends (yields control) until there's free space in the queue to put the item. `put_nowait` is not a coroutine (does not use `await`) and will instead raise `asyncio.QueueFull` if the queue is full. If `nonblocking` never sleeps or yields, it will not give other code a chance to run. You can use `await asyncio.sleep(0)` to force a yield without sleeping. – Lonami Oct 20 '22 at 10:04
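
A small sketch of both points from the comment above, assuming a bounded queue (`maxsize=1`) so that `put_nowait` has something to overflow; the variable names are made up for illustration:

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=1)
    queue.put_nowait("first")        # fits
    try:
        queue.put_nowait("second")   # no free slot, so this raises
    except asyncio.QueueFull:
        overflowed = True
    else:
        overflowed = False

    seen = []

    async def consumer():
        seen.append(await queue.get())

    task = asyncio.create_task(consumer())
    ran_before_yield = bool(seen)    # the task is scheduled but has not run
    await asyncio.sleep(0)           # yield once so the loop can run it
    ran_after_yield = bool(seen)
    await task
    return overflowed, ran_before_yield, ran_after_yield


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The consumer only gets to run once `main` reaches an `await` that actually yields to the event loop, which is why removing every `await asyncio.sleep()` from `nonblocking` starves the rest of the program.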