Controller Architecture
The project uses a hierarchical controller architecture:

┌─────────────────────────────────────┐
│ RL Policy (Neural Network) │
│ - Outputs: param deltas + residuals│
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ AdaptiveGaitController │
│ - Applies parameter updates │
│ - Adds residual corrections │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ DiagonalGaitController │
│ - Generates nominal trajectories │
│ - Bezier swing curves │
│ - Linear stance paths │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ IK Solver (ik.py) │
│ - Converts foot targets to joints │
└──────────────┬──────────────────────┘
│
▼
MuJoCo Simulation
Adding New Gait Parameters
Step 1: Extend GaitParameters
File: ~/workspace/source/gait_controller.py:18
Add new parameters to the GaitParameters dataclass:
Example: Add stance_width parameter
@dataclass
class GaitParameters:
    """Configuration bundle for the trot gait.

    All distances are in meters; cycle_time is in seconds.
    """

    body_height: float = 0.07
    step_length: float = 0.05
    step_height: float = 0.015
    cycle_time: float = 0.8
    swing_shape: float = 0.35
    # NEW: Stance width (lateral foot spacing)
    stance_width: float = 0.08  # meters
    # Per-leg lateral tweaks; default_factory gives every instance its
    # own dict so instances never share mutable state.
    lateral_offsets: Dict[str, float] = field(
        default_factory=lambda: dict(FL=-0.0, FR=0.0, RL=-0.0, RR=0.0)
    )
# Example: override selected fields at construction time; unspecified
# fields (e.g. step_height, cycle_time) keep their dataclass defaults.
params = GaitParameters(
    body_height=0.05,
    step_length=0.06,
    stance_width=0.10  # 10cm stance width
)
controller = DiagonalGaitController(params)
Step 2: Update Controller Logic
File:~/workspace/source/gait_controller.py:36
Modify the controller to use the new parameter:
Apply stance_width in foot trajectories
Apply stance_width in foot trajectories
class DiagonalGaitController:
    # ... existing code ...

    def _apply_lateral_offset(self, leg: str, point: np.ndarray) -> np.ndarray:
        """Return a copy of *point* whose Y is set from the stance width.

        Left-side legs (FL, RL) get -stance_width/2; right-side legs
        (FR, RR) get +stance_width/2. The input array is not mutated.
        """
        half_width = self.params.stance_width / 2
        sign = -1.0 if leg in ("FL", "RL") else 1.0
        shifted = point.copy()
        shifted[1] = sign * half_width
        return shifted
Step 3: Add to Adaptive Controller
File:~/workspace/source/controllers/adaptive_gait_controller.py:30
Add the parameter to the adaptive controller’s ranges:
Make stance_width trainable
Make stance_width trainable
class AdaptiveGaitController:
    # Per-parameter (min, max, initial) bounds used to clip RL deltas.
    DEFAULT_RANGES = {
        "step_height": (0.03, 0.090, 0.04),
        "step_length": (0.030, 0.100, 0.06),
        "cycle_time": (0.800, 1.500, 0.80),
        "body_height": (0.04, 0.055, 0.05),
        # NEW: Stance width range
        "stance_width": (0.06, 0.12, 0.08),  # 6cm to 12cm
    }

    def update_parameters(self, param_deltas: Dict[str, float]) -> None:
        """Apply deltas to gait parameters, clipping each to its range.

        Unknown parameter names are ignored; applying a delta marks the
        parameter set dirty so downstream consumers can rebuild.
        """
        for name, delta in param_deltas.items():
            bounds = self.param_ranges.get(name)
            if bounds is None:
                continue  # Skip unknown parameters
            lo, hi, _initial = bounds
            updated = float(np.clip(getattr(self.current_params, name) + delta, lo, hi))
            setattr(self.current_params, name, updated)
            self._params_dirty = True

    def get_current_parameters(self) -> Dict[str, float]:
        """Return current adapted parameter values keyed by name."""
        names = ("step_height", "step_length", "cycle_time",
                 "body_height", "stance_width")
        return {name: getattr(self.current_params, name) for name in names}
