
My goal is to get some practice with the asyncio library. I have read some introductory tutorials and now I'd like to write some code by myself.

I'd like to start two simple tasks which increment a common value stored in an outside class. The first one is automatic: it increments the value by one after 5 seconds. The second task is user-driven: if you enter some value within those 5 seconds, it should be added as well.

The problem is that when I don't enter any value, my loop doesn't close - the program stays active and runs forever until I force-stop it, and then I get the following error:

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
Timeout
Loop finished. Value is 1
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Process finished with exit code 0

Basically after "Loop finished" there is end of program, but when no value was put into console input, the program just hangs. When I enter any v

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
5
Loop finished. Value is 6

Process finished with exit code 0

It looks like when the TimeoutError happens, there's something not cleaned up after asyncio.wait_for. Can you help me and tell me what's wrong? This is my code:

import asyncio
import sys


class ValContainer:
    _val = 0

    @staticmethod
    def inc_val(how_many=1):
        ValContainer._val += how_many

    @staticmethod
    def get_val() -> int:
        return ValContainer._val


async def auto_increment():
    print(f'[Auto_increment: ] This task will increment value after 5 seconds')
    await asyncio.sleep(5)
    ValContainer.inc_val()
    return True


async def manual_increment(loop):
    print(f'[Manual increment: ] Waiting 5s for inc value:')
    try:
        # stdin is blocking, so read it on a worker thread in the default ThreadPoolExecutor
        future = loop.run_in_executor(None, sys.stdin.readline)
        # give up waiting for the line after 5 seconds
        line = await asyncio.wait_for(future, 5, loop=loop)
        if line:
            try:
                how_many = int(line)
                ValContainer.inc_val(how_many)
            except ValueError:
                print('That\'s not a number!')

    except asyncio.TimeoutError:
        print('Timeout')
    finally:
        return True

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task_auto = loop.create_task(auto_increment())
    task_man = loop.create_task(manual_increment(loop))
    loop.run_until_complete(task_auto)
    loop.run_until_complete(task_man)
    print(f'Loop finished. Value is {ValContainer.get_val()}')
    loop.close()
Asmoox
  • Not sure why you used static methods when you are incrementing a class value. If you want the class to be a namespaced global, then at least make it easier on yourself and just use `@classmethod` (see the sketch after these comments). – Martijn Pieters Jun 15 '19 at 15:10
  • @RomanPerekhrest: this is OS dependent. And did your process actually exit, or did you have to hit the enter key before it did? – Martijn Pieters Jun 15 '19 at 15:37
  • @MartijnPieters, that was the ambiguity in question - *"there's something not cleaned after asyncio.wait_for"* – RomanPerekhrest Jun 15 '19 at 15:40
  • @Asmoox, do you need to just exit the entire program, OR ensure that the event loop was closed? – RomanPerekhrest Jun 15 '19 at 15:42
  • @RomanPerekhrest: when using asyncio, it's normal to expect tasks to be cancelled after `asyncio.wait_for()`, the fact that a `ThreadPoolExecutor` *task* can be cancelled but not the thread it started is perfectly understandable if you are not used to blocking I/O on threads. – Martijn Pieters Jun 15 '19 at 15:43
  • 1
    @RomanPerekhrest: The event loop *is closed*, they are asking why their process is not exiting. If the loop hadn't closed you'd not have the `Loop finished. Value is 1` output. – Martijn Pieters Jun 15 '19 at 15:44
  • Yes, the problem is after "Loop finished", the program doesn't actually finish. – Asmoox Jun 16 '19 at 08:23
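
As an aside on Martijn's first comment, here is a minimal sketch (my illustration, not code from the thread) of `ValContainer` rewritten with `@classmethod`; the behaviour is identical, it just avoids repeating the class name inside the methods:

class ValContainer:
    _val = 0

    @classmethod
    def inc_val(cls, how_many=1):
        cls._val += how_many

    @classmethod
    def get_val(cls) -> int:
        return cls._val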

1 Answer


You have started a separate thread in a ThreadPoolExecutor, and those can't actually be cancelled. The asyncio 'delegate', the task, is cancelled, but the sys.stdin.readline call will sit there indefinitely. You could end it by hitting Enter, as that gives it a complete line on sys.stdin.

You'd have to use one of the work-arounds to cancel the read here; note that you can't tell ThreadPoolExecutor to use a daemon thread.

In the case of waiting for user input as a separate task in an asyncio context, it is probably easier to just create your own thread rather than ask a ThreadPoolExecutor to manage threads for you; that way you can set daemon=True on that thread and just have the process kill it when exiting.

Martijn Pieters
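
As an illustration of the suggestion above (my sketch, not part of the original answer), the blocking `sys.stdin.readline` can run on a hand-made `threading.Thread` with `daemon=True`, handing the result back to the event loop via `loop.call_soon_threadsafe` and an `asyncio.Queue`. It assumes the `ValContainer` class from the question:

import asyncio
import sys
import threading


async def manual_increment():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def read_stdin():
        # Blocking read runs on a daemon thread, so it cannot keep the process alive
        line = sys.stdin.readline()
        # Hand the line back to the event loop thread
        loop.call_soon_threadsafe(queue.put_nowait, line)

    threading.Thread(target=read_stdin, daemon=True).start()

    print('[Manual increment: ] Waiting 5s for inc value:')
    try:
        line = await asyncio.wait_for(queue.get(), 5)
    except asyncio.TimeoutError:
        print('Timeout')
        return True

    try:
        ValContainer.inc_val(int(line))
    except ValueError:
        print('That\'s not a number!')
    return True

The traceback in the question comes from the atexit handler in concurrent/futures/thread.py joining the executor's worker thread; a daemon thread created by hand is not joined at shutdown, so the interpreter can exit even while readline() is still blocked.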
  • Other workarounds could be considered to cancel the whole "run_in_executor using ThreadPoolExecutor" (https://gist.github.com/yeraydiazdiaz/b8c059c6dcfaf3255c65806de39175a7), not just `sys.stdin`. – RomanPerekhrest Jun 15 '19 at 15:55
  • @RomanPerekhrest: No, that won't work either, as stated in that gist: *shutdown is designed to stop new work coming in, but not to stop already existing work*. That gist uses `sleep()` calls that eventually exit on their own. – Martijn Pieters Jun 15 '19 at 16:26
  • @RomanPerekhrest: all you get there is that there is no `join()` call on the thread. The process will still wait on the thread until you issue an interrupt. – Martijn Pieters Jun 15 '19 at 16:28
  • I don't like the fact that `run_in_executor()` is so unmanageable and leads to such workarounds. I would go with a separate thread/process which can be identified/killed. – RomanPerekhrest Jun 15 '19 at 19:31
  • @RomanPerekhrest this is not limited to `ThreadPoolExecutor`; this is a problem with threads, full stop. A separate process also has limitations; you then need to worry about separating streams, for example. – Martijn Pieters Jun 15 '19 at 19:41
  • As for "separating streams": at least we need to make a stream reading/writing as non-blocking, involving, I guess, some non-waitable Queues along with a thread switched to "deamon" mode. But, that would lead us to just another workaround – RomanPerekhrest Jun 15 '19 at 19:53
  • Okay, I see it now: when I press Enter after the timeout, the program finishes correctly. Can I do something like sending [Enter] to readline when the TimeoutError is processed? – Asmoox Jun 16 '19 at 09:56