I'm trying to call an asyncio async function from a ROS2 callback as seen in the code below. The callback always throws an error 'RuntimeError: await wasn't used with future' and I can't figure out why. It doesn't seem to have this error when awaiting a custom async function that doesn't await anything itself (see 'test_async()').
Minimal example
Subscriber - foo
import asyncio, rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from concurrent.futures import ThreadPoolExecutor
import rclpy.qos as qos
from rclpy.qos import QoSProfile
class Foo(Node):
def __init__(self):
super().__init__('foo')
# Setup test subscriber
qos_profile = QoSProfile(
reliability=qos.ReliabilityPolicy.BEST_EFFORT,
durability=qos.DurabilityPolicy.TRANSIENT_LOCAL,
history=qos.HistoryPolicy.KEEP_LAST,
depth=1
)
self.test_sub = self.create_subscription(
Bool,
'/foo_sub',
self.clbk,
qos_profile)
# Test callback function
async def clbk(self, msg):
self.get_logger().info(f'Received: {msg.data}. About to test async')
# This doesn't cause a problem
await self.test_async()
# This doesn't work
self.get_logger().info('About to sleep')
await asyncio.sleep(3)
# Workaround: This appears to run sleep in a separate thread.
# executor = ThreadPoolExecutor(max_workers=1)
# asyncio.get_event_loop().run_in_executor(executor, asyncio.run, asyncio.sleep(3))
# executor.shutdown(wait=True)
# The workaround doesn't work when getting a returned value
# executor = ThreadPoolExecutor(max_workers=1)
# asyncio.get_event_loop().run_in_executor(executor, asyncio.run, self.sleep_with_return())
# executor.shutdown(wait=True)
self.get_logger().info('Clbk complete')
# Working async function
async def test_async(self):
self.get_logger().info('Test async works!')
# Workaround failure case
async def sleep_with_return(self):
await asyncio.sleep(3)
return True
async def async_main():
rclpy.init()
# Create node and spin
foo = Foo()
rclpy.spin(foo)
def main():
asyncio.run(async_main())
if __name__ == '__main__':
main()
Publisher - bar
import asyncio, rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
import rclpy.qos as qos
from rclpy.qos import QoSProfile
class Bar(Node):
def __init__(self):
super().__init__('bar')
# Setup test publisher
qos_profile = QoSProfile(
reliability=qos.ReliabilityPolicy.BEST_EFFORT,
durability=qos.DurabilityPolicy.TRANSIENT_LOCAL,
history=qos.HistoryPolicy.KEEP_LAST,
depth=1
)
self.test_pub = self.create_publisher(Bool, '/foo_sub', qos_profile)
def send_msg(self):
msg = Bool()
msg.data = True
self.test_pub.publish(msg)
def main():
rclpy.init()
# Create node
bar = Bar()
# Send messages
while True:
input('Press enter when you want to send msg')
bar.send_msg()
if __name__ == '__main__':
main()
Output After running both nodes and sending a msg from bar (by pressing 'enter'), this is foo's error message:
[INFO] [1674748995.662302006] [foo]: Received: True. About to test async
[INFO] [1674748995.662621572] [foo]: Test async works!
[INFO] [1674748995.662859403] [foo]: About to sleep
Traceback (most recent call last):
File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/swarm_load_carry/foo", line 33, in <module>
sys.exit(load_entry_point('swarm-load-carry==0.0.0', 'console_scripts', 'foo')())
File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 74, in main
asyncio.run(async_main())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 71, in async_main
rclpy.spin(foo)
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/__init__.py", line 222, in spin
executor.spin_once()
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 713, in spin_once
raise handler.exception()
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/task.py", line 239, in __call__
self._handler.send(None)
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 418, in handler
await call_coroutine(entity, arg)
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 343, in _execute_subscription
await await_or_execute(sub.callback, msg)
File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 104, in await_or_execute
return await callback(*args)
File "/home/harvey/px4_ros_com_ros2/install/swarm_load_carry/lib/python3.10/site-packages/swarm_load_carry/foo.py", line 39, in clbk
await asyncio.sleep(3)
File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
return await future
RuntimeError: await wasn't used with future
[ros2run]: Process exited with failure 1
As you can see in the commented out section in 'clbk', I have found a workaround by running asyncio.run in a separate thread. This is unideal however as it makes the code more complex and I can't retrieve return values (say if asyncio.sleep(3) actually returned something).
As this works, my guess is it might have something to do with asyncio not being threadsafe and ros2 callbacks running in a different thread (I can't seem to find if this is true), or something to do with where the asyncio event loop is running... I've tried may other workarounds based on this assumption however (such as getting the event loop, using call_soon_threadsafe, setting new event loops) and none seem to work.
Running process
- Each node 'foo' and 'bar' are run in their own terminals with 'ros2 run pkg module' where pkg (swarm_load_carry) is the name of the ros2 package in a colcon workspace and module is either foo or bar. I am confident that the workspace, package and launchfiles are set up correctly as they work with other test cases.
System details
- Ubuntu 22.04.1
- Python 3.10
- Ros2 Humble