Skip to main content

Chapter 2: ROS 2 Fundamentals

What is ROS 2?

ROS 2 (Robot Operating System 2) is a flexible middleware framework for building robot applications. Unlike ROS 1, which was designed primarily for research, ROS 2 is built from the ground up for production robotics with enterprise-grade features.

Core Capabilities:

  • Communication: Pub/Sub messaging, request/response services, long-running actions
  • Hardware Abstraction: Unified interfaces for different robots (arms, mobile bases, drones)
  • Tools: Visualization, debugging, monitoring, and recording utilities
  • Ecosystem: Thousands of maintained libraries and packages
  • Standards: Open-source, vendor-neutral development (MIT licensed)
  • Integration: Works with Python, C++, C# on multiple operating systems

ROS 2 vs. ROS 1:

FeatureROS 1ROS 2
MiddlewareCustom TCP/UDPStandard DDS
Real-timeLimitedFull support
SecurityNoneEncrypted, authenticated
ScalabilitySingle masterFully distributed
PerformanceResearch-gradeProduction-ready
Multi-robotDifficultNative support
Windows SupportNoYes
Python SupportLimitedFull support
MaintenanceEndingActive

Why ROS 2 Matters:

  • Industry Adoption: Used by Boston Dynamics, ABB, Clearpath Robotics, NASA
  • Production Readiness: Designed for deployed systems, not just research
  • Ecosystem: Hundreds of packages (perception, navigation, manipulation, simulation)
  • Future-Proof: Active development with long-term support releases
  • Standardization: Interoperability across robot platforms

Architecture Overview

ROS 2 follows a distributed computing model:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ (Sensor) │ │ (Controller) │ │ (Actuator) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────────┼────────────────────┘
DDS Middleware
(Data Distribution Service)

Key Concepts:

Nodes

Executables that perform specific tasks (sensor reading, control, planning).

# Example node structure
import rclpy
from rclpy.node import Node

class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
self.timer = self.create_timer(0.5, self.timer_callback)

def timer_callback(self):
msg = String()
msg.data = 'Hello from ROS 2!'
self.publisher_.publish(msg)

def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)

if __name__ == '__main__':
main()

Topics

Asynchronous communication channels for sensor data and status.

# Subscribing to a topic
def listener_callback(msg):
self.get_logger().info(f'Received: {msg.data}')

self.subscription = self.create_subscription(String, 'topic', listener_callback, 10)

Services

Synchronous request/response communication for commands and queries.

# Service server
def add_two_ints_callback(request, response):
response.sum = request.a + request.b
return response

srv = node.create_service(AddTwoInts, 'add_two_ints', add_two_ints_callback)

# Service client
client = node.create_client(AddTwoInts, 'add_two_ints')
request = AddTwoInts.Request()
request.a = 5
request.b = 3
future = client.call_async(request)

Actions

Long-running tasks with feedback and cancellation.

# Action server (goal-based)
def goal_callback(goal_request):
if goal_request.order < 0:
return GoalResponse.REJECTED
return GoalResponse.ACCEPTED

self.action_server = ActionServer(
node, Fibonacci, 'fibonacci',
goal_callback, cancel_callback, execute_callback
)

Installation & Setup

Choosing the right installation method depends on your system and workflow. Each has different trade-offs for convenience, isolation, and performance.

The recommended approach for development and production use:

# Step 1: Set up package repository
sudo apt update
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update

# Step 2: Add ROS 2 repository key
sudo apt install curl gnupg2 lsb-release ubuntu-keyring
curl -sSL https://repo.ros2.org/ros.key | sudo apt-key add -

# Step 3: Add ROS 2 repository
sudo sh -c 'echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" > /etc/apt/sources.list.d/ros2.list'

# Step 4: Install ROS 2 Humble (Long-term support)
sudo apt update
sudo apt install ros-humble-desktop ros-humble-dev

# Step 5: Install build tools
sudo apt install build-essential git python3-colcon-common-extensions

# Step 6: Source setup script (add to ~/.bashrc for persistence)
source /opt/ros/humble/setup.bash
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc

# Step 7: Verify installation
ros2 --version

Why Humble?

  • Latest LTS release with 5 years of support
  • Mature ecosystem with thousands of tested packages
  • Better Python integration than older versions
  • Recommended for new projects

