1

So I have this code which works great for reading messages out of predefined topics and printing it to screen. The rosbags come with a rosbag_name.db3 (sqlite) database and metadata.yaml file

from rosbags.rosbag2 import Reader as ROS2Reader
import sqlite3

from rosbags.serde import deserialize_cdr
import matplotlib.pyplot as plt
import os
import collections
import argparse
        
parser = argparse.ArgumentParser(description='Extract images from rosbag.')
# input will be the folder containing the .db3 and metadata.yml file
parser.add_argument('--input','-i',type=str, help='rosbag input location')
# run with python filename.py -i rosbag_dir/

args = parser.parse_args()

rosbag_dir = args.input

topic = "/topic/name"
frame_counter = 0

with ROS2Reader(rosbag_dir) as ros2_reader:

    ros2_conns = [x for x in ros2_reader.connections]
    # This prints a list of all topic names for sanity
    print([x.topic for x in ros2_conns])

    ros2_messages = ros2_reader.messages(connections=ros2_conns)
    
    for m, msg in enumerate(ros2_messages):
        (connection, timestamp, rawdata) = msg
        
        if (connection.topic == topic):
            print(connection.topic) # shows topic
            print(connection.msgtype) # shows message type
            print(type(connection.msgtype)) # shows it's of type string

            # TODO
            # this is where things crash when it's a custom message type
            data = deserialize_cdr(rawdata, connection.msgtype)
            print(data)

The issue is that I can't seem to figure out how to read in custom message types. deserialize_cdr takes a string for the message type field, but it's not clear to me how to replace this with a path or how to otherwise pass in a custom message.

Thanks

haxonek
  • 174
  • 1
  • 2
  • 17

1 Answers1

3

One approach would be that you declare and register it to the type system as a string:

from rosbags.typesys import get_types_from_msg, register_types

MY_CUSTOM_MSG = """
std_msgs/Header header
string foo        
"""

register_types(get_types_from_msg(
        MY_CUSTOM_MSG, 'my_custom_msgs/msg/MyCustomMsg'))

from rosbags.typesys.types import my_custom_msgs__msg__MyCustomMsg as MyCustomMsg

Next, using:

msg_type = MyCustomMsg.__msgtype__

you can get the message type that you can pass to deserialize_cdr. Also, see here for a quick example.

Another approach is to directly load it from the message definition. Essentially, you would need to read the message

from pathlib import Path
custom_msg_path = Path('/path/to/my_custom_msgs/msg/MyCustomMsg.msg')
msg_def = custom_msg_path.read_text(encoding='utf-8')

and then follow the same steps as above starting with get_types_from_msg(). A more detailed example of this approach is given here.

Lukas
  • 1,320
  • 12
  • 20