Chapter 2: ROS 2 Fundamentals
What is ROS 2?
ROS 2 (Robot Operating System 2) is a flexible middleware framework for building robot applications. Unlike ROS 1, which was designed primarily for research, ROS 2 is built from the ground up for production robotics with enterprise-grade features.
Core Capabilities:
- Communication: Pub/Sub messaging, request/response services, long-running actions
- Hardware Abstraction: Unified interfaces for different robots (arms, mobile bases, drones)
- Tools: Visualization, debugging, monitoring, and recording utilities
- Ecosystem: Thousands of maintained libraries and packages
- Standards: Open-source, vendor-neutral development (MIT licensed)
- Integration: Works with Python, C++, C# on multiple operating systems
ROS 2 vs. ROS 1:
| Feature | ROS 1 | ROS 2 |
|---|---|---|
| Middleware | Custom TCP/UDP | Standard DDS |
| Real-time | Limited | Full support |
| Security | None | Encrypted, authenticated |
| Scalability | Single master | Fully distributed |
| Performance | Research-grade | Production-ready |
| Multi-robot | Difficult | Native support |
| Windows Support | No | Yes |
| Python Support | Limited | Full support |
| Maintenance | Ending | Active |
Why ROS 2 Matters:
- Industry Adoption: Used by Boston Dynamics, ABB, Clearpath Robotics, NASA
- Production Readiness: Designed for deployed systems, not just research
- Ecosystem: Hundreds of packages (perception, navigation, manipulation, simulation)
- Future-Proof: Active development with long-term support releases
- Standardization: Interoperability across robot platforms
Architecture Overview
ROS 2 follows a distributed computing model:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ (Sensor) │ │ (Controller) │ │ (Actuator) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────────┼────────────────────┘
DDS Middleware
(Data Distribution Service)
Key Concepts:
Nodes
Executables that perform specific tasks (sensor reading, control, planning).
# Example node structure
import rclpy
from rclpy.node import Node
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
msg = String()
msg.data = 'Hello from ROS 2!'
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
if __name__ == '__main__':
main()
Topics
Asynchronous communication channels for sensor data and status.
# Subscribing to a topic
def listener_callback(msg):
self.get_logger().info(f'Received: {msg.data}')
self.subscription = self.create_subscription(String, 'topic', listener_callback, 10)
Services
Synchronous request/response communication for commands and queries.
# Service server
def add_two_ints_callback(request, response):
response.sum = request.a + request.b
return response
srv = node.create_service(AddTwoInts, 'add_two_ints', add_two_ints_callback)
# Service client
client = node.create_client(AddTwoInts, 'add_two_ints')
request = AddTwoInts.Request()
request.a = 5
request.b = 3
future = client.call_async(request)
Actions
Long-running tasks with feedback and cancellation.
# Action server (goal-based)
def goal_callback(goal_request):
if goal_request.order < 0:
return GoalResponse.REJECTED
return GoalResponse.ACCEPTED
self.action_server = ActionServer(
node, Fibonacci, 'fibonacci',
goal_callback, cancel_callback, execute_callback
)
Installation & Setup
Choosing the right installation method depends on your system and workflow. Each has different trade-offs for convenience, isolation, and performance.
Ubuntu 22.04 LTS (Recommended)
The recommended approach for development and production use:
# Step 1: Set up package repository
sudo apt update
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update
# Step 2: Add ROS 2 repository key
sudo apt install curl gnupg2 lsb-release ubuntu-keyring
curl -sSL https://repo.ros2.org/ros.key | sudo apt-key add -
# Step 3: Add ROS 2 repository
sudo sh -c 'echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" > /etc/apt/sources.list.d/ros2.list'
# Step 4: Install ROS 2 Humble (Long-term support)
sudo apt update
sudo apt install ros-humble-desktop ros-humble-dev
# Step 5: Install build tools
sudo apt install build-essential git python3-colcon-common-extensions
# Step 6: Source setup script (add to ~/.bashrc for persistence)
source /opt/ros/humble/setup.bash
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
# Step 7: Verify installation
ros2 --version
Why Humble?
- Latest LTS release with 5 years of support
- Mature ecosystem with thousands of tested packages
- Better Python integration than older versions
- Recommended for new projects
Available Distributions:
- Humble (2022-2027): Latest LTS, recommended
- Iron (2023-2026): Intermediate support
- Jazzy (2024-2029): Latest with cutting-edge features
macOS Setup
macOS support is available but with some limitations:
# Using Homebrew (easiest method)
brew install ros
# Manual installation from DMG
# Download from: https://docs.ros.org/en/humble/Installation/Alternatives.html
macOS Limitations:
- Some packages not available (require extra compilation)
- Performance slightly slower than Linux
- Docker recommended for complex projects
- Not recommended for production use
Windows Setup (WSL2)
Windows support via Windows Subsystem for Linux 2:
# Step 1: Enable WSL2 in Windows
wsl --install -d Ubuntu-22.04
# Step 2: Inside WSL2, follow Ubuntu installation steps above
# Step 3: Set up X11 forwarding for GUI applications
# Install VcXsrv or Xming on Windows
# In WSL2 ~/.bashrc, add:
export DISPLAY=$(cat /etc/resolv.conf | grep nameserver | awk '{print $2}'):0
Windows Considerations:
- WSL2 adds ~10% performance overhead vs native Linux
- GUI applications require X11 server on Windows
- File access between Windows and WSL2 is slower
- Recommended for development, not production
Docker Container (Isolated Environment)
Best for keeping your system clean and ensuring reproducibility:
# Pull official ROS 2 Docker image
docker pull osrf/ros:humble-desktop
# Run container with X11 forwarding
docker run -it \
--env DISPLAY=$DISPLAY \
--volume /tmp/.X11-unix:/tmp/.X11-unix:rw \
osrf/ros:humble-desktop
# Or with volume mounting (persistent files)
docker run -it \
-v ~/ros2_ws:/root/ros2_ws \
osrf/ros:humble-desktop \
bash
Docker Advantages:
- Isolated from system Python and packages
- Easy reproduction across machines
- Simple version switching
- Container-ready for deployment
Docker Disadvantages:
- ~5% performance overhead
- Slightly more complex for hardware access (cameras, arms)
- Larger disk footprint
Development Environment Setup
After installation, set up your development workspace:
# Create workspace structure
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
# Initialize workspace (modern approach)
# ROS 2 workspaces don't need initialization like ROS 1
# Just create src/ directory
# Set up Python virtual environment (optional but recommended)
python3 -m venv venv
source venv/bin/activate
# Install colcon build system
sudo apt install python3-colcon-common-extensions
# Initialize build
colcon build --symlink-install
# Source workspace overlay
source install/local_setup.bash
Verification and Testing
Confirm your installation works correctly:
# Test 1: Check ROS 2 version and setup
ros2 --version
ros2 doctor
# Test 2: Run demo nodes
# Terminal 1: Publisher
ros2 run demo_nodes_py talker
# Terminal 2: Subscriber
ros2 run demo_nodes_py listener
# Test 3: Check middleware
ros2 topic list
ros2 node list
# Expected output:
# /talker
# /listener
Your First ROS 2 Program
Step 1: Create a Package
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python my_robot_pkg
cd my_robot_pkg
Step 2: Create a Node
Create my_robot_pkg/my_robot_pkg/talker.py:
import rclpy
from std_msgs.msg import String
def main(args=None):
rclpy.init(args=args)
node = rclpy.create_node('talker')
publisher = node.create_publisher(String, 'chatter', 10)
msg = String()
msg.data = 'Hello, ROS 2 World!'
publisher.publish(msg)
node.get_logger().info(f'Publishing: {msg.data}')
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 3: Build & Run
# Build the package
cd ~/ros2_ws
colcon build
# Run the node
source install/setup.bash
ros2 run my_robot_pkg talker
Common Tools
ros2 topic
Inspect and interact with topics:
# List all topics
ros2 topic list
# Show topic details
ros2 topic info /chatter
# Echo topic data
ros2 topic echo /chatter
# Publish to a topic
ros2 topic pub /cmd_vel geometry_msgs/Twist "{linear: {x: 0.5}}"
ros2 node
Manage nodes:
# List running nodes
ros2 node list
# Get node info
ros2 node info /talker
ros2 service
Call ROS 2 services:
# List services
ros2 service list
# Call a service
ros2 service call /my_service std_srvs/srv/Trigger "{}"
RViz (Visualization)
Visual debugging and monitoring:
# Launch RViz
rviz2
# Typical setup:
# - Fixed Frame: map
# - Add displays: TF, PointCloud2, MarkerArray
Robot Description (URDF)
Define robot structure using URDF (Unified Robot Description Format):
<?xml version="1.0"?>
<robot name="my_robot">
<!-- Base link -->
<link name="base_link">
<inertial>
<mass value="10.0"/>
<inertia ixx="1.0" ixy="0.0" ixz="0.0" iyy="1.0" iyz="0.0" izz="1.0"/>
</inertial>
</link>
<!-- Wheel link -->
<link name="wheel_left">
<inertial>
<mass value="1.0"/>
<inertia ixx="0.1" ixy="0.0" ixz="0.0" iyy="0.1" iyz="0.0" izz="0.1"/>
</inertial>
</link>
<!-- Joint connecting base to wheel -->
<joint name="wheel_left_joint" type="revolute">
<parent link="base_link"/>
<child link="wheel_left"/>
<axis xyz="0 1 0"/>
<limit lower="0" upper="10" effort="100" velocity="10"/>
</joint>
</robot>
ROS 2 Parameter System
Store and access configuration parameters:
# Node with parameters
node.declare_parameter('speed', 1.0)
speed = node.get_parameter('speed').value
# Update parameter
node.set_parameters([Parameter('speed', 2.0)])
Debugging & Troubleshooting
Common Issues
Issue: "Could not find ROS installation"
# Solution: Source setup script
source /opt/ros/humble/setup.bash
Issue: "Node not finding topics"
# Solution: Check ROS domain (different IDs don't communicate)
export ROS_DOMAIN_ID=0
ros2 topic list
Issue: "Package not found"
# Solution: Build and source
colcon build
source install/setup.bash
Best Practices
Node Design and Architecture
Single Responsibility Principle: Each node should have one clear purpose, making it easier to test, debug, and reuse.
# GOOD: Separate concerns
class CameraNode(Node):
"""Reads camera and publishes raw images"""
def __init__(self):
super().__init__('camera_driver')
self.camera_pub = self.create_publisher(Image, 'camera/image_raw', 10)
class ImageProcessorNode(Node):
"""Processes images and publishes results"""
def __init__(self):
super().__init__('image_processor')
self.image_sub = self.create_subscription(Image, 'camera/image_raw', self.image_callback, 10)
self.processed_pub = self.create_publisher(Image, 'camera/image_processed', 10)
# BAD: Mixed concerns in one node
class MonolithicNode(Node):
"""Does everything - camera, processing, planning, control"""
# Hard to test, debug, and reuse
Naming Conventions:
- Node names: lowercase with underscores (
my_robot_controller) - Topic names: descriptive paths (
/robot/arm/joint_states) - Service names: action verb (
/robot/arm/move_to_position) - Parameter names: hierarchical (
robot.arm.max_velocity)
Shutdown Handling:
def main():
rclpy.init()
node = MyNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down gracefully...')
finally:
node.destroy_node()
rclpy.shutdown()
Communication Patterns
Choose the Right Pattern:
-
Topics: High-frequency sensor data (images, sensor readings)
- Pub: Camera node publishes ~30 Hz images
- Sub: Multiple nodes process simultaneously
- Best for: Asynchronous, continuous data streams
-
Services: Request/response patterns
- Call: "Move arm to position" → wait for response
- Best for: Discrete commands, queries, configuration
-
Actions: Long-running tasks with feedback
- Goal: "Grasp object" → progress updates → success/failure
- Best for: Multi-step tasks with cancellation
# GOOD: Appropriate use of patterns
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Topic: high-frequency sensor data
self.joint_state_sub = self.create_subscription(
JointState, '/robot/joint_states', self.joint_callback, 10)
# Service: discrete commands
self.move_service = self.create_service(
MoveToPosition, '/robot/move', self.move_callback)
# Action: long-running tasks
self.grasp_action = ActionServer(
self, Grasp, '/robot/grasp', self.grasp_callback)
Performance Optimization
Message Frequency Tuning:
# GOOD: Appropriate frequencies for different sensor types
class SensorNode(Node):
def __init__(self):
super().__init__('sensors')
# High frequency (100 Hz): time-critical IMU data
self.imu_timer = self.create_timer(0.01, self.publish_imu)
# Medium frequency (30 Hz): camera images
self.camera_timer = self.create_timer(0.033, self.publish_camera)
# Low frequency (1 Hz): status updates
self.status_timer = self.create_timer(1.0, self.publish_status)
Profiling and Monitoring:
# Record data for offline analysis
ros2 bag record /robot/joint_states /robot/cmd_vel
# Analyze topics and message rates
ros2 topic hz /robot/joint_states
ros2 topic bw /robot/joint_states
# Profile CPU and memory usage
htop # Or use ros2_profiling tools
Safety and Reliability
Critical Operations:
class SafeRobotController(Node):
def __init__(self):
super().__init__('safe_controller')
def move_arm_safely(self, target_position):
"""Move arm with safety checks"""
# Check arm availability
if not self.arm_available():
self.get_logger().error('Arm not available!')
return False
# Set timeout for critical operation
timeout_s = 10.0
try:
future = self.move_service_client.call_async(target_position)
done = rclpy.spin_until_future_complete(
self, future, timeout_sec=timeout_s)
if not done:
self.get_logger().error(f'Move timeout after {timeout_s}s')
return False
result = future.result()
if not result.success:
self.get_logger().error('Move failed')
return False
return True
except Exception as e:
self.get_logger().error(f'Move exception: {e}')
return False
Error Handling Patterns:
# Check service availability before calling
if not self.move_client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('Service not available')
return
# Implement retry logic for transient failures
max_retries = 3
for attempt in range(max_retries):
try:
result = call_service()
if result.success:
break
except Exception as e:
if attempt == max_retries - 1:
raise
else:
self.get_logger().warn(f'Retry {attempt+1}/{max_retries}')
Testing and Debugging
Unit Testing:
import unittest
from rclpy.node import Node
class TestMyNode(unittest.TestCase):
def setUp(self):
rclpy.init()
self.node = MyNode()
def test_message_processing(self):
"""Test that node processes messages correctly"""
test_msg = JointState()
test_msg.position = [0.0, 0.0, 0.0]
self.node.joint_callback(test_msg)
# Verify expected behavior
self.assertTrue(self.node.got_message)
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()
Logging Best Practices:
class WellLoggedNode(Node):
def __init__(self):
super().__init__('well_logged')
def do_something_important(self):
self.get_logger().debug('Starting operation')
try:
result = self.operation()
self.get_logger().info(f'Operation completed: {result}')
except Exception as e:
self.get_logger().error(f'Operation failed: {e}', exc_info=True)
Real-World Integration Patterns
Multi-node Coordination: When multiple nodes need to work together, use a coordinator node:
class RobotCoordinator(Node):
"""Orchestrates multi-node robot system"""
def __init__(self):
super().__init__('robot_coordinator')
self.states = {}
# Subscribe to status from all subsystems
self.arm_state_sub = self.create_subscription(
ArmState, '/arm/state', self.arm_state_callback, 10)
self.gripper_state_sub = self.create_subscription(
GripperState, '/gripper/state', self.gripper_state_callback, 10)
# Implement state machine
self.timer = self.create_timer(0.1, self.coordinate)
def coordinate(self):
"""Main coordination loop"""
if all(self.is_ready(subsys) for subsys in ['arm', 'gripper']):
self.execute_task()
Remapping and Configuration: Configure topics and parameters without code changes:
# Remap topics at launch time
ros2 run my_pkg my_node --ros-args \
-r '/my_node/input:=/camera/image_raw' \
-r '/my_node/output:=/processed/image'
# Set parameters at launch time
ros2 run my_pkg my_node --ros-args \
-p 'robot.max_velocity:=2.0' \
-p 'robot.acceleration:=0.5'
Next Steps
- Next Chapter: Gazebo Simulation - test your ROS 2 code in a virtual environment
- Practice: Build a simple publisher/subscriber system
- Explore: Try
ros2 demo_nodes_pyfor example nodes
Chapter 2 Summary:
- ROS 2 is middleware for distributed robot systems
- Nodes communicate via Topics (async), Services (sync), and Actions (long-running)
- Standard tools:
ros2 topic,ros2 node,rviz2 - URDF describes robot structure and kinematics
- DDS ensures scalability and real-time performance
Estimated Reading Time: 25 minutes Code Examples: 8 (publisher, subscriber, service, action) Hands-on Exercise: Create and run your first ROS 2 node Next Chapter: Gazebo Simulation