pythonros2rosbag

How to read custom message type using ros2bag


So I have this code which works great for reading messages out of predefined topics and printing it to screen. The rosbags come with a rosbag_name.db3 (sqlite) database and metadata.yaml file

from rosbags.rosbag2 import Reader as ROS2Reader
import sqlite3

from rosbags.serde import deserialize_cdr
import matplotlib.pyplot as plt
import os
import collections
import argparse
        
parser = argparse.ArgumentParser(description='Extract images from rosbag.')
# input will be the folder containing the .db3 and metadata.yml file
parser.add_argument('--input','-i',type=str, help='rosbag input location')
# run with python filename.py -i rosbag_dir/

args = parser.parse_args()

rosbag_dir = args.input

topic = "/topic/name"
frame_counter = 0

with ROS2Reader(rosbag_dir) as ros2_reader:

    ros2_conns = [x for x in ros2_reader.connections]
    # This prints a list of all topic names for sanity
    print([x.topic for x in ros2_conns])

    ros2_messages = ros2_reader.messages(connections=ros2_conns)
    
    for m, msg in enumerate(ros2_messages):
        (connection, timestamp, rawdata) = msg
        
        if (connection.topic == topic):
            print(connection.topic) # shows topic
            print(connection.msgtype) # shows message type
            print(type(connection.msgtype)) # shows it's of type string

            # TODO
            # this is where things crash when it's a custom message type
            data = deserialize_cdr(rawdata, connection.msgtype)
            print(data)

The issue is that I can't seem to figure out how to read in custom message types. deserialize_cdr takes a string for the message type field, but it's not clear to me how to replace this with a path or how to otherwise pass in a custom message.

Thanks


Solution

  • One approach would be that you declare and register it to the type system as a string:

    from rosbags.typesys import get_types_from_msg, register_types
    
    MY_CUSTOM_MSG = """
    std_msgs/Header header
    string foo        
    """
    
    register_types(get_types_from_msg(
            MY_CUSTOM_MSG, 'my_custom_msgs/msg/MyCustomMsg'))
    
    from rosbags.typesys.types import my_custom_msgs__msg__MyCustomMsg as MyCustomMsg
    

    Next, using:

    msg_type = MyCustomMsg.__msgtype__
    

    you can get the message type that you can pass to deserialize_cdr. Also, see here for a quick example.

    Another approach is to directly load it from the message definition. Essentially, you would need to read the message

    from pathlib import Path
    custom_msg_path = Path('/path/to/my_custom_msgs/msg/MyCustomMsg.msg')
    msg_def = custom_msg_path.read_text(encoding='utf-8')
    

    and then follow the same steps as above starting with get_types_from_msg(). A more detailed example of this approach is given here.