Step 4: Update Environment
File:~/workspace/source/envs/adaptive_gait_env.py:78
Expand action and observation spaces:
Add stance_width to action/observation
Add stance_width to action/observation
class AdaptiveGaitEnv(gym.Env):
    # Update action space size
    # Was: 16D (4 param deltas + 12 residuals)
    # Now: 17D (5 param deltas + 12 residuals)

    # Per-parameter action scaling: a policy output in [-1, 1] is
    # multiplied by these values to get the physical delta applied per
    # control step.
    PARAM_DELTA_SCALES = {
        "step_height": 0.005,
        "step_length": 0.005,
        "cycle_time": 0.05,
        "body_height": 0.003,
        "stance_width": 0.004,  # NEW: ±4mm per step
    }

    # NOTE(docs): the "..." below is a documentation placeholder for the
    # real argument list, not valid Python.
    def __init__(self, ...):
        # ... existing code ...
        # Action: [param_deltas(5), residuals(12)] = 17D
        self.action_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(17,), dtype=np.float32
        )

    def _get_observation(self) -> np.ndarray:
        """Construct observation vector.
        Now 70D (was 69D):
        - body state: 13D
        - joint states: 24D
        - foot pos/vel: 24D
        - contacts: 4D
        - gait params: 5D (was 4D)
        """
        # ... existing observation code ...
        # Add stance_width to parameter observation.
        # Each parameter is normalized into [-1, 1] using the midpoint and
        # half-span of its configured (min, max) range.
        current_params = self.controller.get_current_parameters()
        ranges = self.controller.param_ranges
        param_obs = []
        for param_name in ["step_height", "step_length", "cycle_time",
                           "body_height", "stance_width"]:
            min_val, max_val, _ = ranges[param_name]
            center = (min_val + max_val) / 2.0
            scale = (max_val - min_val) / 2.0
            normalized = (current_params[param_name] - center) / scale
            param_obs.append(normalized)
        # obs_components is assembled by the elided code above.
        obs_components.append(np.array(param_obs, dtype=float))
        # ... rest of observation ...

    def _process_action(self, action: np.ndarray) -> Tuple[Dict[str, float], Dict[str, np.ndarray]]:
        """Convert 17D action to param deltas and residuals."""
        act = np.asarray(action, dtype=float).clip(-1.0, 1.0)
        # First 5 values are parameter deltas
        param_raw = act[:5]
        param_deltas = {
            "step_height": param_raw[0] * self.PARAM_DELTA_SCALES["step_height"],
            "step_length": param_raw[1] * self.PARAM_DELTA_SCALES["step_length"],
            "cycle_time": param_raw[2] * self.PARAM_DELTA_SCALES["cycle_time"],
            "body_height": param_raw[3] * self.PARAM_DELTA_SCALES["body_height"],
            "stance_width": param_raw[4] * self.PARAM_DELTA_SCALES["stance_width"],
        }
        # Remaining 12 values are residuals: each leg (in LEG_NAMES order)
        # gets a 3D slice, scaled by self.residual_scale.
        residuals: Dict[str, np.ndarray] = {}
        for i, leg in enumerate(LEG_NAMES):
            residuals[leg] = act[5 + i * 3 : 5 + (i + 1) * 3] * self.residual_scale
        return param_deltas, residuals
Step 5: Retrain Policy
After modifying the environment, train with the new parameter:
python3 train_adaptive_gait_ppo.py
Existing trained models are incompatible with modified action/observation spaces. You must retrain from scratch.
Creating Custom Gaits
Subclassing DiagonalGaitController
Example: Gallop Gait
Example: Gallop Gait
Create controllers/gallop_gait_controller.py (a usage example follows the class):