Available Distributions:

  • Humble (2022-2027): Latest LTS, recommended
  • Iron (2023-2026): Intermediate support
  • Jazzy (2024-2029): Latest with cutting-edge features

macOS Setup

macOS support is available but with some limitations:

# Using Homebrew (easiest method)
brew install ros

# Manual installation from DMG
# Download from: https://docs.ros.org/en/humble/Installation/Alternatives.html

macOS Limitations:

  • Some packages not available (require extra compilation)
  • Performance slightly slower than Linux
  • Docker recommended for complex projects
  • Not recommended for production use

Windows Setup (WSL2)

Windows support via Windows Subsystem for Linux 2:

# Step 1: Enable WSL2 in Windows
wsl --install -d Ubuntu-22.04

# Step 2: Inside WSL2, follow Ubuntu installation steps above

# Step 3: Set up X11 forwarding for GUI applications
# Install VcXsrv or Xming on Windows
# In WSL2 ~/.bashrc, add:
export DISPLAY=$(cat /etc/resolv.conf | grep nameserver | awk '{print $2}'):0

Windows Considerations:

  • WSL2 adds ~10% performance overhead vs native Linux
  • GUI applications require X11 server on Windows
  • File access between Windows and WSL2 is slower
  • Recommended for development, not production

Docker Container (Isolated Environment)

Best for keeping your system clean and ensuring reproducibility:

# Pull official ROS 2 Docker image
docker pull osrf/ros:humble-desktop

# Run container with X11 forwarding
docker run -it \
--env DISPLAY=$DISPLAY \
--volume /tmp/.X11-unix:/tmp/.X11-unix:rw \
osrf/ros:humble-desktop

# Or with volume mounting (persistent files)
docker run -it \
-v ~/ros2_ws:/root/ros2_ws \
osrf/ros:humble-desktop \
bash

Docker Advantages:

  • Isolated from system Python and packages
  • Easy reproduction across machines
  • Simple version switching
  • Container-ready for deployment

Docker Disadvantages:

  • ~5% performance overhead
  • Slightly more complex for hardware access (cameras, arms)
  • Larger disk footprint

Development Environment Setup

After installation, set up your development workspace:

# Create workspace structure
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws

# Initialize workspace (modern approach)
# ROS 2 workspaces don't need initialization like ROS 1
# Just create src/ directory

# Set up Python virtual environment (optional but recommended)
python3 -m venv venv
source venv/bin/activate

# Install colcon build system
sudo apt install python3-colcon-common-extensions

# Initialize build
colcon build --symlink-install

# Source workspace overlay
source install/local_setup.bash

Verification and Testing

Confirm your installation works correctly:

# Test 1: Check ROS 2 version and setup
ros2 --version
ros2 doctor

# Test 2: Run demo nodes
# Terminal 1: Publisher
ros2 run demo_nodes_py talker

# Terminal 2: Subscriber
ros2 run demo_nodes_py listener

# Test 3: Check middleware
ros2 topic list
ros2 node list

# Expected output:
# /talker
# /listener

Your First ROS 2 Program

Step 1: Create a Package

mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python my_robot_pkg
cd my_robot_pkg

Step 2: Create a Node

Create my_robot_pkg/my_robot_pkg/talker.py:

import rclpy
from std_msgs.msg import String

def main(args=None):
rclpy.init(args=args)
node = rclpy.create_node('talker')
publisher = node.create_publisher(String, 'chatter', 10)

msg = String()
msg.data = 'Hello, ROS 2 World!'
publisher.publish(msg)

node.get_logger().info(f'Publishing: {msg.data}')
rclpy.shutdown()

if __name__ == '__main__':
main()

Step 3: Build & Run

# Build the package
cd ~/ros2_ws
colcon build

# Run the node
source install/setup.bash
ros2 run my_robot_pkg talker

Common Tools

ros2 topic

Inspect and interact with topics:

# List all topics
ros2 topic list

# Show topic details
ros2 topic info /chatter

# Echo topic data
ros2 topic echo /chatter

# Publish to a topic
ros2 topic pub /cmd_vel geometry_msgs/Twist "{linear: {x: 0.5}}"

ros2 node

Manage nodes:

# List running nodes
ros2 node list

# Get node info
ros2 node info /talker

ros2 service

Call ROS 2 services:

# List services
ros2 service list

# Call a service
ros2 service call /my_service std_srvs/srv/Trigger "{}"

RViz (Visualization)

Visual debugging and monitoring:

