
Background: It seems clear to me that the concept of callbacks is flexible, but I also thought it makes code much faster. However, the following example works but cannot show that time can be saved by using a callback (Ref1):

import time
from time import sleep

def callbackFunc(delay):
  time.sleep(delay)
  print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
  print("saysomething: message 2 delay " + str(delay))
  callback(delay) # here I already have to know that only "delay" is needed...
  time.sleep(2)

if __name__ == '__main__':
  t0 = time.time()
  print("main:         message 1.")
  saysomething(2, callbackFunc)
  print("main:         message 4.")
  print("\ntime: ",time.time() - t0)

Output

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2
main:         message 4.
time:  4.01

So how can I achieve something like this

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2
main:         message 4.
time:  2  !!!!!!!!!!!!!

Perhaps it would even be possible to switch the order of messages 3 & 4? Or am I getting something wrong?


Perhaps these answers here and here, and the code from here (which does not use callbacks but shows asynchronous behavior), could help?

Christoph

3 Answers


Re-establishing the premise

Before we start, let's clarify the statement in your post:
A callback does not make the code itself faster; the program can only finish earlier when the function that accepts the callback does not block.

Also, callbacks are generally executed synchronously in the function that accepts the callback, so your example would still take 4 seconds. A more reasonable starting point would be:

import time

def callbackFunc(delay):
    # time.sleep(delay)  # -
    print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    time.sleep(2)    # +
    callback(delay)
    # time.sleep(2)  # -

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")  # +
    saysomething(2, callbackFunc)              # +
    print("\ntime: ", time.time() - t0, "\n")  # +
    print("main:         message 4.")
    print("\ntime: ", time.time() - t0)

Output:

main:         message 1.
saysomething: message 2 delay 2
callback:     message 3 delay 2

time:  2.00

saysomething: message 2 delay 2
callback:     message 3 delay 2

time:  4.00

main:         message 4.

time:  4.00

Why the example doesn't work

sleep() suspends execution of the calling thread, so you can't call it on the main thread if you want to show asynchronous behaviour.

You can use threads or an event loop to show asynchronous behaviour.

An example using an event loop

Here's an example using the built-in event loop:

import asyncio  # +
import time

def callbackFunc(delay):
    print("callback:     message 3 delay " + str(delay))

def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    # time.sleep(2)                                          # -
    # callback(delay)                                        # -
    asyncio.get_event_loop().call_later(2, callback, delay)  # +

def wait_for_callbacks():  # +
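    # Poll the loop's pending timer handles (the private `_scheduled` list):
    # keep re-posting this check with call_soon() until no timer handles
    # remain, then stop the loop so run_forever() returns.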
    def run_until_complete(loop):
        loop.call_soon(lambda: run_until_complete(loop) if loop._scheduled else loop.stop())
    loop = asyncio.get_event_loop()
    run_until_complete(loop)
    loop.run_forever()

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    print("main:         message 4.")
    wait_for_callbacks()  # +
    print("\ntime: ", time.time() - t0)

Output:

main:         message 1.
saysomething: message 2 delay 2

time:  0.000

saysomething: message 2 delay 2

time:  0.000

main:         message 4.
callback:     message 3 delay 2
callback:     message 3 delay 2

time:  2.000

An example using threads

Here's an example using the built-in OS threads:

import threading  # +
import time

def threadify(func):  # +
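    # Decorator: run the wrapped function in a new OS thread and return the
    # Thread object so the caller could join it later if needed.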
    def _func(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return _func

def callbackFunc(delay):
    print("callback:     message 3 delay " + str(delay))

@threadify  # +
def saysomething(delay, callback):
    print("saysomething: message 2 delay " + str(delay))
    time.sleep(2)
    callback(delay)

def wait_for_callbacks():  # +
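    # Join every thread except the current (main) one, i.e. wait for all
    # saysomething() threads and therefore all callbacks to finish.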
    for thread in threading.enumerate():
        if thread is not threading.current_thread():
            thread.join()

if __name__ == '__main__':
    t0 = time.time()
    print("main:         message 1.")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    saysomething(2, callbackFunc)
    print("\ntime: ", time.time() - t0, "\n")
    print("main:         message 4.")
    wait_for_callbacks()  # +
    print("\ntime: ", time.time() - t0)

Output: (marginally slower than using the built-in event loop due to overhead of OS threads)

main:         message 1.
saysomething: message 2 delay 2

time:  0.000

saysomething: message 2 delay 2

time:  0.000

main:         message 4.
callback:     message 3 delay 2
callback:     message 3 delay 2

time:  2.00
aaron

You are correct that callbacks are a way of providing "flexibility", but they really have nothing to do with "speed". The typical use case for a callback is this: function 1 calls function 2, which starts some work whose completion occurs asynchronously, so function 2 returns more or less immediately, yet function 1 still needs to be notified when that asynchronous completion occurs. Function 1 therefore passes function 2 a callback function that will be invoked with agreed-upon arguments when the completion occurs.
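To make that pattern concrete, here is a minimal sketch (the names start_work and on_done are made up for illustration): "function 2" starts the work on a background thread and returns immediately, and the callback is invoked with the agreed-upon argument once the work finishes.

import threading
import time

def on_done(result):                        # the callback supplied by "function 1"
    print("callback got:", result)

def start_work(seconds, callback):          # "function 2": returns immediately
    def work():
        time.sleep(seconds)                 # stand-in for the asynchronous work
        callback(f"finished after {seconds}s")  # notify with the agreed-upon argument
    threading.Thread(target=work).start()

start_work(2, on_done)
print("start_work returned immediately")    # prints before the callback runs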

In your case the asynchronous event is the expiration of a time interval, and your callback function is passed what that delay interval was. Now it turns out that Python comes with a sched.scheduler class that allows you to schedule "events" to run in the future, either at an absolute time or after a delay that is added to the current time. Such an event is just a callback function to which you can pass any arguments you wish. The problem with this class, in my opinion, is that you have to enter all the events you want to run first and then call a run method that blocks until all the events have run (a minimal standard-library sketch of this is shown after the output below). Much better would be to just specify a future event you want to run (that is, a callback) and continue without blocking, with the event running asynchronously in another thread. And so I have heavily modified the sched.scheduler class to create a Scheduler class. Your code would then look like:

import time
from scheduler import Scheduler

def callbackFunc(msg_no, delay):
    print(f"callback: message number {msg_no}, delay {delay} at {time.time()}")

def saysomething(msg_no, delay, callback):
    print(f"saysomething: message {msg_no}, delay {delay} at {time.time()}")
    scheduler.enter(delay, callback, args=(msg_no, delay))  # invoke the passed-in callback
    time.sleep(2)

if __name__ == '__main__':
    scheduler = Scheduler()
    saysomething(1, 1, callbackFunc)
    saysomething(2, 2, callbackFunc)
    saysomething(3, 3, callbackFunc)

Prints:

saysomething: message 1, delay 1 at 1644584120.865646
callback: message number 1, delay 1 at 1644584121.8778687
saysomething: message 2, delay 2 at 1644584122.8747876
saysomething: message 3, delay 3 at 1644584124.8790839
callback: message number 2, delay 2 at 1644584124.8790839
callback: message number 3, delay 3 at 1644584127.9029477
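
For comparison, here is the minimal standard-library sketch referred to above: with plain sched.scheduler every event has to be entered up front, and run() blocks the calling thread until all of them have executed.

import sched
import time

s = sched.scheduler(time.monotonic, time.sleep)

def tick(label):
    print(label, "at", time.monotonic())

# All events are entered up front...
s.enter(1, 1, tick, argument=("event 1",))
s.enter(2, 1, tick, argument=("event 2",))

# ...and run() blocks until every event has fired.
s.run()
print("run() returned only after all events completed")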

And the Scheduler class:

"""
Modified sched.scheduler class.
"""

import time
import heapq
from collections import namedtuple
import threading


class Event(namedtuple('Event', 'start_time, priority, action, args, kwargs')):
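    # Ordering is by (start_time, priority), so heapq always pops the event
    # with the earliest start_time (ties broken by priority) first.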
    __slots__ = []
    def __eq__(s, o): return (s.start_time, s.priority) == (o.start_time, o.priority)
    def __lt__(s, o): return (s.start_time, s.priority) <  (o.start_time, o.priority)
    def __le__(s, o): return (s.start_time, s.priority) <= (o.start_time, o.priority)
    def __gt__(s, o): return (s.start_time, s.priority) >  (o.start_time, o.priority)
    def __ge__(s, o): return (s.start_time, s.priority) >= (o.start_time, o.priority)

Event.start_time.__doc__ = ('''Numeric type compatible with the return value from time.monotonic.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
in the order of their priority.''')
Event.action.__doc__ = ('''Executing the event means executing
action(*args, **kwargs)''')
Event.args.__doc__ = ('''args is a sequence holding the positional
arguments for the action.''')
Event.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
arguments for the action.''')

_sentinel = object()

class Scheduler:
    def __init__(self, daemon=False):
        """
        Initialize a new instance.
        If daemon is True, the scheduler thread will run as a daemon so it will be possible
        for the main thread to terminate with scheduled events yet to run.
        Regardless of how the daemon argument is set, when a new event is added a new
        scheduler thread will be started if the previous thread has terminated.
        """
        self._queue = []
        self._daemon=daemon
        self._running = False
        self._got_event = threading.Condition()
        self._queue_exhausted = threading.Event()
        self._queue_exhausted.set()
        self._thread = None

    def __del__(self):
        if not self._daemon and self._thread:
            self._thread.join()

    def enterabs(self, start_time, action, args=(), kwargs=_sentinel, priority=1):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.

        """
        if kwargs is _sentinel:
            kwargs = {}
        event = Event(start_time, priority, action, args, kwargs)
        with self._got_event:
            heapq.heappush(self._queue, event)
            self._queue_exhausted.clear()
            if not self._running:
                if self._thread:
                    self._thread.join() # tidy up
                self._running = True
                self._thread = threading.Thread(target=self._run, daemon=self._daemon)
                self._thread.start()
            else:
                self._got_event.notify()

        return event # The ID

    def enter(self, delay, action, args=(), kwargs=_sentinel, priority=1):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.

        """
        start_time = time.monotonic() + delay
        return self.enterabs(start_time, action, args, kwargs, priority)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.

        """
        with self._got_event:
            self._queue.remove(event)
            heapq.heapify(self._queue)

    def empty(self):
        """Check whether the queue is empty."""
        with self._got_event:
            return not self._queue

    def running(self):
        """Check whether the scheduler is running."""
        with self._got_event:
            return self._running

    def _run(self):
        """Execute events until the queue is empty."""

        # localize variable access to minimize overhead
        # and to improve thread safety
        got_event = self._got_event
        q = self._queue
        delayfunc = time.sleep
        timefunc = time.monotonic
        pop = heapq.heappop
        queue_exhausted = self._queue_exhausted
        while True:
            try:
                while True:
                    with got_event:
                        if not q:
                            self._running = False
                            queue_exhausted.set()
                            return
                        start_time, priority, action, args, kwargs = q[0]
                        now = timefunc()
                        if start_time > now:
                            # Wait for either the time to elapse or a new Event to be added:
                            got_event.wait(timeout=(start_time - now))
                            continue
                        pop(q)
                    action(*args, **kwargs)
                    delayfunc(0)   # Let other threads run
            except:
                pass

    @property
    def queue(self):
        """An ordered list of upcoming events.

        Events are named tuples with fields for:
            start_time, priority, action, args, kwargs

        """
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        with self._got_event:
            events = self._queue[:]
        return list(map(heapq.heappop, [events]*len(events)))

    def wait_for_queue_empty(self):
        """Wait for the queue to become empty."""
        return self._queue_exhausted.wait()
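
A short usage note, using only the methods defined above: if the main program should not exit before all pending callbacks have run, it can block on wait_for_queue_empty(); the event object returned by enter() can be passed to cancel() to remove it again.

from scheduler import Scheduler

scheduler = Scheduler()
event = scheduler.enter(1, print, args=("runs after ~1 second",))
# scheduler.cancel(event)         # would remove the pending event again
scheduler.wait_for_queue_empty()  # block until every scheduled callback has run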
Booboo

This is irrelevant w.r.t. the SWIG callbacks used by the routing library.

Laurent Perron
  • That seems to be more of a comment saying that my guess "makes the solver much faster" is wrong? Could you explain what you mean? (I did NOT downvote...) – Christoph Feb 08 '22 at 13:29
  • These are not async callbacks; this is a synchronous call from the C++ layer to the Python code through the SWIG layer. – Laurent Perron Feb 08 '22 at 13:59
  • OK, thank you for your comment. FYI: I just deleted Google OR-tools from the question because it was only secondary context and might be misleading as to what I am looking for. But thanks for your clarification concerning OR-tools :-) – Christoph Feb 08 '22 at 14:38