"""Gallop gait controller with asymmetric phase timing."""
from gait_controller import GaitParameters, DiagonalGaitController, LEG_NAMES
import numpy as np
class GallopGaitController(DiagonalGaitController):
    """Gallop gait with 4 distinct phases.
    Phase sequence:
    1. Front legs push (FL+FR)
    2. Flight phase
    3. Rear legs push (RL+RR)
    4. Flight phase
    """

    # State-machine states; cycled in order by the "next_phase" trigger.
    states = ["front_swing", "front_stance", "rear_swing", "rear_stance"]

    def __init__(self, params: GaitParameters):
        # Don't call super().__init__() - we'll customize everything
        self.params = params
        # Gallop has 4 phases, each 25% of cycle
        self.state_duration = self.params.cycle_time / 4.0
        self.phase_elapsed = 0.0
        # State machine setup
        from transitions import Machine
        self.machine = Machine(
            model=self,
            states=self.states,
            initial="front_swing",
            after_state_change="_update_active_legs",
            send_event=True,
        )
        # Transitions: front_swing -> front_stance -> rear_swing -> rear_stance -> front_swing
        self.machine.add_transition("next_phase", "front_swing", "front_stance")
        self.machine.add_transition("next_phase", "front_stance", "rear_swing")
        self.machine.add_transition("next_phase", "rear_swing", "rear_stance")
        self.machine.add_transition("next_phase", "rear_stance", "front_swing")
        # Inherited helpers from DiagonalGaitController (defined in
        # gait_controller.py) -- presumably they precompute the Bezier
        # swing curve and seed initial foot targets; confirm there.
        self._build_swing_curve()
        self._update_active_legs(None)
        self._leg_targets = self._bootstrap_targets()

    def _update_active_legs(self, event):
        """Set active swing legs based on current state."""
        # `event` is the transitions EventData delivered because
        # send_event=True; it is unused here.
        if self.state == "front_swing":
            self.active_swing_pair = ("FL", "FR")
            self.active_stance_pair = ("RL", "RR")
        elif self.state == "front_stance":
            self.active_swing_pair = ()
            self.active_stance_pair = LEG_NAMES  # All legs on ground
        elif self.state == "rear_swing":
            self.active_swing_pair = ("RL", "RR")
            self.active_stance_pair = ("FL", "FR")
        else:  # rear_stance
            self.active_swing_pair = ()
            self.active_stance_pair = LEG_NAMES  # All legs on ground

    def update(self, dt: float):
        """Update gallop gait and return per-leg foot targets."""
        # Non-positive dt: return the cached targets unchanged.
        if dt <= 0.0:
            return self._leg_targets
        self.phase_elapsed += dt
        # Advance the state machine; the while-loop handles a dt larger
        # than one phase duration by stepping multiple phases.
        while self.phase_elapsed >= self.state_duration:
            self.phase_elapsed -= self.state_duration
            self.next_phase()
        # Normalized progress [0, 1] within the current phase.
        phase = np.clip(self.phase_elapsed / self.state_duration, 0.0, 1.0)
        targets = {}
        for leg in self.active_swing_pair:
            targets[leg] = self._evaluate_swing_curve(leg, phase)
        for leg in self.active_stance_pair:
            targets[leg] = self._evaluate_stance_path(leg, phase)
        self._leg_targets = targets
        return targets
# Usage example: drive the gallop controller in a fixed-timestep loop.
from controllers.gallop_gait_controller import GallopGaitController

params = GaitParameters(cycle_time=0.6)  # Faster for gallop
controller = GallopGaitController(params)
# Use in simulation loop
for _ in range(5000):
    targets = controller.update(dt=0.002)
    # ... apply targets via IK ...
Adding Gait Transitions
Example: Speed-based gait switching
Example: Speed-based gait switching
class MultiGaitController:
    """Controller that switches between walk, trot, and gallop based on speed.

    Owns one controller instance per gait and delegates ``update`` to
    whichever controller matches the most recently requested speed.
    """

    def __init__(self):
        self.walk_controller = DiagonalGaitController(GaitParameters(cycle_time=1.2))
        self.trot_controller = DiagonalGaitController(GaitParameters(cycle_time=0.8))
        self.gallop_controller = GallopGaitController(GaitParameters(cycle_time=0.6))
        self.current_controller = self.walk_controller
        self.desired_speed = 0.0  # m/s

    def set_speed(self, speed: float):
        """Set desired forward speed (m/s) and select appropriate gait.

        Thresholds: walk below 0.1 m/s, trot below 0.3 m/s, gallop above.
        """
        self.desired_speed = speed
        # Gait selection thresholds
        if speed < 0.1:
            new_controller = self.walk_controller
        elif speed < 0.3:
            new_controller = self.trot_controller
        else:
            new_controller = self.gallop_controller
        # Only switch if different. Use identity ("is not"), not "!=":
        # we mean "a different controller object", and identity stays
        # correct even if a controller class ever defines __eq__.
        if new_controller is not self.current_controller:
            print(f"Switching to {new_controller.__class__.__name__}")
            self.current_controller = new_controller

    def update(self, dt: float):
        """Delegate to the currently selected gait controller."""
        return self.current_controller.update(dt)