# Launch RViz
rviz2

# Typical setup:
# - Fixed Frame: map
# - Add displays: TF, PointCloud2, MarkerArray

Robot Description (URDF)

Define robot structure using URDF (Unified Robot Description Format):

<?xml version="1.0"?>
<robot name="my_robot">
<!-- Base link -->
<link name="base_link">
<inertial>
<mass value="10.0"/>
<inertia ixx="1.0" ixy="0.0" ixz="0.0" iyy="1.0" iyz="0.0" izz="1.0"/>
</inertial>
</link>

<!-- Wheel link -->
<link name="wheel_left">
<inertial>
<mass value="1.0"/>
<inertia ixx="0.1" ixy="0.0" ixz="0.0" iyy="0.1" iyz="0.0" izz="0.1"/>
</inertial>
</link>

<!-- Joint connecting base to wheel -->
<joint name="wheel_left_joint" type="revolute">
<parent link="base_link"/>
<child link="wheel_left"/>
<axis xyz="0 1 0"/>
<limit lower="0" upper="10" effort="100" velocity="10"/>
</joint>
</robot>

ROS 2 Parameter System

Store and access configuration parameters:

# Node with parameters
node.declare_parameter('speed', 1.0)
speed = node.get_parameter('speed').value

# Update parameter
node.set_parameters([Parameter('speed', 2.0)])

Debugging & Troubleshooting

Common Issues

Issue: "Could not find ROS installation"

# Solution: Source setup script
source /opt/ros/humble/setup.bash

Issue: "Node not finding topics"

# Solution: Check ROS domain (different IDs don't communicate)
export ROS_DOMAIN_ID=0
ros2 topic list

Issue: "Package not found"

# Solution: Build and source
colcon build
source install/setup.bash

Best Practices

Node Design and Architecture

Single Responsibility Principle: Each node should have one clear purpose, making it easier to test, debug, and reuse.

# GOOD: Separate concerns
class CameraNode(Node):
"""Reads camera and publishes raw images"""
def __init__(self):
super().__init__('camera_driver')
self.camera_pub = self.create_publisher(Image, 'camera/image_raw', 10)

class ImageProcessorNode(Node):
"""Processes images and publishes results"""
def __init__(self):
super().__init__('image_processor')
self.image_sub = self.create_subscription(Image, 'camera/image_raw', self.image_callback, 10)
self.processed_pub = self.create_publisher(Image, 'camera/image_processed', 10)

# BAD: Mixed concerns in one node
class MonolithicNode(Node):
"""Does everything - camera, processing, planning, control"""
# Hard to test, debug, and reuse

Naming Conventions:

  • Node names: lowercase with underscores (my_robot_controller)
  • Topic names: descriptive paths (/robot/arm/joint_states)
  • Service names: action verb (/robot/arm/move_to_position)
  • Parameter names: hierarchical (robot.arm.max_velocity)

Shutdown Handling:

def main():
rclpy.init()
node = MyNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down gracefully...')
finally:
node.destroy_node()
rclpy.shutdown()

Communication Patterns

Choose the Right Pattern:

  • Topics: High-frequency sensor data (images, sensor readings)

    • Pub: Camera node publishes ~30 Hz images
    • Sub: Multiple nodes process simultaneously
    • Best for: Asynchronous, continuous data streams
  • Services: Request/response patterns

    • Call: "Move arm to position" → wait for response
    • Best for: Discrete commands, queries, configuration
  • Actions: Long-running tasks with feedback

    • Goal: "Grasp object" → progress updates → success/failure
    • Best for: Multi-step tasks with cancellation
# GOOD: Appropriate use of patterns
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')

# Topic: high-frequency sensor data
self.joint_state_sub = self.create_subscription(
JointState, '/robot/joint_states', self.joint_callback, 10)

# Service: discrete commands
self.move_service = self.create_service(
MoveToPosition, '/robot/move', self.move_callback)

# Action: long-running tasks
self.grasp_action = ActionServer(
self, Grasp, '/robot/grasp', self.grasp_callback)

Performance Optimization

Message Frequency Tuning:

# GOOD: Appropriate frequencies for different sensor types
class SensorNode(Node):
def __init__(self):
super().__init__('sensors')

# High frequency (100 Hz): time-critical IMU data
self.imu_timer = self.create_timer(0.01, self.publish_imu)

