My goal is to run a program on several computers and collect measurement data. The problem is that the measurements need to happen at the same time, or as close to it as possible. My current approach is to install chrony to synchronize the clocks of all machines and to start a Python program that takes a measurement at the start of each minute. This has proved quite unwieldy, as I have to poll time.time() in a loop and check whether a new minute has begun.

# Fragment from inside an async method; needs `import asyncio` and `import time`.
time_factor = 1   # ticks per second: 1000 if desired resolution is ms, 1 if seconds
interval_s = 60 * time_factor   # interval length in ticks
old_interval = int(time.time() * time_factor) // interval_s
current_interval = None
running = True
while running:
    t_s = int(time.time() * time_factor)
    current_interval = t_s // interval_s

    if (t_s % interval_s == 0) and (current_interval != old_interval):
        request_runtime = time.time()

        await self.do_time_sensitive_work()

        old_interval = current_interval
        request_runtime = time.time() - request_runtime
        # Sleep through most of the interval: convert ticks back to seconds
        # and never pass a negative delay.
        await asyncio.sleep(max(0, int(interval_s / time_factor - request_runtime - 1)))
    else:
        await asyncio.sleep(0.01)

Is there a standard solution for this type of problem, or at least a more elegant one?

  • Does this answer your question? [How to sleep until a specific time YYYY-MM-DD HH:MM:SS?](https://stackoverflow.com/questions/54173134/how-to-sleep-until-a-specific-time-yyyy-mm-dd-hhmmss) – quamrana Nov 05 '19 at 16:08
  • No, not really, the proposed solution is to calculate how long it would take until the next invocation from a relative point in time. I need to perform an action exactly on the minute, in as fine a resolution a CPU can provide. – Dario Fabijančić Nov 05 '19 at 16:20
  • Most operating systems have a way to schedule programs to be run at certain times (like crontab or Windows Task Scheduler). It would be more wieldy if you got rid of all the scheduling logic in python, and just used an OS tool for scheduling. – nog642 Nov 05 '19 at 16:48
  • @nog642 Your comment is good **general** advice, but does not apply for the OP's **stated use case**. Using an OS tool like cron means that Python would have to be started every time anew, which can take time, especially if the program imports complex third-party frameworks like asyncio, aiohttp, pandas, etc. The OP stated desire for starting the action "exactly on the minute, in as fine a resolution a CPU can provide", and combining cron with interpreter startup seems pretty far from that. – user4815162342 Nov 05 '19 at 18:06
  • @user4815162342 thank you very much for additional explanation of the problem, this is exactly the problem, I can do some processing before writing the results of the measurements and write them to queue in under a minute, but only if the process is already running. – Dario Fabijančić Nov 06 '19 at 10:13
  • The part I don't understand is why you don't just calculate the (floating-point) time interval between now and the desired moment, and `await asyncio.sleep(interval)`. Have you tried that? – user4815162342 Nov 06 '19 at 11:27
  • @user4815162342 well just sleeping a fixed amount of time will introduce a time drift, I want to check with the OS at every interval, make sure I'm still synchronized with the rest of the systems. – Dario Fabijančić Nov 06 '19 at 13:26
  • I could be wrong, but that sounds like you require more precision than systems like asyncio can deliver, really. Good luck! – user4815162342 Nov 06 '19 at 13:45
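The suggestion in the comments (compute the floating-point interval until the desired moment and sleep for it) can be combined with the drift concern by re-reading the wall clock on every cycle instead of sleeping a fixed amount. The following is only a minimal sketch under that assumption; `next_boundary` and `run_on_boundaries` are hypothetical names, not part of any library, and `work` stands in for the question's `do_time_sensitive_work`.

```python
import asyncio
import math
import time


def next_boundary(now, interval):
    """Return the first multiple of `interval` that lies strictly after `now`."""
    return (math.floor(now / interval) + 1) * interval


async def run_on_boundaries(work, interval=60.0, repeats=3):
    """Await `work(target)` at `repeats` successive interval boundaries."""
    previous = None
    for _ in range(repeats):
        # Re-read the wall clock every cycle, so a late wake-up or a slow
        # `work` call is not carried over into the next cycle.
        target = next_boundary(time.time(), interval)
        if previous is not None and target <= previous:
            # Guard against float rounding handing back the same boundary.
            target = previous + interval
        # asyncio.sleep may return slightly early or late, so keep sleeping
        # until the wall clock has really reached the target.
        while (remaining := target - time.time()) > 0:
            await asyncio.sleep(remaining)
        await work(target)
        previous = target
```

Because each target is derived from time.time() rather than from the previous wake-up, any correction the time daemon applies to the system clock is picked up on the next cycle. How close the wake-up lands to the boundary still depends on the event loop and the OS scheduler.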

3 Answers

0

Maybe this is what you are looking for:

What is the best way to repeatedly execute a function every x seconds in Python?

Can't comment as I don't have the reputation T.T

Jason Chia
  • Sorry, I looked at that question, but sched library doesn't seem to have any implementation that would allow for starting a task at an exact time. And even if I handle that I don't know if it will not start drifting after some time (like time.sleep(60) will), I need to take measurements over a long period of time. – Dario Fabijančić Nov 06 '19 at 10:11
  • Wouldn't your problem be solved by using a listener thread that executes your program on a call from cron? The thread would be asynchronous and would be able to catch any sends. However, if the processing takes more than a minute, I think you'd have to implement multiprocessing to run the tasks in parallel in case of overlap. I think this approach would remove the startup costs mentioned in the other comments – Jason Chia Nov 06 '19 at 11:49
  • Yes, a listener task in async loop listening to some signal a bash script would send from cron could be a solution. I somehow feel it would be more complicated than running my own scheduler. – Dario Fabijančić Nov 06 '19 at 13:03
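For reference, the listener idea from the last two comments might look roughly like the sketch below. It assumes a Unix system and that the cron job sends SIGUSR1 to the already-running process (for instance with something like `pkill -USR1 -f measure.py`, where the script name is made up). `run_on_signal` and `max_triggers` are hypothetical names introduced for illustration. Note that cron itself makes no sub-second promise, so this likely trades precision for simplicity.

```python
import asyncio
import signal


async def run_on_signal(work, signum=signal.SIGUSR1, max_triggers=None):
    """Await `work()` each time the process receives `signum` (Unix only)."""
    loop = asyncio.get_running_loop()
    triggered = asyncio.Event()
    # add_signal_handler is only available on Unix event loops and must be
    # called from the main thread.
    loop.add_signal_handler(signum, triggered.set)
    handled = 0
    try:
        while max_triggers is None or handled < max_triggers:
            await triggered.wait()
            triggered.clear()
            await work()
            handled += 1
    finally:
        # Restore default handling once the listener stops.
        loop.remove_signal_handler(signum)
```

Signals that arrive while `work()` is still running collapse into a single pending trigger, so overlapping runs would need separate handling, as the comment above points out.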
0

Try this:

import asyncio
import time
from datetime import datetime


async def task(call_time):
    print(f"task started at: {to_string(call_time)}")
    await asyncio.sleep(5)
    print(f"task finished at: {to_string((time.time()))}")


def to_string(t):
    return datetime.utcfromtimestamp(t).strftime('%Y-%m-%d %H:%M:%S')


def call_task(loop, task, *args):
    print("task creating")
    loop.create_task(task(*args))
    print("task created")


async def main(loop: asyncio.AbstractEventLoop):
    interval = 10
    t = time.time()
    while True:
        print(f"Now is {to_string(t)}")
        call_time = estimate_next_call(t, interval)
        print(f"Call time {to_string(call_time)}")
        await asyncio.sleep(call_time - t)
        call_task(loop, task, call_time)
        t += interval
        await asyncio.sleep(interval)


def estimate_next_call(t, interval):
    return ((t + interval) // interval) * interval


if __name__ == "__main__":
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))
  • Welcome to Stackoverflow! Can you provide an explanation to supplement the code in your answer? How does this code solve the problem or how did you improve the code used in the question? – Kevin Nov 06 '19 at 17:59
  • I'm sorry this solution doesn't seem to take into account that the interval start must be exactly at the beginning of whatever is chosen as an interval e.g. per minute code must detect exactly the start of a minute. Also it doesn't include any sort of mechanism for avoiding the drifting of time that would be introduced by simply sleeping for x seconds. But thanks. – Dario Fabijančić Nov 07 '19 at 12:31
0

I tried the apscheduler library, as it seems to implement cron expressions, but I found two downsides with this approach. First, the library integrates rather awkwardly with asyncio code, although that may be the fault of my implementation. Second, it takes a couple of milliseconds for the scheduled task to actually start running. That is a minor issue, but scheduling by hand avoids the lag, and I want the measurements as close as possible to the "exact time".

I also tried Celery beat. That, however, seems like major overkill: it requires installing Redis or some other message broker, and using such a complex framework for such a minor task simply screams "wrong".

The current solution, which works fast enough, sets the resolution of the time checks with "time_factor", which scales the time.time() values (in seconds) to the desired unit. The main part of the solution is dividing the current time by the interval length to get the ordinal number of the current interval; if this number has changed, we have just entered a new interval:

time_factor = 100  # ticks per second; 1000 for milliseconds
interval_size = 60 * time_factor  # interval length in ticks
sleep_time = 1 / (time_factor * 100)  # polling period in seconds
old_interval = int(time.time() * time_factor) // interval_size
current_timestamp = None
current_interval = None

running = True
while running:
    current_timestamp = int(time.time() * time_factor)
    # Ordinal number of the interval that contains the current time.
    current_interval = current_timestamp // interval_size
    if current_interval != old_interval:
        request_runtime = time.time()

        await self.do_time_sensitive_work()

        old_interval = current_interval
        request_runtime = time.time() - request_runtime
        # Skip the pause if the work already took longer than one polling period.
        await asyncio.sleep(max(0, sleep_time - request_runtime))
    else:
        await asyncio.sleep(sleep_time)
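
A possible refinement of the loop above, sketched here only as an assumption and not as something I have benchmarked: polling at a fine period for the whole minute is wasteful, so one could sleep coarsely until shortly before the boundary and only then poll tightly for the change of the interval number. `interval_index`, `wait_for_next_interval`, `fine_window` and `poll` are hypothetical names and values.

```python
import asyncio
import math
import time


def interval_index(timestamp, interval):
    """Ordinal number of the interval that contains `timestamp`."""
    return math.floor(timestamp / interval)


async def wait_for_next_interval(interval=60.0, fine_window=0.05, poll=0.0005):
    """Return the index of the next interval shortly after it begins."""
    start_index = interval_index(time.time(), interval)
    boundary = (start_index + 1) * interval
    # Coarse phase: one long sleep that ends `fine_window` seconds early.
    coarse = boundary - time.time() - fine_window
    if coarse > 0:
        await asyncio.sleep(coarse)
    # Fine phase: poll the wall clock until the interval number changes.
    while interval_index(time.time(), interval) == start_index:
        await asyncio.sleep(poll)
    return interval_index(time.time(), interval)
```

The caller would await `wait_for_next_interval()` and then run the time-sensitive work right away. The detection is at most about one `poll` period late, plus whatever latency the event loop adds.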