Advanced Adaptation Strategies
Terrain-Aware Parameter Adaptation
Example: Heightfield-based adaptation
Example: Heightfield-based adaptation
Modify the environment to provide terrain information (benefits are listed after the code):
# In envs/adaptive_gait_env.py
def _get_local_terrain_height(self) -> np.ndarray:
    """Sample terrain height around robot.
    Returns:
        8D array: heights at [front, front-right, right, back-right,
        back, back-left, left, front-left]
    """
    body_pos = self.sensor_reader.read_sensor("body_pos")
    body_x, body_y = body_pos[0], body_pos[1]
    # Sample radius
    radius = 0.15  # 15cm around robot
    # Eight sample directions, one every 45 degrees starting at angle 0
    # (assumed to be the +X / "front" direction -- confirm the world
    # frame convention against the MuJoCo model).
    angles = np.linspace(0, 2 * np.pi, 8, endpoint=False)
    heights = []
    for angle in angles:
        sample_x = body_x + radius * np.cos(angle)
        sample_y = body_y + radius * np.sin(angle)
        # Query MuJoCo heightfield
        # (Simplified - actual implementation requires hfield geom queries)
        height = self._query_heightfield(sample_x, sample_y)
        heights.append(height)
    return np.array(heights, dtype=np.float32)
def _get_observation(self) -> np.ndarray:
    """Add terrain information to observation."""
    # ... existing observation code ...
    # Add 8D terrain heights; obs_components is the list of observation
    # pieces built by the elided code above.
    terrain_heights = self._get_local_terrain_height()
    obs_components.append(terrain_heights)
    # Total observation: 69D + 8D = 77D
    return np.concatenate(obs_components).astype(np.float32)
- Policy can anticipate upcoming terrain
- Preemptively adjust step height before contact
- Better stability on slopes
Reward Shaping for Specific Behaviors
Example: Reward for energy efficiency
Example: Reward for energy efficiency
# In envs/adaptive_gait_env.py
def _compute_reward(self) -> Tuple[float, Dict[str, float]]:
    """Reward function with energy efficiency term.

    Returns:
        (total_reward, per-component reward dict).
    """
    rewards = {}
    # ... existing reward components ...
    # NEW: Energy efficiency reward
    # Penalize large joint accelerations (proxy for torque)
    joint_states = self.sensor_reader.get_joint_states()
    joint_vels = joint_states[12:]  # Last 12 are velocities
    # Compute velocity changes (acceleration) by finite difference.
    # The first call has no previous velocities, so the term starts at 0.
    if hasattr(self, '_prev_joint_vels'):
        joint_accels = (joint_vels - self._prev_joint_vels) / self.model.opt.timestep
        energy_penalty = -0.1 * np.sum(joint_accels**2)
        rewards["energy_efficiency"] = energy_penalty
    else:
        rewards["energy_efficiency"] = 0.0
    self._prev_joint_vels = joint_vels.copy()
    # NEW: Reward for minimal parameter changes (smooth adaptation)
    # Penalizes the summed squared step-to-step change of each gait
    # parameter; zero on the first call for the same reason as above.
    param_changes = 0.0
    current_params = self.controller.get_current_parameters()
    if hasattr(self, '_prev_params'):
        for key in current_params:
            delta = current_params[key] - self._prev_params[key]
            param_changes += delta**2
        rewards["smooth_adaptation"] = -5.0 * param_changes
    else:
        rewards["smooth_adaptation"] = 0.0
    self._prev_params = current_params.copy()
    total = sum(rewards.values())
    return total, rewards
