This guide explains how to extend the quadruped’s gait controllers with custom parameters, gaits, and adaptation strategies.

Controller Architecture

The project uses a hierarchical controller architecture:
┌─────────────────────────────────────┐
│  RL Policy (Neural Network)         │
│  - Outputs: param deltas + residuals│
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  AdaptiveGaitController             │
│  - Applies parameter updates        │
│  - Adds residual corrections        │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  DiagonalGaitController             │
│  - Generates nominal trajectories   │
│  - Bezier swing curves              │
│  - Linear stance paths              │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  IK Solver (ik.py)                  │
│  - Converts foot targets to joints  │
└──────────────┬──────────────────────┘
               │
               ▼
         MuJoCo Simulation
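The flow through these layers can be sketched with stand-in functions. Everything below is illustrative (the real classes live in the files named above); it uses the original 16D action layout of 4 parameter deltas plus 12 per-leg residuals:

```python
import numpy as np

LEGS = ("FL", "FR", "RL", "RR")

def policy_step(obs: np.ndarray) -> np.ndarray:
    """Stand-in for the RL policy: 16D action = 4 param deltas + 12 residuals."""
    return np.zeros(16)

def split_action(action: np.ndarray):
    """Stand-in for AdaptiveGaitController: separate deltas from residuals."""
    return action[:4], action[4:].reshape(4, 3)  # one XYZ residual per leg

def nominal_targets() -> dict:
    """Stand-in for DiagonalGaitController: nominal foot target per leg."""
    return {leg: np.array([0.0, 0.0, -0.07]) for leg in LEGS}

action = policy_step(np.zeros(69))
param_deltas, residuals = split_action(action)
targets = nominal_targets()
corrected = {leg: targets[leg] + residuals[i] for i, leg in enumerate(LEGS)}
# `corrected` would then go through the IK solver (ik.py) to produce joint angles.
```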

Adding New Gait Parameters

Step 1: Extend GaitParameters

File: ~/workspace/source/gait_controller.py:18

Add new parameters to the GaitParameters dataclass:
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class GaitParameters:
    """Configuration bundle for the trot gait."""
    
    body_height: float = 0.07
    step_length: float = 0.05
    step_height: float = 0.015
    cycle_time: float = 0.8
    swing_shape: float = 0.35
    
    # NEW: Stance width (lateral foot spacing)
    stance_width: float = 0.08  # meters
    
    lateral_offsets: Dict[str, float] = field(
        default_factory=lambda: {
            "FL": -0.0,
            "FR": 0.0,
            "RL": -0.0,
            "RR": 0.0,
        }
    )
Usage:
params = GaitParameters(
    body_height=0.05,
    step_length=0.06,
    stance_width=0.10  # 10cm stance width
)
controller = DiagonalGaitController(params)

Step 2: Update Controller Logic

File: ~/workspace/source/gait_controller.py:36

Modify the controller to use the new parameter:
class DiagonalGaitController:
    # ... existing code ...
    
    def _apply_lateral_offset(self, leg: str, point: np.ndarray) -> np.ndarray:
        """Shift the foot target by the configured lateral offset."""
        # Use stance_width to compute lateral offsets
        # Left legs (FL, RL): negative Y
        # Right legs (FR, RR): positive Y
        if leg in ["FL", "RL"]:
            lateral = -self.params.stance_width / 2
        else:  # FR, RR
            lateral = self.params.stance_width / 2
        
        adjusted = point.copy()
        adjusted[1] = lateral
        return adjusted
Result: Wider stance provides better stability on rough terrain.
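A quick standalone check of the offset logic, with the method body inlined as a free function (the 0.10 m stance width is just an example value):

```python
import numpy as np

def lateral_offset(leg: str, point: np.ndarray, stance_width: float) -> np.ndarray:
    """Same logic as _apply_lateral_offset, inlined for a standalone check."""
    lateral = -stance_width / 2 if leg in ("FL", "RL") else stance_width / 2
    adjusted = point.copy()
    adjusted[1] = lateral
    return adjusted

p = np.array([0.02, 0.0, -0.07])
print(lateral_offset("FL", p, 0.10))  # [ 0.02 -0.05 -0.07]
print(lateral_offset("FR", p, 0.10))  # [ 0.02  0.05 -0.07]
```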

Step 3: Add to Adaptive Controller

File: ~/workspace/source/controllers/adaptive_gait_controller.py:30

Add the parameter to the adaptive controller’s ranges:
class AdaptiveGaitController:
    DEFAULT_RANGES = {
        "step_height": (0.030, 0.090, 0.040),
        "step_length": (0.030, 0.100, 0.060),
        "cycle_time": (0.800, 1.500, 0.800),
        "body_height": (0.040, 0.055, 0.050),
        
        # NEW: Stance width range
        "stance_width": (0.06, 0.12, 0.08),  # 6cm to 12cm
    }
    
    def update_parameters(self, param_deltas: Dict[str, float]) -> None:
        """Apply deltas to gait parameters with clipping."""
        for param_name, delta in param_deltas.items():
            if param_name not in self.param_ranges:
                continue  # Skip unknown parameters
            
            min_val, max_val, _ = self.param_ranges[param_name]
            current_val = getattr(self.current_params, param_name)
            new_val = float(np.clip(current_val + delta, min_val, max_val))
            setattr(self.current_params, param_name, new_val)
        
        self._params_dirty = True
    
    def get_current_parameters(self) -> Dict[str, float]:
        """Return current adapted parameter values."""
        return {
            "step_height": self.current_params.step_height,
            "step_length": self.current_params.step_length,
            "cycle_time": self.current_params.cycle_time,
            "body_height": self.current_params.body_height,
            "stance_width": self.current_params.stance_width,  # NEW
        }
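As a quick check of the clipping in update_parameters (range values copied from DEFAULT_RANGES; the delta is an example):

```python
import numpy as np

# stance_width range from DEFAULT_RANGES: (min=0.06, max=0.12, default=0.08)
lo, hi = 0.06, 0.12
current = 0.115
delta = 0.01  # would overshoot the 0.12 m maximum
new_val = float(np.clip(current + delta, lo, hi))
print(new_val)  # 0.12
```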

Step 4: Update Environment

File: ~/workspace/source/envs/adaptive_gait_env.py:78

Expand action and observation spaces:
class AdaptiveGaitEnv(gym.Env):
    # Update action space size
    # Was: 16D (4 param deltas + 12 residuals)
    # Now: 17D (5 param deltas + 12 residuals)
    
    PARAM_DELTA_SCALES = {
        "step_height": 0.005,
        "step_length": 0.005,
        "cycle_time": 0.05,
        "body_height": 0.003,
        "stance_width": 0.004,  # NEW: ±4mm per step
    }
    
    def __init__(self, ...):
        # ... existing code ...
        
        # Action: [param_deltas(5), residuals(12)] = 17D
        self.action_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(17,), dtype=np.float32
        )
    
    def _get_observation(self) -> np.ndarray:
        """Construct observation vector.
        
        Now 70D (was 69D):
        - body state: 13D
        - joint states: 24D
        - foot pos/vel: 24D
        - contacts: 4D
        - gait params: 5D (was 4D)
        """
        # ... existing observation code ...
        
        # Add stance_width to parameter observation
        current_params = self.controller.get_current_parameters()
        ranges = self.controller.param_ranges
        
        param_obs = []
        for param_name in ["step_height", "step_length", "cycle_time", 
                           "body_height", "stance_width"]:
            min_val, max_val, _ = ranges[param_name]
            center = (min_val + max_val) / 2.0
            scale = (max_val - min_val) / 2.0
            normalized = (current_params[param_name] - center) / scale
            param_obs.append(normalized)
        
        obs_components.append(np.array(param_obs, dtype=float))
        # ... rest of observation ...
    
    def _process_action(self, action: np.ndarray) -> Tuple[Dict[str, float], Dict[str, np.ndarray]]:
        """Convert 17D action to param deltas and residuals."""
        act = np.asarray(action, dtype=float).clip(-1.0, 1.0)
        
        # First 5 values are parameter deltas
        param_raw = act[:5]
        param_deltas = {
            "step_height": param_raw[0] * self.PARAM_DELTA_SCALES["step_height"],
            "step_length": param_raw[1] * self.PARAM_DELTA_SCALES["step_length"],
            "cycle_time": param_raw[2] * self.PARAM_DELTA_SCALES["cycle_time"],
            "body_height": param_raw[3] * self.PARAM_DELTA_SCALES["body_height"],
            "stance_width": param_raw[4] * self.PARAM_DELTA_SCALES["stance_width"],
        }
        
        # Remaining 12 values are residuals
        residuals: Dict[str, np.ndarray] = {}
        for i, leg in enumerate(LEG_NAMES):
            residuals[leg] = act[5 + i * 3 : 5 + (i + 1) * 3] * self.residual_scale
        
        return param_deltas, residuals
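A standalone sanity check that this split covers all 17 action dimensions exactly once (leg names as in LEG_NAMES):

```python
import numpy as np

act = np.arange(17, dtype=float)
param_raw, residual_raw = act[:5], act[5:]
legs = ["FL", "FR", "RL", "RR"]
residuals = {leg: residual_raw[i * 3:(i + 1) * 3] for i, leg in enumerate(legs)}

covered = param_raw.size + sum(r.size for r in residuals.values())
print(covered)  # 17
```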

Step 5: Retrain Policy

After modifying the environment:
# Train with new parameter
python3 train_adaptive_gait_ppo.py
Existing trained models are incompatible with modified action/observation spaces. You must retrain from scratch.

Creating Custom Gaits

Subclassing DiagonalGaitController

Create controllers/gallop_gait_controller.py:
"""Gallop gait controller with asymmetric phase timing."""

from gait_controller import GaitParameters, DiagonalGaitController, LEG_NAMES
import numpy as np

class GallopGaitController(DiagonalGaitController):
    """Gallop-style gait with 4 distinct phases.
    
    Phase sequence (matching the state machine below):
    1. front_swing: front legs (FL+FR) swing while the rear legs support
    2. front_stance: all four legs on the ground
    3. rear_swing: rear legs (RL+RR) swing while the front legs support
    4. rear_stance: all four legs on the ground
    
    A true gallop includes flight phases; this simplified version uses
    all-stance phases instead for stability.
    """
    
    states = ["front_swing", "front_stance", "rear_swing", "rear_stance"]
    
    def __init__(self, params: GaitParameters):
        # Don't call super().__init__() - we'll customize everything
        self.params = params
        
        # Gallop has 4 phases, each 25% of cycle
        self.state_duration = self.params.cycle_time / 4.0
        self.phase_elapsed = 0.0
        
        # State machine setup
        from transitions import Machine
        self.machine = Machine(
            model=self,
            states=self.states,
            initial="front_swing",
            after_state_change="_update_active_legs",
            send_event=True,
        )
        
        # Transitions: front_swing -> front_stance -> rear_swing -> rear_stance -> front_swing
        self.machine.add_transition("next_phase", "front_swing", "front_stance")
        self.machine.add_transition("next_phase", "front_stance", "rear_swing")
        self.machine.add_transition("next_phase", "rear_swing", "rear_stance")
        self.machine.add_transition("next_phase", "rear_stance", "front_swing")
        
        self._build_swing_curve()
        self._update_active_legs(None)
        self._leg_targets = self._bootstrap_targets()
    
    def _update_active_legs(self, event):
        """Set active swing legs based on current state."""
        if self.state == "front_swing":
            self.active_swing_pair = ("FL", "FR")
            self.active_stance_pair = ("RL", "RR")
        elif self.state == "front_stance":
            self.active_swing_pair = ()
            self.active_stance_pair = LEG_NAMES  # All legs on ground
        elif self.state == "rear_swing":
            self.active_swing_pair = ("RL", "RR")
            self.active_stance_pair = ("FL", "FR")
        else:  # rear_stance
            self.active_swing_pair = ()
            self.active_stance_pair = LEG_NAMES  # All legs on ground
    
    def update(self, dt: float):
        """Update gallop gait."""
        if dt <= 0.0:
            return self._leg_targets
        
        self.phase_elapsed += dt
        while self.phase_elapsed >= self.state_duration:
            self.phase_elapsed -= self.state_duration
            self.next_phase()
        
        phase = np.clip(self.phase_elapsed / self.state_duration, 0.0, 1.0)
        targets = {}
        
        for leg in self.active_swing_pair:
            targets[leg] = self._evaluate_swing_curve(leg, phase)
        
        for leg in self.active_stance_pair:
            targets[leg] = self._evaluate_stance_path(leg, phase)
        
        self._leg_targets = targets
        return targets
Usage:
from controllers.gallop_gait_controller import GallopGaitController

params = GaitParameters(cycle_time=0.6)  # Faster for gallop
controller = GallopGaitController(params)

# Use in simulation loop
for _ in range(5000):
    targets = controller.update(dt=0.002)
    # ... apply targets via IK ...

Adding Gait Transitions

class MultiGaitController:
    """Controller that switches between walk, trot, and gallop based on speed."""
    
    def __init__(self):
        self.walk_controller = DiagonalGaitController(GaitParameters(cycle_time=1.2))
        self.trot_controller = DiagonalGaitController(GaitParameters(cycle_time=0.8))
        self.gallop_controller = GallopGaitController(GaitParameters(cycle_time=0.6))
        
        self.current_controller = self.walk_controller
        self.desired_speed = 0.0  # m/s
    
    def set_speed(self, speed: float):
        """Set desired forward speed and select appropriate gait."""
        self.desired_speed = speed
        
        # Gait selection thresholds
        if speed < 0.1:
            new_controller = self.walk_controller
        elif speed < 0.3:
            new_controller = self.trot_controller
        else:
            new_controller = self.gallop_controller
        
        # Only switch if different
        if new_controller != self.current_controller:
            print(f"Switching to {new_controller.__class__.__name__}")
            self.current_controller = new_controller
    
    def update(self, dt: float):
        return self.current_controller.update(dt)
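The hard thresholds in set_speed can cause rapid back-and-forth switching ("chatter") when the commanded speed hovers near a boundary. One common remedy is hysteresis; here is a self-contained sketch (gait names stand in for the controller objects, thresholds copied from set_speed, and the band value is an arbitrary example):

```python
def select_gait(speed: float, current: str, band: float = 0.02) -> str:
    """Pick a gait for `speed`, holding `current` near threshold boundaries."""
    thresholds = [("walk", 0.1), ("trot", 0.3)]  # same cutoffs as set_speed
    new = "gallop"
    for name, limit in thresholds:
        if speed < limit:
            new = name
            break
    if new == current:
        return current
    # Hold the current gait while speed sits inside the hysteresis band
    for _, limit in thresholds:
        if abs(speed - limit) < band:
            return current
    return new

print(select_gait(0.29, "walk"))  # walk (0.29 is within 0.02 of the 0.3 cutoff)
```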

Advanced Adaptation Strategies

Terrain-Aware Parameter Adaptation

Modify the environment to provide terrain information:
# In envs/adaptive_gait_env.py

def _get_local_terrain_height(self) -> np.ndarray:
    """Sample terrain height around robot.
    
    Returns:
        8D array: heights at [front, front-right, right, back-right, 
                              back, back-left, left, front-left]
    """
    body_pos = self.sensor_reader.read_sensor("body_pos")
    body_x, body_y = body_pos[0], body_pos[1]
    
    # Sample radius
    radius = 0.15  # 15cm around robot
    angles = np.linspace(0, 2 * np.pi, 8, endpoint=False)
    
    heights = []
    for angle in angles:
        sample_x = body_x + radius * np.cos(angle)
        sample_y = body_y + radius * np.sin(angle)
        
        # Query MuJoCo heightfield
        # (Simplified - actual implementation requires hfield geom queries)
        height = self._query_heightfield(sample_x, sample_y)
        heights.append(height)
    
    return np.array(heights, dtype=np.float32)

def _get_observation(self) -> np.ndarray:
    """Add terrain information to observation."""
    # ... existing observation code ...
    
    # Add 8D terrain heights
    terrain_heights = self._get_local_terrain_height()
    obs_components.append(terrain_heights)
    
    # Total observation: 69D + 8D = 77D (78D if stance_width from Step 4 is included)
    return np.concatenate(obs_components).astype(np.float32)
Benefits:
  • Policy can anticipate upcoming terrain
  • Preemptively adjust step height before contact
  • Better stability on slopes
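A minimal, self-contained sketch of what _query_heightfield might do, assuming the elevation data is available as a 2D array spanning [-extent, extent] on both axes (MuJoCo stores heightfield elevation in model.hfield_data, but the access details are omitted here; the function name mirrors the method above):

```python
import numpy as np

def query_heightfield(hfield: np.ndarray, extent: float, x: float, y: float) -> float:
    """Bilinearly interpolate terrain height at world (x, y).

    Assumes `hfield` spans [-extent, extent] on both axes, rows indexed by y.
    """
    nrow, ncol = hfield.shape
    # Map world coordinates to fractional grid indices
    gx = np.clip((x + extent) / (2 * extent) * (ncol - 1), 0, ncol - 1)
    gy = np.clip((y + extent) / (2 * extent) * (nrow - 1), 0, nrow - 1)
    x0, y0 = int(gx), int(gy)
    x1, y1 = min(x0 + 1, ncol - 1), min(y0 + 1, nrow - 1)
    fx, fy = gx - x0, gy - y0
    top = hfield[y0, x0] * (1 - fx) + hfield[y0, x1] * fx
    bot = hfield[y1, x0] * (1 - fx) + hfield[y1, x1] * fx
    return float(top * (1 - fy) + bot * fy)

flat = np.full((8, 8), 0.02)
print(query_heightfield(flat, 0.15, 0.0, 0.0))  # 0.02
```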

Reward Shaping for Specific Behaviors

# In envs/adaptive_gait_env.py

def _compute_reward(self) -> Tuple[float, Dict[str, float]]:
    """Reward function with energy efficiency term."""
    rewards = {}
    
    # ... existing reward components ...
    
    # NEW: Energy efficiency reward
    # Penalize large joint accelerations (proxy for torque)
    joint_states = self.sensor_reader.get_joint_states()
    joint_vels = joint_states[12:]  # Last 12 are velocities
    
    # Compute velocity changes (acceleration)
    if hasattr(self, '_prev_joint_vels'):
        joint_accels = (joint_vels - self._prev_joint_vels) / self.model.opt.timestep
        energy_penalty = -0.1 * np.sum(joint_accels**2)
        rewards["energy_efficiency"] = energy_penalty
    else:
        rewards["energy_efficiency"] = 0.0
    
    self._prev_joint_vels = joint_vels.copy()
    
    # NEW: Reward for minimal parameter changes (smooth adaptation)
    param_changes = 0.0
    current_params = self.controller.get_current_parameters()
    if hasattr(self, '_prev_params'):
        for key in current_params:
            delta = current_params[key] - self._prev_params[key]
            param_changes += delta**2
        rewards["smooth_adaptation"] = -5.0 * param_changes
    else:
        rewards["smooth_adaptation"] = 0.0
    
    self._prev_params = current_params.copy()
    
    total = sum(rewards.values())
    return total, rewards
Result: Policy learns to minimize joint jerk and smooth parameter changes.
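As a quick numeric check of the smooth_adaptation weight: a single 5 mm parameter jump between control steps contributes about -1.25e-4 to the reward (parameter names from get_current_parameters; the values are examples):

```python
# -5.0 * 0.005**2 = -1.25e-4 for one parameter changing by 5 mm
prev = {"step_height": 0.040, "step_length": 0.060}
curr = {"step_height": 0.045, "step_length": 0.060}
penalty = -5.0 * sum((curr[k] - prev[k]) ** 2 for k in curr)
print(f"{penalty:.2e}")  # -1.25e-04
```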

Hierarchical Control

"""Hierarchical RL with high-level planner and low-level controller."""

class HierarchicalPolicy:
    def __init__(self, high_level_model, low_level_model):
        self.high_level = high_level_model  # Outputs: gait param targets
        self.low_level = low_level_model    # Outputs: residuals
        
        self.high_level_freq = 10  # Hz (update gait params slowly)
        self.low_level_freq = 500  # Hz (residuals at control rate)
        
        self.step_count = 0
        self.current_param_targets = None
    
    def predict(self, obs, deterministic=True):
        """Predict action with hierarchical policy."""
        self.step_count += 1
        
        # High-level policy (slow updates). Also runs on the first call so
        # current_param_targets is never None when we concatenate below.
        interval = self.low_level_freq // self.high_level_freq
        if self.current_param_targets is None or self.step_count % interval == 0:
            high_obs = self._extract_high_level_obs(obs)
            param_deltas, _ = self.high_level.predict(high_obs, deterministic=deterministic)
            self.current_param_targets = param_deltas
        
        # Low-level policy (every step)
        low_obs = self._extract_low_level_obs(obs)
        residuals, _ = self.low_level.predict(low_obs, deterministic=deterministic)
        
        # Combine actions
        action = np.concatenate([self.current_param_targets, residuals])
        return action, None
    
    def _extract_high_level_obs(self, obs):
        """Extract high-level features (terrain, body state)."""
        # Return subset of observation for gait planning
        return obs[:30]  # Example: first 30 dimensions
    
    def _extract_low_level_obs(self, obs):
        """Extract low-level features (joint positions, foot contacts)."""
        return obs  # Use full observation
Training:
  1. Train high-level policy first with coarse timesteps
  2. Freeze high-level, train low-level with fine timesteps
  3. Fine-tune both together

Testing Custom Controllers

Unit Tests

Create tests/test_custom_controller.py:
import numpy as np
from controllers.gallop_gait_controller import GallopGaitController
from gait_controller import GaitParameters, LEG_NAMES

def test_gallop_phases():
    """Verify gallop controller produces correct phase sequence."""
    params = GaitParameters(cycle_time=0.8)
    controller = GallopGaitController(params)
    
    phase_sequence = []
    dt = 0.002  # 2ms timestep
    
    # Run for one full cycle
    for _ in range(int(0.8 / dt)):
        controller.update(dt)
        phase_sequence.append(controller.state)
    
    # Check that all 4 phases appear
    unique_phases = set(phase_sequence)
    assert unique_phases == {"front_swing", "front_stance", "rear_swing", "rear_stance"}
    
    print("✓ Gallop controller produces all 4 phases")

def test_foot_targets_reachable():
    """Verify all foot targets are within IK workspace."""
    from ik import solve_leg_ik_3dof
    
    params = GaitParameters()
    controller = GallopGaitController(params)
    
    unreachable_count = 0
    for _ in range(1000):
        targets = controller.update(0.002)
        
        for leg, target in targets.items():
            result = solve_leg_ik_3dof(target, L1=0.045, L2=0.06, base_dist=0.021, mode=2)
            if result is None:
                unreachable_count += 1
                print(f"⚠ Unreachable target for {leg}: {target}")
    
    assert unreachable_count == 0, f"{unreachable_count} unreachable targets!"
    print("✓ All foot targets are reachable")

if __name__ == "__main__":
    test_gallop_phases()
    test_foot_targets_reachable()
Run tests:
python3 tests/test_custom_controller.py

Visualization

import matplotlib.pyplot as plt
import numpy as np
from stable_baselines3 import PPO
from envs.adaptive_gait_env import AdaptiveGaitEnv

import glob

# Load the most recent trained model (PPO.load does not expand shell wildcards)
model_path = sorted(glob.glob("runs/adaptive_gait_*/final_model.zip"))[-1]
model = PPO.load(model_path)
env = AdaptiveGaitEnv("model/world_train.xml")

# Collect parameter trajectories
params_history = {"step_height": [], "step_length": [], 
                  "cycle_time": [], "body_height": []}
timesteps = []

obs, _ = env.reset()
for step in range(2000):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, terminated, truncated, info = env.step(action)
    
    # Record parameters
    params = info["gait_params"]
    for key in params_history:
        params_history[key].append(params[key])
    timesteps.append(step * 0.002)  # 2ms per step
    
    if terminated or truncated:
        break

# Plot
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
for ax, (param_name, values) in zip(axes.flatten(), params_history.items()):
    ax.plot(timesteps, values)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel(param_name.replace("_", " ").title())
    ax.grid(True)

plt.tight_layout()
plt.savefig("gait_parameters_over_time.png")
print("Saved gait_parameters_over_time.png")

Best Practices

Design Guidelines:
  1. Start Simple: Begin with one new parameter, verify it works, then add more
  2. Respect Physics: Ensure parameter ranges don’t violate kinematic constraints
  3. Test in Isolation: Verify controller outputs with unit tests before RL training
  4. Gradual Changes: Use small delta scales to prevent large parameter jumps
  5. Monitor Training: Watch reward components to ensure new parameters are being learned
  6. Baseline Comparison: Always compare against zero-delta baseline
Common Pitfalls:
  • Incompatible Spaces: Changing action/observation size requires retraining from scratch
  • Parameter Ranges: Too wide ranges cause instability, too narrow limits adaptation
  • IK Failures: New parameters may produce unreachable targets - validate with IK tests
  • Reward Conflicts: New reward terms may conflict with existing ones - tune weights carefully
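Several of these pitfalls can be caught mechanically before training. A small sanity check for the parameter ranges (the helper name is ours; range values copied from DEFAULT_RANGES above):

```python
def validate_ranges(ranges: dict) -> None:
    """Assert each (min, max, default) triple is internally consistent."""
    for name, (lo, hi, default) in ranges.items():
        assert lo < hi, f"{name}: inverted range ({lo} >= {hi})"
        assert lo <= default <= hi, f"{name}: default {default} outside [{lo}, {hi}]"

# Values copied from DEFAULT_RANGES
validate_ranges({
    "step_height": (0.03, 0.09, 0.04),
    "step_length": (0.03, 0.10, 0.06),
    "cycle_time": (0.80, 1.50, 0.80),
    "body_height": (0.04, 0.055, 0.05),
    "stance_width": (0.06, 0.12, 0.08),
})
print("ranges OK")
```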

Example: Complete Feature Addition

Goal: Add adjustable foot clearance during swing phase.

1. Update GaitParameters:
# gait_controller.py
@dataclass
class GaitParameters:
    # ... existing ...
    foot_clearance: float = 0.01  # Extra vertical clearance (meters)
2. Modify swing curve generation:
# gait_controller.py:106
def _build_swing_curve(self) -> None:
    half_step = self.params.step_length / 2.0
    # Raise the swing apex by foot_clearance: lift_height is the foot's depth
    # below the body at the apex, so subtracting lifts the foot higher
    lift_height = max(self.params.body_height - self.params.step_height - self.params.foot_clearance, 1e-4)
    # ... rest of Bezier curve setup ...
3. Add to adaptive controller:
# controllers/adaptive_gait_controller.py
DEFAULT_RANGES = {
    # ... existing ...
    "foot_clearance": (0.0, 0.02, 0.01),  # 0-2cm clearance
}
4. Expand environment:
# envs/adaptive_gait_env.py
PARAM_DELTA_SCALES = {
    # ... existing ...
    "foot_clearance": 0.002,  # ±2mm per step
}

# Update action space: 18D (was 17D)
self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(18,), dtype=np.float32)

# Update observation: 71D (was 70D)
# Add foot_clearance to parameter observation
5. Test:
# Unit test
python3 -c "from gait_controller import GaitParameters; p = GaitParameters(foot_clearance=0.02); print(f'Clearance: {p.foot_clearance}m')"

# Visual test
python3 height_control.py

# Train
python3 train_adaptive_gait_ppo.py
Expected Behavior: Policy learns to increase foot_clearance on rough terrain, reducing foot scuffing.

Next Steps

  • Project Structure: understand codebase organization
  • Debugging: debug controller issues
  • API Reference: detailed API documentation
  • Training Guide: train policies with custom controllers
