You are correct that callbacks are a way of providing "flexibility", but they really have nothing to do with "speed". The typical use case for a callback is the situation where function 1 calls function 2, and function 2 starts some operation that completes asynchronously. Function 2 therefore returns more or less immediately, but function 1 still needs to arrange to be notified when that asynchronous operation completes. To do so, function 1 passes function 2 a callback function that will be invoked with agreed-upon arguments when the completion occurs.
In your case the asynchronous event is the expiration of a time interval, and your callback function is passed the length of that delay. Now it turns out that Python comes with a `sched.scheduler` class that allows you to schedule "events" to be run in the future by specifying either an absolute time value or a delay, which is added to the current time to compute the absolute time at which the event is to be run. An event is just a callback function to which you can pass any arguments you wish. The problem with this class, in my opinion, is that you first have to enter all the events that you want to run and then call a `run` method that blocks until all of the events have run. It would be much better to just specify a future event you want to run (that is, a callback) and continue without blocking, with the event running asynchronously in another thread. And so I have heavily modified the `sched.scheduler` class to create a `Scheduler` class. Your code would then look like:
import time
from scheduler import Scheduler
def callbackFunc(msg_no, delay):
    """Report that the callback scheduled for message *msg_no* has fired.

    Prints the message number, the delay that was requested and the
    current wall-clock time.
    """
    report = f"callback: message number {msg_no}, delay {delay} at {time.time()}"
    print(report)
def saysomething(msg_no, delay, callback):
    """Announce a message and schedule *callback* to run *delay* seconds later.

    The callback is invoked as ``callback(msg_no, delay)``. It is scheduled
    on the module-level ``scheduler`` object, which must exist before this
    function is called (the ``__main__`` block creates it).
    """
    print(f"saysomething: message {msg_no}, delay {delay} at {time.time()}")
    # Schedule the callback that was passed in rather than a hard-coded one,
    # so the ``callback`` parameter actually selects what gets run.
    scheduler.enter(delay, callback, args=(msg_no, delay))
    # Pause before returning so successive calls are spaced two seconds apart.
    time.sleep(2)
if __name__ == '__main__':
    # ``scheduler`` has to be a module-level name: saysomething() looks it
    # up as a global when it schedules the callback.
    scheduler = Scheduler()
    # Message n is scheduled with a delay of n.
    for number in (1, 2, 3):
        saysomething(number, number, callbackFunc)
Prints:
saysomething: message 1, delay 1 at 1644584120.865646
callback: message number 1, delay 1 at 1644584121.8778687
saysomething: message 2, delay 2 at 1644584122.8747876
saysomething: message 3, delay 3 at 1644584124.8790839
callback: message number 2, delay 2 at 1644584124.8790839
callback: message number 3, delay 3 at 1644584127.9029477
And the `Scheduler` class:
"""
Modified sched.scheduler class.
"""
import heapq
import logging
import threading
import time
from collections import namedtuple
class Event(namedtuple('Event', 'start_time, priority, action, args, kwargs')):
    """A scheduled call, compared only by ``(start_time, priority)``.

    The action and its arguments are deliberately left out of every
    comparison so that events can be kept in a heap even when the actions
    themselves are not orderable. Because ``__eq__`` is overridden without
    ``__hash__``, instances are unhashable.
    """

    __slots__ = []

    def __eq__(self, other):
        return (self.start_time, self.priority) == (other.start_time, other.priority)

    def __ne__(self, other):
        # tuple supplies its own __ne__ that compares every field; without
        # this override two events could be both "==" and "!=" at once.
        return (self.start_time, self.priority) != (other.start_time, other.priority)

    def __lt__(self, other):
        return (self.start_time, self.priority) < (other.start_time, other.priority)

    def __le__(self, other):
        return (self.start_time, self.priority) <= (other.start_time, other.priority)

    def __gt__(self, other):
        return (self.start_time, self.priority) > (other.start_time, other.priority)

    def __ge__(self, other):
        return (self.start_time, self.priority) >= (other.start_time, other.priority)