Hierarchical Control
Example: Separate high-level and low-level policies
Example: Separate high-level and low-level policies
"""Hierarchical RL with high-level planner and low-level controller."""
class HierarchicalPolicy:
    """Wraps two trained policies running at different rates.

    The high-level model proposes gait-parameter deltas at a slow rate;
    the low-level model emits per-leg residuals every control step. The
    two outputs are concatenated into the environment's action vector.
    """

    def __init__(self, high_level_model, low_level_model):
        self.high_level = high_level_model  # Outputs: gait param targets
        self.low_level = low_level_model    # Outputs: residuals
        self.high_level_freq = 10   # Hz (update gait params slowly)
        self.low_level_freq = 500   # Hz (residuals at control rate)
        self.step_count = 0
        self.current_param_targets = None  # Filled on first predict()

    def predict(self, obs, deterministic=True):
        """Predict action with hierarchical policy.

        Returns:
            (action, None) where action = [param_deltas, residuals].
        """
        self.step_count += 1
        # High-level policy (slow updates).
        # BUGFIX: also run when no targets exist yet -- previously the
        # first call crashed in np.concatenate because
        # current_param_targets was still None (1 % 50 != 0).
        high_level_period = self.low_level_freq // self.high_level_freq
        if (self.current_param_targets is None
                or self.step_count % high_level_period == 0):
            high_obs = self._extract_high_level_obs(obs)
            param_deltas, _ = self.high_level.predict(high_obs, deterministic)
            self.current_param_targets = param_deltas
        # Low-level policy (every step)
        low_obs = self._extract_low_level_obs(obs)
        residuals, _ = self.low_level.predict(low_obs, deterministic)
        # Combine actions
        action = np.concatenate([self.current_param_targets, residuals])
        return action, None

    def _extract_high_level_obs(self, obs):
        """Extract high-level features (terrain, body state)."""
        # Return subset of observation for gait planning
        return obs[:30]  # Example: first 30 dimensions

    def _extract_low_level_obs(self, obs):
        """Extract low-level features (joint positions, foot contacts)."""
        return obs  # Use full observation
- Train high-level policy first with coarse timesteps
- Freeze high-level, train low-level with fine timesteps
- Fine-tune both together
Testing Custom Controllers
Unit Tests
Test controller outputs
Test controller outputs
Create tests/test_custom_controller.py (run it with the command shown after the code):

import numpy as np
from controllers.gallop_gait_controller import GallopGaitController
from gait_controller import GaitParameters, LEG_NAMES
def test_gallop_phases():
    """Verify gallop controller produces correct phase sequence."""
    params = GaitParameters(cycle_time=0.8)
    controller = GallopGaitController(params)
    phase_sequence = []
    dt = 0.002  # 2ms timestep
    # Run for one full gait cycle. Derive the step count from
    # params.cycle_time instead of hard-coding 0.8 so the test stays
    # correct if the cycle time above is ever changed.
    for _ in range(int(params.cycle_time / dt)):
        controller.update(dt)
        phase_sequence.append(controller.state)
    # Check that all 4 phases appear
    unique_phases = set(phase_sequence)
    assert unique_phases == {"front_swing", "front_stance", "rear_swing", "rear_stance"}
    print("✓ Gallop controller produces all 4 phases")
def test_foot_targets_reachable():
    """Verify all foot targets are within IK workspace."""
    from ik import solve_leg_ik_3dof
    params = GaitParameters()
    controller = GallopGaitController(params)
    unreachable_count = 0
    # Step the controller for 1000 ticks (2 s at 2 ms) and IK-check every
    # emitted foot target. L1/L2 and base_dist are leg geometry in meters
    # -- presumably matching the MuJoCo model; verify against ik.py.
    for _ in range(1000):
        targets = controller.update(0.002)
        for leg, target in targets.items():
            result = solve_leg_ik_3dof(target, L1=0.045, L2=0.06, base_dist=0.021, mode=2)
            # solve_leg_ik_3dof returns None when the target is unreachable.
            if result is None:
                unreachable_count += 1
                print(f"⚠ Unreachable target for {leg}: {target}")
    assert unreachable_count == 0, f"{unreachable_count} unreachable targets!"
    print("✓ All foot targets are reachable")

if __name__ == "__main__":
    test_gallop_phases()
    test_foot_targets_reachable()
python3 tests/test_custom_controller.py
Visualization
Plot gait parameters over time
Plot gait parameters over time
import glob

import matplotlib.pyplot as plt
import numpy as np
from stable_baselines3 import PPO
from envs.adaptive_gait_env import AdaptiveGaitEnv