# Medium frequency (30 Hz): camera images
self.camera_timer = self.create_timer(0.033, self.publish_camera)

# Low frequency (1 Hz): status updates
self.status_timer = self.create_timer(1.0, self.publish_status)

Profiling and Monitoring:

# Record data for offline analysis
ros2 bag record /robot/joint_states /robot/cmd_vel

# Analyze topics and message rates
ros2 topic hz /robot/joint_states
ros2 topic bw /robot/joint_states

# Profile CPU and memory usage
htop # Or use ros2_profiling tools

Safety and Reliability

Critical Operations:

class SafeRobotController(Node):
def __init__(self):
super().__init__('safe_controller')

def move_arm_safely(self, target_position):
"""Move arm with safety checks"""

# Check arm availability
if not self.arm_available():
self.get_logger().error('Arm not available!')
return False

# Set timeout for critical operation
timeout_s = 10.0
try:
future = self.move_service_client.call_async(target_position)
done = rclpy.spin_until_future_complete(
self, future, timeout_sec=timeout_s)

if not done:
self.get_logger().error(f'Move timeout after {timeout_s}s')
return False

result = future.result()
if not result.success:
self.get_logger().error('Move failed')
return False

return True

except Exception as e:
self.get_logger().error(f'Move exception: {e}')
return False

Error Handling Patterns:

# Check service availability before calling
if not self.move_client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('Service not available')
return

# Implement retry logic for transient failures
max_retries = 3
for attempt in range(max_retries):
try:
result = call_service()
if result.success:
break
except Exception as e:
if attempt == max_retries - 1:
raise
else:
self.get_logger().warn(f'Retry {attempt+1}/{max_retries}')

Testing and Debugging

Unit Testing:

import unittest
from rclpy.node import Node

class TestMyNode(unittest.TestCase):
def setUp(self):
rclpy.init()
self.node = MyNode()

def test_message_processing(self):
"""Test that node processes messages correctly"""
test_msg = JointState()
test_msg.position = [0.0, 0.0, 0.0]

self.node.joint_callback(test_msg)

# Verify expected behavior
self.assertTrue(self.node.got_message)

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

Logging Best Practices:

class WellLoggedNode(Node):
def __init__(self):
super().__init__('well_logged')

def do_something_important(self):
self.get_logger().debug('Starting operation')

try:
result = self.operation()
self.get_logger().info(f'Operation completed: {result}')
except Exception as e:
self.get_logger().error(f'Operation failed: {e}', exc_info=True)

Real-World Integration Patterns

Multi-node Coordination: When multiple nodes need to work together, use a coordinator node:

class RobotCoordinator(Node):
"""Orchestrates multi-node robot system"""

def __init__(self):
super().__init__('robot_coordinator')
self.states = {}

# Subscribe to status from all subsystems
self.arm_state_sub = self.create_subscription(
ArmState, '/arm/state', self.arm_state_callback, 10)
self.gripper_state_sub = self.create_subscription(
GripperState, '/gripper/state', self.gripper_state_callback, 10)

# Implement state machine
self.timer = self.create_timer(0.1, self.coordinate)

def coordinate(self):
"""Main coordination loop"""
if all(self.is_ready(subsys) for subsys in ['arm', 'gripper']):
self.execute_task()

Remapping and Configuration: Configure topics and parameters without code changes:

# Remap topics at launch time
ros2 run my_pkg my_node --ros-args \
-r '/my_node/input:=/camera/image_raw' \
-r '/my_node/output:=/processed/image'

# Set parameters at launch time
ros2 run my_pkg my_node --ros-args \
-p 'robot.max_velocity:=2.0' \
-p 'robot.acceleration:=0.5'

Next Steps

  • Next Chapter: Gazebo Simulation - test your ROS 2 code in a virtual environment
  • Practice: Build a simple publisher/subscriber system
  • Explore: Try ros2 demo_nodes_py for example nodes

Chapter 2 Summary:

  • ROS 2 is middleware for distributed robot systems
  • Nodes communicate via Topics (async), Services (sync), and Actions (long-running)
  • Standard tools: ros2 topic, ros2 node, rviz2
  • URDF describes robot structure and kinematics
  • DDS ensures scalability and real-time performance

Estimated Reading Time: 25 minutes Code Examples: 8 (publisher, subscriber, service, action) Hands-on Exercise: Create and run your first ROS 2 node Next Chapter: Gazebo Simulation