# Per-field documentation shown by help(Event).
Event.start_time.__doc__ = ('''Numeric type compatible with the return value from time.monotonic.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
in the order of their priority.''')
Event.action.__doc__ = ('''Executing the event means executing
action(*args, **kwargs)''')
Event.args.__doc__ = ('''args is a sequence holding the positional
arguments for the action.''')
Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
arguments for the action.''')
# Marks "no kwargs supplied" so that a fresh dict is created per event.
_sentinel = object()


class Scheduler:
    """Run scheduled callbacks asynchronously on a background thread.

    Events are kept in a heap ordered by (start_time, priority). A worker
    thread is started when an event is added while no worker is running,
    and the worker exits once the queue is empty.
    """

    def __init__(self, daemon=False):
        """
        Initialize a new instance.
        If daemon is True, the scheduler thread will run as a daemon so it will be possible
        for the main thread to terminate with scheduled events yet to run.
        Regardless of how the daemon argument is set, when a new event is added a new
        scheduler thread will be started if the previous thread has terminated.
        """
        self._queue = []
        self._daemon = daemon
        self._running = False
        # Guards the queue and _running; also used to wake the worker.
        self._got_event = threading.Condition()
        # Set whenever the worker has drained the queue.
        self._queue_exhausted = threading.Event()
        self._queue_exhausted.set()
        self._thread = None

    def __del__(self):
        """Wait for a non-daemon worker thread before the object goes away."""
        # getattr: __del__ can run even if __init__ did not complete.
        thread = getattr(self, '_thread', None)
        if thread is None or getattr(self, '_daemon', False):
            return
        # A thread cannot join itself; that would raise RuntimeError.
        if thread is not threading.current_thread():
            thread.join()

    def enterabs(self, start_time, action, args=(), kwargs=_sentinel, priority=1):
        """Enter a new event in the queue at an absolute time.

        start_time is compared against time.monotonic(). The event runs as
        action(*args, **kwargs) on the worker thread.

        Returns an ID for the event which can be used to remove it,
        if necessary.
        """
        if kwargs is _sentinel:
            kwargs = {}
        event = Event(start_time, priority, action, args, kwargs)
        with self._got_event:
            heapq.heappush(self._queue, event)
            self._queue_exhausted.clear()
            if not self._running:
                if self._thread is not None:
                    # The previous worker cleared _running under this lock
                    # and is on its way out; reap it.
                    self._thread.join()
                worker = threading.Thread(target=self._run, daemon=self._daemon)
                # Thread.start() returns None, so keep the Thread object
                # itself. State is updated only after a successful start;
                # the worker cannot proceed until this lock is released.
                worker.start()
                self._thread = worker
                self._running = True
            else:
                # Wake the worker so it re-evaluates which event is next.
                self._got_event.notify()
        return event  # The ID

    def enter(self, delay, action, args=(), kwargs=_sentinel, priority=1):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.
        """
        start_time = time.monotonic() + delay
        return self.enterabs(start_time, action, args, kwargs, priority)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.
        """
        with self._got_event:
            # Match by identity: Event equality only looks at
            # (start_time, priority), so list.remove() could drop a
            # different event scheduled for the same time and priority.
            for index, queued in enumerate(self._queue):
                if queued is event:
                    break
            else:
                raise ValueError('event is not in the queue')
            del self._queue[index]
            heapq.heapify(self._queue)
            # Let a waiting worker notice the change right away instead of
            # sleeping until the cancelled event's start time.
            self._got_event.notify()

    def empty(self):
        """Check whether the queue is empty."""
        with self._got_event:
            return not self._queue

    def running(self):
        """Check whether the scheduler is running."""
        with self._got_event:
            return self._running

    def _run(self):
        """Execute events until the queue is empty."""
        got_event = self._got_event
        q = self._queue
        queue_exhausted = self._queue_exhausted
        while True:
            with got_event:
                if not q:
                    self._running = False
                    queue_exhausted.set()
                    return
                start_time, _priority, action, args, kwargs = q[0]
                now = time.monotonic()
                if start_time > now:
                    # Wait for either the time to elapse or a new Event to be added:
                    got_event.wait(timeout=(start_time - now))
                    continue
                heapq.heappop(q)
            # The action runs outside the lock so it may schedule or cancel
            # events itself and other threads are not blocked meanwhile.
            try:
                action(*args, **kwargs)
            except Exception:
                # A failing callback must not stop later events, but it
                # should not vanish silently either.
                logging.getLogger(__name__).exception(
                    'Scheduled action %r raised an exception', action)
            except BaseException:
                # e.g. SystemExit: this thread is about to end, so clear
                # _running to let the next enter()/enterabs() start a new one.
                with got_event:
                    self._running = False
                raise
            time.sleep(0)  # Let other threads run

    @property
    def queue(self):
        """An ordered list of upcoming events.

        Events are named tuples with fields for:
            start_time, priority, action, args, kwargs
        """
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        with self._got_event:
            pending = self._queue[:]
        return [heapq.heappop(pending) for _ in range(len(pending))]

    def wait_for_queue_empty(self):
        """Wait for the queue to become empty."""
        return self._queue_exhausted.wait()