# Load trained model.
# BUGFIX: PPO.load does not expand shell wildcards, so passing
# "runs/adaptive_gait_*/final_model.zip" directly fails. Resolve the
# pattern with glob and take the lexicographically newest run.
model_paths = sorted(glob.glob("runs/adaptive_gait_*/final_model.zip"))
if not model_paths:
    raise FileNotFoundError("No trained model found under runs/adaptive_gait_*/")
model = PPO.load(model_paths[-1])
env = AdaptiveGaitEnv("model/world_train.xml")

# Collect parameter trajectories
params_history = {"step_height": [], "step_length": [],
                  "cycle_time": [], "body_height": []}
timesteps = []
obs, _ = env.reset()
for step in range(2000):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, terminated, truncated, info = env.step(action)
    # Record the parameters the environment reports each step
    params = info["gait_params"]
    for key in params_history:
        params_history[key].append(params[key])
    timesteps.append(step * 0.002)  # 2ms per step
    if terminated or truncated:
        break

# Plot each parameter trajectory on its own axis in a 2x2 grid
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
for ax, (param_name, values) in zip(axes.flatten(), params_history.items()):
    ax.plot(timesteps, values)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel(param_name.replace("_", " ").title())
    ax.grid(True)
plt.tight_layout()
plt.savefig("gait_parameters_over_time.png")
print("Saved gait_parameters_over_time.png")
Best Practices
Design Guidelines:
- Start Simple: Begin with one new parameter, verify it works, then add more
- Respect Physics: Ensure parameter ranges don’t violate kinematic constraints
- Test in Isolation: Verify controller outputs with unit tests before RL training
- Gradual Changes: Use small delta scales to prevent large parameter jumps
- Monitor Training: Watch reward components to ensure new parameters are being learned
- Baseline Comparison: Always compare against zero-delta baseline
Common Pitfalls:
- Incompatible Spaces: Changing action/observation size requires retraining from scratch
- Parameter Ranges: Too wide ranges cause instability, too narrow limits adaptation
- IK Failures: New parameters may produce unreachable targets - validate with IK tests
- Reward Conflicts: New reward terms may conflict with existing ones - tune weights carefully
Example: Complete Feature Addition
Full workflow: Adding foot_clearance parameter
Full workflow: Adding foot_clearance parameter
Goal: Add adjustable foot clearance during swing phase.

Steps:
1. Update GaitParameters
2. Modify swing curve generation
3. Add to adaptive controller
4. Expand environment
5. Test

Expected Behavior: Policy learns to increase foot_clearance on rough terrain, reducing foot scuffing.
# gait_controller.py
# Fragment: add the new field to the existing dataclass.
@dataclass
class GaitParameters:
    # ... existing ...
    foot_clearance: float = 0.01  # Extra vertical clearance (meters)

# gait_controller.py:106
def _build_swing_curve(self) -> None:
    half_step = self.params.step_length / 2.0
    # Add foot_clearance to lift height
    # NOTE(review): the expression SUBTRACTS foot_clearance (and
    # step_height) from body_height -- consistent with a body-relative,
    # downward-positive foot coordinate, but it contradicts the "add"
    # comment above; confirm the sign against the existing swing-curve
    # code in gait_controller.py.
    lift_height = max(self.params.body_height - self.params.step_height - self.params.foot_clearance, 1e-4)
    # ... rest of Bezier curve setup ...

# controllers/adaptive_gait_controller.py
# Fragment: (min, max, initial) bounds for the new parameter.
DEFAULT_RANGES = {
    # ... existing ...
    "foot_clearance": (0.0, 0.02, 0.01),  # 0-2cm clearance
}

# envs/adaptive_gait_env.py
# Fragment: per-step action scaling for the new parameter.
PARAM_DELTA_SCALES = {
    # ... existing ...
    "foot_clearance": 0.002,  # ±2mm per step
}
# Update action space: 18D (was 17D)
self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(18,), dtype=np.float32)
# Update observation: 71D (was 70D)
# Add foot_clearance to parameter observation
# Unit test
python3 -c "from gait_controller import GaitParameters; p = GaitParameters(foot_clearance=0.02); print(f'Clearance: {p.foot_clearance}m')"
# Visual test
python3 height_control.py
# Train
python3 train_adaptive_gait_ppo.py
Next Steps
Project Structure
Understand codebase organization
Debugging
Debug controller issues
API Reference
Detailed API documentation
Training Guide
Train policies with custom controllers