
I need to register global hotkeys, for example F4 and F8. With the keyboard library, the next callback isn't called until the first one has returned.

In other words, the logs look like this:

pressed f4
end for f4
pressed f8
end for f8

But I want them to look like this:

pressed f4
pressed f8
end for f4
end for f8

Demo code:

# pip install keyboard
from keyboard import add_hotkey, wait
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: on_callback("f4"))
add_hotkey("f8", lambda: on_callback("f8"))

wait('esc')

I tried to use asyncio, but nothing changed:

pressed f4
end for f4
pressed f8
end for f8
from keyboard import add_hotkey, wait
import asyncio

async def on_callback(key):
    print('pressed', key)
    await asyncio.sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: asyncio.run(on_callback("f4")))
add_hotkey("f8", lambda: asyncio.run(on_callback("f8")))

wait('esc')
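
As far as I can tell, nothing changes because asyncio.run() blocks the calling thread until the coroutine finishes, so from keyboard's point of view the callback is still synchronous. A standalone sketch of that behaviour, without keyboard:

```python
import asyncio
import time

async def work():
    await asyncio.sleep(1)  # emulate a long-running task

start = time.monotonic()
asyncio.run(work())  # blocks this thread until the coroutine completes
elapsed = time.monotonic() - start
print(round(elapsed))  # ~1 second: the calling thread was blocked the whole time
```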

Update 1

The keyboard library's developer advised using the call_later function, which creates a new thread for each callback, and it works the way I want.

But is there a way to do this in the same thread (using asyncio)? I didn't succeed.

# example with 'call_later' function
from keyboard import add_hotkey, wait, call_later
from time import sleep

def on_callback(key):
    print('pressed', key)
    sleep(5) # emulate long run task
    print('end for', key)

add_hotkey("f4", lambda: call_later(on_callback, args=("f4",)))
add_hotkey("f8", lambda: call_later(on_callback, args=("f8",)))

wait('esc')

Update 2

Now it looks like the code below (full code on GitHub). It seems that creating a new thread just to wait for an HTTP request is too heavy an operation, so I want to use asyncio in the current thread and continue handling other hotkeys at the same time.

from googleapiclient.discovery import build
from os import getenv
from settings import get_settings
from loguru import logger
import keyboard

class ScriptService():

    def __init__(self):
        # ...
        self._script = AppsScript(id)
        self._hotkeys = values["hotkeys"]

    def _register_hotkeys(self):
        self._add_hotkey(self._hotkeys["reload"], self._on_reload)
        for item in self._hotkeys["goofy"]:
            k, f = item["keys"], item["function"]
            self._add_hotkey(k, self._on_callback, args=(f, k))

    def _add_hotkey(self, keys, callback, args=()):
        # lambda bug: https://github.com/boppreh/keyboard/issues/493
        keyboard.add_hotkey(keys, lambda: keyboard.call_later(callback, args))

    def _on_callback(self, function, keys):
        response = self._script.run(function)

class AppsScript():

    def __init__(self, id: str):
        self._name = getenv("API_SERVICE_NAME")
        self._version = getenv("API_VERSION")
        self._id = id

    def run(self, function: str):
        body = {"function": function}
        with build(self._name, self._version, credentials=get_credentials()) as service:
            # http request
            return service.scripts().run(scriptId=self._id, body=body).execute()

  • Whilst it is possible to wrap your calls in a `run_in_executor`, and thereby have asyncio handle the threading for you, it would give no advantage which I can see in this use case. Asyncio is for asyncio-compatible libraries, and keyboard simply isn't written that way. I could post an answer pretending it's asyncio compatible, but it would be a horrible hack. Why do you *have* to use asyncio anyway? – 2e0byo Oct 04 '21 at 10:18
  • @2e0byo It seems that creating a new thread just to wait for an HTTP request is too heavy an operation, so I want to use asyncio in the current thread and continue handling other hotkeys at the same time. – Viewed Oct 04 '21 at 11:15
  • Ah. This sounds like an X/Y problem. *That* problem sounds solvable---if you edit the question and add something of what you want to do on keypress, rather than just sleep. There's certainly nothing daft about wanting to offload your http requests to an asyncio event loop (although that doesn't change the fact that keyboard is not asyncio compatible, so we're going to need to write our own interface) – 2e0byo Oct 04 '21 at 11:17
  • @2e0byo added new code – Viewed Oct 04 '21 at 11:27

1 Answer

Unfortunately none of the libraries you are using are actually awaitable, so using them with asyncio is going to be a challenge. You could extract the actual http calls from the google library and then implement your own client layer using an asyncio-compatible library, but that's a lot of work just to avoid the expense of spinning up a new thread.

Fortunately there's already a way to avoid the expense of spinning up threads: use a pool of worker threads. In this approach, rather than spinning up a new thread immediately for every callback, we add the task to a queue of tasks serviced by a pool of threads we spin up in advance. That way we pay to spin each thread up only once, and after that we only pay to serialise the request to the thread---which is not nothing, but it's less than spinning up a thread.

Whilst it's possible to have asyncio manage the thread pool, in this instance it brings no advantages at all, since nothing else in your code is awaitable. (If you did want to do this, you'd use loop.run_in_executor(), taking care not to re-create the pool as noted in this question.)
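
For completeness, here's a sketch of what that would look like in a pure-asyncio program. Note this is standalone and doesn't integrate with keyboard at all (keyboard fires its callbacks in its own thread, which is exactly why it doesn't mix well with an event loop); the task names are just placeholders:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_task(name):
    time.sleep(0.5)  # stand-in for the blocking HTTP request
    return f"done {name}"

async def main():
    loop = asyncio.get_running_loop()
    # Create the pool once and reuse it, rather than per call
    with ThreadPoolExecutor(max_workers=4) as pool:
        return await asyncio.gather(
            loop.run_in_executor(pool, blocking_task, "f4"),
            loop.run_in_executor(pool, blocking_task, "f8"),
        )

results = asyncio.run(main())
print(results)  # both tasks ran overlapped rather than back to back
```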

Here's some dummy code, which would need adapting to your classes:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint


def process(task):
    print(task["name"])
    sleep(3 + randint(0, 100) / 100)
    print(f"Task {task['name']} done")


class WorkerThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        print("Started worker thread")
        self._open = True

    def run(self):
        while self._open:
            task = self.queue.get()
            process(task)
            self.queue.task_done()

    def close(self):
        print("Closing", self)
        self._open = False


task_queue = Queue()
THREADS = 6
worker_threads = [WorkerThread(task_queue) for _ in range(THREADS)]
for worker in worker_threads:
    worker.daemon = True
    worker.start()


print("Sending one task")
task_queue.put({"name": "Task 1"})
sleep(1)

print("Sending a bunch of tasks")
for i in range(1, 15):
    task_queue.put({"name": f"Task {i}"})
print("Sleeping for a bit")
sleep(2)

print("Shutting down")

# wrap this in your exit code
task_queue.join()  # wait for everything to be done
for worker in worker_threads:
    worker.close()

There are other approaches, but I think writing it explicitly is clearer here. Note that I have assumed your code is not CPU-bound, so it makes sense to use threads rather than processes.
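
One such approach, for comparison: the standard library's concurrent.futures.ThreadPoolExecutor implements exactly this worker-pool pattern (pre-spun threads servicing a queue), so the hand-rolled version above could be collapsed to something like this sketch:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def process(task):
    sleep(0.1)  # emulate work
    return f"{task['name']} done"

# The pool spins its threads up once; submit() just enqueues work on them.
with ThreadPoolExecutor(max_workers=6) as pool:
    futures = [pool.submit(process, {"name": f"Task {i}"}) for i in range(1, 5)]
    results = [f.result() for f in futures]

print(results)
```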

Incidentally, this looks very like a minimal implementation of something like Celery, which is probably overkill for your needs but might be interesting to look at all the same.

BTW, I don't read Russian, but this looks like a fun project.

2e0byo