ROS2 (Robot Operating System 2) is a middleware and toolset for robotics software. It is not an OS — it runs on Linux (or Windows/macOS). It provides:
ros2 topic echo, ros2 node list)In our pipeline: Jetson Orin runs ROS2. It reads SPI data from STM32, decodes protobuf, and publishes sensor_msgs/Imu messages at 100Hz for the navigation stack to consume.
A node is a process that does one thing. The fundamental unit.
Good: one node per sensor, one node for navigation, one node for visualization. Bad: one giant node that does everything.
stm32_bridge_node → reads SPI, publishes /imu/raw at 100Hz
imu_filter_node → subscribes /imu/raw, publishes /imu/filtered
nav_stack_node → subscribes /imu/filtered, /odom, publishes /cmd_vel
motor_driver_node → subscribes /cmd_vel, writes to CAN
A topic is a named channel for streaming data. Publish/subscribe model.
Publisher Subscriber(s)
───────── ──────────────
stm32_bridge_node ──/imu/raw──────────────► imu_filter_node
► imu_logger_node
► rviz2 (visualizer)
Key properties: - Decoupled: publisher doesn’t know who subscribes - Asynchronous: publisher doesn’t wait for subscriber - N-to-M: many publishers, many subscribers on same topic
# Publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
class ImuPublisher(Node):
def __init__(self):
super().__init__('stm32_bridge')
self.pub = self.create_publisher(Imu, '/imu/raw', 10)
# 10 = queue depth (how many messages to buffer)
def publish_imu(self, data):
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_link'
msg.linear_acceleration.x = data.accel_x
# ...
self.pub.publish(msg)
# Subscriber
class ImuFilter(Node):
def __init__(self):
super().__init__('imu_filter')
self.sub = self.create_subscription(Imu, '/imu/raw', self.callback, 10)
def callback(self, msg):
# Called in executor thread when message arrives
filtered = self.apply_filter(msg)
# process...
A service is a request-reply pattern. One node asks, another answers — synchronous.
Client: "set_motor_speed(1.5)" ──► Service server: processes, returns OK
"get_battery_status()" ──► Service server: returns 87%
Use for: configuration changes, one-time commands, status queries. Do NOT use for: high-rate sensor data (use topics instead — services are slow).
from std_srvs.srv import SetBool
class MotorNode(Node):
def __init__(self):
super().__init__('motor_driver')
self.srv = self.create_service(SetBool, 'enable_motors', self.handle_enable)
def handle_enable(self, request, response):
enabled = request.data
self.set_motor_power(enabled)
response.success = True
response.message = f"Motors {'on' if enabled else 'off'}"
return response
An action is like a service, but for long-running tasks with progress feedback.
Client: "navigate_to(x=5, y=3)" ──► Server starts moving robot
← "feedback: 30% done"
← "feedback: 60% done"
← "result: arrived" (or "result: failed")
Client can cancel at any time.
Use for: navigation goals, arm movements, multi-second operations.
Nodes can have parameters — configuration values you can change at runtime without restarting.
self.declare_parameter('imu_frame', 'imu_link')
self.declare_parameter('publish_rate_hz', 100.0)
frame = self.get_parameter('imu_frame').get_parameter_value().string_value
rate = self.get_parameter('publish_rate_hz').get_parameter_value().double_value
ros2 param set /stm32_bridge imu_frame base_imu_link
ros2 param get /stm32_bridge publish_rate_hz
ROS2 lets you configure reliability, durability, and deadline of message delivery.
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
# BEST_EFFORT: don't retry dropped messages — use for sensors (lidar, IMU, cameras)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
# RELIABLE: retry until delivered — use for commands, events
reliable_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
...
)
Rule for our IMU: use BEST_EFFORT. At 100Hz, a dropped message is 10ms old — it’s better to skip and show the next fresh one than to retry stale data.
self.pub = self.create_publisher(Imu, '/imu/raw', sensor_qos)
This is the real node that reads from SPI and publishes to ROS2:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Vector3
import spidev
import struct
from robot_msgs.pb2 import RobotFrame # generated from .proto
class Stm32BridgeNode(Node):
def __init__(self):
super().__init__('stm32_bridge')
# SPI setup
self.spi = spidev.SpiDev()
self.spi.open(1, 0) # /dev/spidev1.0
self.spi.max_speed_hz = 10_000_000
self.spi.mode = 0
# Publisher with BEST_EFFORT QoS
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
depth=10,
)
self.imu_pub = self.create_publisher(Imu, '/imu/raw', sensor_qos)
# Timer at 100Hz
self.timer = self.create_timer(0.01, self.read_and_publish)
# Stats
self.frame_count = 0
self.error_count = 0
self.get_logger().info('STM32 bridge node started')
def read_and_publish(self):
"""Called every 10ms by timer"""
try:
# Read 150-byte SPI frame
raw = bytes(self.spi.readbytes(150))
# Validate header
if raw[0] != 0xAA or raw[1] != 0x55:
self.error_count += 1
return
# Length field
payload_len = struct.unpack_from('>H', raw, 2)[0]
payload = raw[4:4 + payload_len]
# Decode protobuf
frame = RobotFrame()
frame.ParseFromString(payload)
# Build Imu message
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_link'
msg.linear_acceleration.x = frame.accel_x
msg.linear_acceleration.y = frame.accel_y
msg.linear_acceleration.z = frame.accel_z
msg.angular_velocity.x = frame.gyro_x
msg.angular_velocity.y = frame.gyro_y
msg.angular_velocity.z = frame.gyro_z
# Covariances unknown (-1 diagonal = unknown)
msg.linear_acceleration_covariance[0] = -1.0
self.imu_pub.publish(msg)
self.frame_count += 1
if self.frame_count % 1000 == 0:
self.get_logger().info(
f'Published {self.frame_count} frames, errors={self.error_count}'
)
except Exception as e:
self.get_logger().error(f'Bridge error: {e}')
self.error_count += 1
def main(args=None):
rclpy.init(args=args)
node = Stm32BridgeNode()
rclpy.spin(node) # runs the executor, calls callbacks
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
my_robot/
├── CMakeLists.txt ← ament_cmake build
├── package.xml ← metadata, dependencies
├── my_robot/
│ ├── __init__.py
│ ├── stm32_bridge_node.py
│ └── imu_filter_node.py
├── launch/
│ └── robot.launch.py
├── config/
│ └── params.yaml
└── msg/
└── WheelSpeed.msg ← custom message type
package.xml (declare dependencies):
<?xml version="1.0"?>
<package format="3">
<name>my_robot</name>
<version>0.1.0</version>
<description>My robot</description>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
A launch file starts multiple nodes at once with configured parameters:
# launch/robot.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='my_robot',
executable='stm32_bridge',
name='stm32_bridge',
parameters=[{'publish_rate_hz': 100.0, 'imu_frame': 'imu_link'}],
),
Node(
package='my_robot',
executable='imu_filter',
name='imu_filter',
),
Node(
package='nav2_amcl',
executable='amcl',
name='amcl',
parameters=['/path/to/amcl_params.yaml'],
),
])
Run: ros2 launch my_robot robot.launch.py
# Build
cd ~/ros2_ws
colcon build --packages-select my_robot
source install/setup.bash
# Run single node
ros2 run my_robot stm32_bridge
# Run launch file
ros2 launch my_robot robot.launch.py
# Debug
ros2 node list # show running nodes
ros2 topic list # show all topics
ros2 topic echo /imu/raw # print messages live
ros2 topic hz /imu/raw # measure publish frequency
ros2 topic bw /imu/raw # measure bandwidth
ros2 param list /stm32_bridge # list parameters
| Mistake | Symptom | Fix |
|---|---|---|
Missing source install/setup.bash |
Package not found | Run after every colcon build |
| QoS mismatch publisher/subscriber | No messages received | Both must use compatible QoS (BEST_EFFORT matches BEST_EFFORT, RELIABLE matches RELIABLE) |
| Heavy work in callback | 100Hz drops to 10Hz | Move heavy work to separate thread, use timer callback only for receiving |
Not calling rclpy.spin() |
Callbacks never fire | spin() runs the event loop |
| Wrong topic name (typo) | Subscriber never receives | ros2 topic list to verify |
| Using Python GIL for parallelism | Callbacks block each other | Use MultiThreadedExecutor for concurrent callbacks |