1

I'm trying to call an asyncio async function from a ROS2 callback as seen in the code below. The callback always throws an error 'RuntimeError: await wasn't used with future' and I can't figure out why. It doesn't seem to have this error when awaiting a custom async function that doesn't await anything itself (see 'test_async()').

Minimal example

Subscriber - foo

import asyncio, rclpy

from rclpy.node import Node
from std_msgs.msg import Bool
from concurrent.futures import ThreadPoolExecutor

import rclpy.qos as qos
from rclpy.qos import QoSProfile


class Foo(Node):
    def __init__(self):
        super().__init__('foo')

        # Setup test subscriber
        qos_profile = QoSProfile(
            reliability=qos.ReliabilityPolicy.BEST_EFFORT,
            durability=qos.DurabilityPolicy.TRANSIENT_LOCAL,
            history=qos.HistoryPolicy.KEEP_LAST,
            depth=1
        )

        self.test_sub = self.create_subscription(
            Bool,
            '/foo_sub',
            self.clbk,
            qos_profile)
        

    # Test callback function
    async def clbk(self, msg):
        self.get_logger().info(f'Received: {msg.data}. About to test async')
        
        # This doesn't cause a problem
        await self.test_async()

        # This doesn't work
        self.get_logger().info('About to sleep')
        await asyncio.sleep(3)

        # Workaround: This appears to run sleep in a separate thread.
        # executor = ThreadPoolExecutor(max_workers=1)
        # asyncio.get_event_loop().run_in_executor(executor, asyncio.run, asyncio.sleep(3))
        # executor.shutdown(wait=True)

        # The workaround doesn't work when getting a returned value
        # executor = ThreadPoolExecutor(max_workers=1)
        # asyncio.get_event_loop().run_in_executor(executor, asyncio.run, self.sleep_with_return())
        # executor.shutdown(wait=True)

        self.get_logger().info('Clbk complete')


    # Working async function
    async def test_async(self):
        self.get_logger().info('Test async works!')


    # Workaround failure case
    async def sleep_with_return(self):
        await asyncio.sleep(3)

        return True


async def async_main():
    rclpy.init()

    # Create node and spin
    foo = Foo()
    rclpy.spin(foo)

def main():
    asyncio.run(async_main())

if __name__ == '__main__':
    main()

Publisher - bar

import asyncio, rclpy

from rclpy.node import Node
from std_msgs.msg import Bool

import rclpy.qos as qos
from rclpy.qos import QoSProfile


class Bar(Node):
    def __init__(self):
        super().__init__('bar')

        # Setup test publisher
        qos_profile = QoSProfile(
            reliability=qos.ReliabilityPolicy.BEST_EFFORT,
            durability=qos.DurabilityPolicy.TRANSIENT_LOCAL,
            history=qos.HistoryPolicy.KEEP_LAST,
            depth=1
        )

        self.test_pub = self.create_publisher(Bool, '/foo_sub', qos_profile)

    def send_msg(self):
        msg = Bool()
        msg.data = True

        self.test_pub.publish(msg)

    
def main():
    rclpy.init()

    # Create node
    bar = Bar()

    # Send messages
    while True:
        input('Press enter when you want to send msg')
        bar.send_msg()

if __name__ == '__main__':
    main()

Output After running both nodes and sending a msg from bar (by pressing 'enter'), this is foo's error message:

[INFO] [1674748995.662302006] [foo]: Received: True. About to test async
[INFO] [1674748995.662621572] [foo]: Test async works!
[INFO] [1674748995.662859403] [foo]: About to sleep
Traceback (most recent call last):
  File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/swarm_load_carry/foo", line 33, in <module>
    sys.exit(load_entry_point('swarm-load-carry==0.0.0', 'console_scripts', 'foo')())
  File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 74, in main
    asyncio.run(async_main())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 71, in async_main
    rclpy.spin(foo)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/__init__.py", line 222, in spin
    executor.spin_once()
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 713, in spin_once
    raise handler.exception()
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/task.py", line 239, in __call__
    self._handler.send(None)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 418, in handler
    await call_coroutine(entity, arg)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 343, in _execute_subscription
    await await_or_execute(sub.callback, msg)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 104, in await_or_execute
    return await callback(*args)
  File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 39, in clbk
    await asyncio.sleep(3)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
RuntimeError: await wasn't used with future
[ros2run]: Process exited with failure 1

As you can see in the commented out section in 'clbk', I have found a workaround by running asyncio.run in a separate thread. This is unideal however as it makes the code more complex and I can't retrieve return values (say if asyncio.sleep(3) actually returned something).

As this works, my guess is it might have something to do with asyncio not being threadsafe and ros2 callbacks running in a different thread (I can't seem to find if this is true), or something to do with where the asyncio event loop is running... I've tried may other workarounds based on this assumption however (such as getting the event loop, using call_soon_threadsafe, setting new event loops) and none seem to work.

Running process

  • Each node 'foo' and 'bar' are run in their own terminals with 'ros2 run pkg module' where pkg (swarm_load_carry) is the name of the ros2 package in a colcon workspace and module is either foo or bar. I am confident that the workspace, package and launchfiles are set up correctly as they work with other test cases.

System details

  • Ubuntu 22.04.1
  • Python 3.10
  • Ros2 Humble
Merty
  • 11
  • 2
  • I might be wrong, but if you create another thread when you use async, who is going to take care of it? I think you should take a look at how to use `spin()` for a [multithreaded case](https://stackoverflow.com/questions/71322517/how-to-use-multi-threading-in-python-with-ros-services-and-serial-comunication), for example. – ignacio Jan 26 '23 at 21:20
  • Does this [Sync-Vs-Async Tutorial](https://docs.ros.org/en/humble/How-To-Guides/Sync-Vs-Async.html) helps in anyway? – ignacio Jan 26 '23 at 21:27
  • I don't really get what you are trying to achieve with that code, as the callback itself already runs in another thread than the main. – ignacio Jan 26 '23 at 21:47
  • @ignacio a use case might be using a lib designed around asyncio (Mavsdk-python for example) with ros2. – numan Jun 12 '23 at 12:43
  • ok, after alittile bit digging, find [this](https://github.com/ros2/rclpy/issues/279) issue. in there there is a link to a [test case](https://github.com/ros2/rclpy/blob/b72a05bd3fb3ac7d27a8da359571afbbfec07f19/rclpy/test/test_executor.py#L154) as I understand this is a known issue and there is a workaround (or rclpy way of doing things) for it. I will let you know if I understand it comletely. Anyone is welcome to take a look at the links meanwhile of course – numan Jun 12 '23 at 13:26

0 Answers0