01-nodes-topics-actions.mdEstimated time: 60 minutes
Prerequisite: 01-nodes-topics-actions.md
Self-assessment guide: Write your answer on paper before expanding each details block. If you can answer Section A without peeking, you understand the executor model well enough to diagnose silent subscriber failures in field logs.
A large share of “node running but not publishing” incidents come from exactly three root
causes: the executor is blocked by a long callback, a QoS incompatibility prevents
message delivery, or the publisher/subscriber was created but never has spin() called.
These exercises drill each scenario from first principles, then ask you to build complete
nodes from scratch.
A1. What does rclpy.spin(node) block on?
Describe exactly what the thread is doing while it waits for the next event, and what
happens to the spin call when your callback takes 5 seconds to execute.
A2. A publisher uses KEEP_LAST(1) on a 100 Hz topic. Describe three distinct
scenarios where this QoS choice is correct and one where it would be the wrong choice.
A3. What happens if a subscriber callback takes longer than the publish rate of its topic? Walk through what happens to message delivery over 3 publish periods.
A4. What is the difference between rclpy.Node and rclpy_lifecycle.LifecycleNode?
Name at least three states of a lifecycle node and explain what operation is typically
performed in each.
A5. Name the three things create_publisher requires. Then describe what happens at
the DDS layer when two nodes publish to the same topic but with incompatible QoS.
Each snippet below has exactly one bug. Read the code, identify the bug, and write the fix before expanding the answer.
B1. This node is supposed to publish /status at 1Hz. It never publishes anything.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class StatusNode(Node):
def __init__(self):
super().__init__('status_node')
self.pub = self.create_publisher(String, '/status', 10)
self.timer = self.create_timer(1.0, self.timer_cb)
def timer_cb(self):
msg = String()
msg.data = 'ok'
self.pub.publish(msg)
def main():
rclpy.init()
node = StatusNode()
# nothing else
def main():
rclpy.init()
node = StatusNode()
rclpy.spin(node) # <-- this line is missing
node.destroy_node()
rclpy.shutdown()
B2. This subscriber is intended to cache the latest message. Under concurrent access the cached value is sometimes corrupted.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
from rclpy.executors import MultiThreadedExecutor
import threading
class ImuCache(Node):
def __init__(self):
super().__init__('imu_cache')
self.latest_imu = None
self.sub = self.create_subscription(Imu, '/imu', self.cb, 10)
def cb(self, msg):
self.latest_imu = msg # shared write, no lock
def get_latest(self):
return self.latest_imu # shared read, no lock
def main():
rclpy.init()
node = ImuCache()
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node)
executor.spin()
import threading
class ImuCache(Node):
def __init__(self):
super().__init__('imu_cache')
self.latest_imu = None
self._lock = threading.Lock() # <-- add lock
self.sub = self.create_subscription(Imu, '/imu', self.cb, 10)
def cb(self, msg):
with self._lock: # <-- acquire before write
self.latest_imu = msg
def get_latest(self):
with self._lock: # <-- acquire before read
return self.latest_imu
B3. This QoS config is intended to receive /map from map_server, which publishes
with TRANSIENT_LOCAL durability. The subscriber never receives the map on startup.
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
map_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.VOLATILE, # <-- look here
depth=1,
)
self.map_sub = self.create_subscription(
OccupancyGrid, '/map', self.map_cb, map_qos
)
map_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL, # <-- must match publisher
depth=1,
)
Write a complete, runnable Python ROS2 node that publishes a Float64 to /robot/speed
at 10 Hz. Include all imports, the Node subclass, main(), and the spin() call.
The published value should be 1.0 (a constant placeholder).
# TODO: fill in here
import rclpy
# TODO: remaining imports
class SpeedPublisher(Node):
def __init__(self):
super().__init__('speed_publisher')
# TODO: create publisher on /robot/speed, Float64, depth 10
# TODO: create timer at 10Hz calling self.timer_cb
def timer_cb(self):
# TODO: create Float64 message, set data=1.0, publish it
pass
def main(args=None):
# TODO: init, create node, spin, destroy, shutdown
pass
if __name__ == '__main__':
main()
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64
class SpeedPublisher(Node):
def __init__(self):
super().__init__('speed_publisher')
self.pub = self.create_publisher(Float64, '/robot/speed', 10)
self.timer = self.create_timer(0.1, self.timer_cb) # 1/10Hz = 0.1s
def timer_cb(self):
msg = Float64()
msg.data = 1.0
self.pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = SpeedPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Write a node that:
1. Subscribes to /odom (nav_msgs/Odometry) and stores the latest linear x-speed.
2. Subscribes to /imu (sensor_msgs/Imu) and stores the latest linear x-acceleration.
3. On a 1 Hz timer, publishes a Float64 to /fusion/speed_comparison equal to the
absolute difference between the latest odom speed and the latest IMU linear x-acceleration.
4. If no message has been received yet on either topic, skip the publish and log a warning.
import rclpy
from rclpy.node import Node
# TODO: remaining imports
class FusionNode(Node):
def __init__(self):
super().__init__('fusion_node')
self._latest_odom_speed = None # float or None
self._latest_imu_accel = None # float or None
# TODO: subscriber to /odom
# TODO: subscriber to /imu
# TODO: publisher on /fusion/speed_comparison
# TODO: 1Hz timer
def odom_cb(self, msg):
# TODO: store msg.twist.twist.linear.x
pass
def imu_cb(self, msg):
# TODO: store msg.linear_acceleration.x
pass
def timer_cb(self):
# TODO: guard against None, compute abs diff, publish
pass
def main(args=None):
rclpy.init(args=args)
node = FusionNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import Float64
class FusionNode(Node):
def __init__(self):
super().__init__('fusion_node')
self._latest_odom_speed = None
self._latest_imu_accel = None
self.sub_odom = self.create_subscription(
Odometry, '/odom', self.odom_cb, 10)
self.sub_imu = self.create_subscription(
Imu, '/imu', self.imu_cb, 10)
self.pub = self.create_publisher(Float64, '/fusion/speed_comparison', 10)
self.timer = self.create_timer(1.0, self.timer_cb)
def odom_cb(self, msg: Odometry):
self._latest_odom_speed = msg.twist.twist.linear.x
def imu_cb(self, msg: Imu):
self._latest_imu_accel = msg.linear_acceleration.x
def timer_cb(self):
if self._latest_odom_speed is None or self._latest_imu_accel is None:
self.get_logger().warn(
'Waiting for sensor data: '
f'odom={self._latest_odom_speed is not None}, '
f'imu={self._latest_imu_accel is not None}'
)
return
diff = Float64()
diff.data = abs(self._latest_odom_speed - self._latest_imu_accel)
self.pub.publish(diff)
def main(args=None):
rclpy.init(args=args)
node = FusionNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Convert the publisher from C1 into a Lifecycle Node. The publisher should only
be created in on_configure() and should only be actively used in the ACTIVE state.
Show on_activate() and on_deactivate() callbacks.
import rclpy
from rclpy.lifecycle import LifecycleNode, TransitionCallbackReturn, State
# TODO: remaining imports
class LifecycleSpeedPublisher(LifecycleNode):
def __init__(self):
super().__init__('lifecycle_speed_publisher')
self._pub = None
self._timer = None
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""Called when transitioning UNCONFIGURED → INACTIVE."""
# TODO: create publisher (but do NOT start timer yet)
self.get_logger().info('Configured.')
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
"""Called when transitioning INACTIVE → ACTIVE."""
# TODO: create the 10Hz timer
self.get_logger().info('Activated.')
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
"""Called when transitioning ACTIVE → INACTIVE."""
# TODO: destroy/cancel the timer (publisher stays allocated)
self.get_logger().info('Deactivated.')
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
"""Called when transitioning INACTIVE → UNCONFIGURED."""
# TODO: destroy publisher
self.get_logger().info('Cleaned up.')
return TransitionCallbackReturn.SUCCESS
def timer_cb(self):
# TODO: publish Float64(data=1.0) to /robot/speed
pass
def main(args=None):
rclpy.init(args=args)
node = LifecycleSpeedPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
import rclpy
from rclpy.lifecycle import LifecycleNode, TransitionCallbackReturn, State
from std_msgs.msg import Float64
class LifecycleSpeedPublisher(LifecycleNode):
def __init__(self):
super().__init__('lifecycle_speed_publisher')
self._pub = None
self._timer = None
def on_configure(self, state: State) -> TransitionCallbackReturn:
self._pub = self.create_lifecycle_publisher(Float64, '/robot/speed', 10)
self.get_logger().info('Configured: publisher created (inactive).')
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
super().on_activate(state) # activates the managed publisher
self._timer = self.create_timer(0.1, self.timer_cb)
self.get_logger().info('Activated: timer started.')
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
if self._timer is not None:
self._timer.cancel()
self.destroy_timer(self._timer)
self._timer = None
super().on_deactivate(state) # deactivates the managed publisher
self.get_logger().info('Deactivated: timer stopped.')
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
if self._pub is not None:
self.destroy_publisher(self._pub)
self._pub = None
self.get_logger().info('Cleaned up: publisher destroyed.')
return TransitionCallbackReturn.SUCCESS
def timer_cb(self):
if self._pub is not None and self._pub.is_activated:
msg = Float64()
msg.data = 1.0
self._pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = LifecycleSpeedPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
ros2 lifecycle set /lifecycle_speed_publisher configure
ros2 lifecycle set /lifecycle_speed_publisher activate
ros2 lifecycle set /lifecycle_speed_publisher deactivate