
In a program I am writing, I need to run a function for a set amount of time and then continue with the main code of the program. The function waits for input (an NFC card to be scanned), and I want the program to stop waiting after a set amount of time.

I have tried pytimedinput, but it only works for normal inputs. I also tried subprocess (putting this part of the code in a different file), but that did not work on my Raspberry Pi, where the code will be running.

Any help will be much appreciated.

3 Answers

0

I would use something similar to the setTimeout method used in JavaScript.

I believe you will find your answer in this previous post: Postpone code for later execution in python (like setTimeout in javascript)
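
For reference, the closest standard-library analogue of setTimeout is threading.Timer. The sketch below is only an illustration: it assumes the card reader can be polled without blocking, and `poll`, `wait_for_card` and `interval` are hypothetical names, not part of any NFC library. The timer sets a flag after the timeout, and the polling loop stops when the flag is set.

```python
import threading
import time


def wait_for_card(poll, timeout_seconds, interval=0.05):
    """Call poll() repeatedly until it returns a value or the timer fires.

    `poll` is a hypothetical non-blocking reader that returns None
    while no card is present.
    """
    timed_out = threading.Event()
    # Like setTimeout: run timed_out.set() after timeout_seconds.
    timer = threading.Timer(timeout_seconds, timed_out.set)
    timer.start()
    try:
        while not timed_out.is_set():
            card = poll()
            if card is not None:
                return card
            time.sleep(interval)
        return None  # the timer fired before a card was seen
    finally:
        # Like clearTimeout: harmless if the timer has already fired.
        timer.cancel()


if __name__ == "__main__":
    # No card ever appears, so this gives up after about 2 seconds.
    print(wait_for_card(lambda: None, timeout_seconds=2))
```

Note that a timer cannot interrupt a call that blocks inside the reader itself. In that case the blocking call has to run in a separate process that can be terminated.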

Pad
0

Here is a sample solution for your reference. You can use the decorator here to decorate any function that you want to put a timeout on.

from datetime import datetime, timezone
import multiprocessing
import time


def timeout(seconds):
    def decorator(func):
        def wrapper(*args, **kwargs):
            response = multiprocessing.Manager().dict()

            proc = multiprocessing.Process(target=func, args=(*args, response), kwargs=kwargs)
            proc.start()

            # Wait until the process exits or the timeout elapses, without busy-waiting.
            proc.join(timeout=seconds)
            if proc.is_alive():
                print(f"Process is not finished after {seconds} seconds. Terminating...")
                proc.terminate()

            proc.join()
            return dict(response)

        return wrapper

    return decorator


@timeout(10)
def scan_nfc_card(card_name, response):
    # If you need to return anything from the function with a timeout, use the response dictionary.
    response['counter'] = 0
    while True:
        response['counter'] += 1
        print(f"Waiting for NFC card {card_name} to be scanned...")
        time.sleep(1)


print(f"[{datetime.now(timezone.utc)}] Start first process")
response = scan_nfc_card("card-ABC")
print(f"{response=}")
print(f"[{datetime.now(timezone.utc)}] Continue other processes")

Output:

$ python3 script.py 
[2021-09-02 10:23:50.475114+00:00] Start first process
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Waiting for NFC card card-ABC to be scanned...
Process is not finished after 10 seconds. Terminating...
response={'counter': 10}
[2021-09-02 10:24:00.502205+00:00] Continue other processes
  • As you can see, the function reached the timeout of 10 seconds.

Now let's say the NFC scan succeeds right away:

@timeout(10)
def scan_nfc_card(card_name, response):
    response['counter'] = 0
    while True:
        response['counter'] += 1
        print(f"Waiting for NFC card {card_name} to be scanned...")
        break  # Break right away

Output:

$ python3 script.py 
[2021-09-02 10:23:43.343436+00:00] Start first process
Waiting for NFC card card-ABC to be scanned...
response={'counter': 1}
[2021-09-02 10:23:43.385748+00:00] Continue other processes
  • Now, the function ended right away.
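
The returned dictionary alone does not tell the caller whether the function finished or was cut off. One possible way to report that is sketched below, under the assumption of a Unix-like system where the "fork" start method is available (as on a Raspberry Pi running Linux). `run_with_timeout` and `pretend_scan` are hypothetical names used only for illustration. The helper returns a flag plus the child's exit code: 0 means the function returned normally, and a negative value means the child was terminated by a signal.

```python
import multiprocessing
import time


def run_with_timeout(func, seconds, *args):
    """Run func(*args) in a child process; return (timed_out, exitcode)."""
    # Assumption: the "fork" start method exists (Linux and other Unix-likes).
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=func, args=args)
    proc.start()
    proc.join(timeout=seconds)    # wait at most `seconds`
    timed_out = proc.is_alive()   # still running means the timeout was hit
    if timed_out:
        proc.terminate()
        proc.join()               # reap the terminated child
    return timed_out, proc.exitcode


def pretend_scan(delay):
    """Hypothetical stand-in for a blocking NFC read."""
    time.sleep(delay)


if __name__ == "__main__":
    print(run_with_timeout(pretend_scan, 1.0, 0.1))   # finishes in time
    print(run_with_timeout(pretend_scan, 0.5, 30))    # gets terminated
```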
0

This is one way:

import multiprocessing
import time

def f():
    while 1:
        print(".")
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=f)
    p.start()  # returns immediately, leaving the process running
    p.join(timeout=3)
    if p.is_alive():
        p.kill()

    print("Bye")

Guarding the main code with if __name__ == "__main__": is important; see Programming guidelines for multiprocessing.

Here is an example of how to get data back from the spawned process:

import multiprocessing
import time

def f(q):
    while 1:
        q.put(".")
        time.sleep(1)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=f, args=(q,))
    p.start()  # returns immediately, leaving the process running
    p.join(timeout=3)
    if p.is_alive():
        p.kill()
    print("Queue content: ")
    while not q.empty():
        print(q.get())

    print("Bye")
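
If the process only needs to hand back a single result, such as a card ID, the timeout can also be placed on the queue read instead of on join. This is a minimal sketch, assuming a Unix-like system with the "fork" start method. `fake_scan` and `scan_with_timeout` are hypothetical names, and the sleep stands in for the real blocking NFC read.

```python
import multiprocessing
import queue
import time


def fake_scan(q, delay):
    """Hypothetical stand-in for a blocking NFC read."""
    time.sleep(delay)
    q.put("card-ABC")


def scan_with_timeout(delay, timeout):
    """Return the scanned value, or None if nothing arrives within `timeout` seconds."""
    # Assumption: the "fork" start method exists (Linux and other Unix-likes).
    ctx = multiprocessing.get_context("fork")
    q = ctx.Queue()
    p = ctx.Process(target=fake_scan, args=(q, delay))
    p.start()
    try:
        # Blocks until a result arrives; raises queue.Empty on timeout.
        return q.get(timeout=timeout)
    except queue.Empty:
        return None
    finally:
        if p.is_alive():
            p.terminate()
        p.join()


if __name__ == "__main__":
    print(scan_with_timeout(delay=0.5, timeout=3))   # card arrives in time
    print(scan_with_timeout(delay=30, timeout=3))    # no card, gives up
```

Reading from the queue first means the caller gets the result as soon as it is available, rather than waiting for the child process to exit.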
Roman Pavelka