I want to wrap a blocking function in Python 3.7+ so that it runs async while handling whatever output it produces on stdout asynchronously by a handler (in my case sending them over the network). This assumes that the blocking function in effect a black box that cannot be changed.
I believe the parts of what I want to do has been presented well already, but combining them has turned out to be a challenge for me:
I want to wrap a blocking function in asyncio (explained here).
I want whatever is output as stdout by that blocking function to be captured (explained here)
I want the captured stdout lines to be presented as an async Python generator (explained here).
My current code looks like this:
def run_in_executor(f):
"""
Decorator to run things in an async executor
"""
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, lambda: f(*args, **kwargs))
return inner
@contextmanager
def redirect_stdout(stream):
"""
Context to capture stdout in given stream
"""
old_stdout = sys.stdout
sys.stdout = stream
try:
yield
finally:
sys.stdout = old_stdout
def dummy_blocking_task():
sec=10
ct=sec
print(f"Doing blocking task for {sec} seconds")
while True:
time.sleep(0.5)
print(f"dummy stdout {ct}")
time.sleep(0.5)
ct -= 1
if ct<=0:
break
print(f"Blocking task finished after {sec} seconds")
# Wrap in async
@run_in_executor
def async_build():
# Create a in memory text based stream
stream = io.StringIO()
# POI: How can I asyncrounously handle the captured stdio lines as they are put into stream?
def some_magic_handler(line):
send_somewhere(f"HANDLED STDIO LINE: {line}")
some_magic_way(stream, some_magic_handler)
# Redirect stdout to the stream
with redirect_stdout(stream):
# Run the blocking function
res = dummy_blocking_task()
return ret
So how would I repace my pseudo functions some_magic_handler
and some_magic_way
for this to work?