Multi-agent reinforcement learning (MARL) extends the single-agent setting to scenarios where multiple decision-makers interact in a common environment. Agents may be cooperative, competitive, or a mixture of both. This tutorial uses a cooperative Combat grid-world, in which a team of learned agents must work together to defeat a team of rule-based bots.
The simplest MARL approach is independent learning: each agent maintains its own policy and is trained as if the other agents were part of the (non-stationary) environment. Despite its simplicity, independent learning can work well in cooperative settings, where shared rewards let agents coordinate implicitly.
The Combat Environment
The Combat class (from combat.py) simulates a 15×15 grid-world battle between two teams. In this notebook, both teams have 2 members. Each agent can move in four directions, do nothing, or attack one of the opponents. All agents start with 3 health points. The game ends when one team is eliminated or 100 steps have elapsed.
from combat import Combat
env = Combat(grid_shape=(15, 15), n_agents=2, n_opponents=2)
observations = env.reset()
# Sample one random action for each of the 2 agents
actions = env.action_space.sample()
obs, rewards, dones, info = env.step(actions)
The action_space for each agent is Discrete(5 + n_opponents). With n_opponents=2, each agent has 7 actions: 0–4 are moves/no-op, and 5–6 attack opponent 0 or opponent 1.
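The index layout described above can be made concrete with a small decoder. This is a minimal sketch: `describe_action` is a hypothetical helper, not part of combat.py, and it does not name individual move directions because the text does not specify their order.

```python
def describe_action(action: int, n_opponents: int = 2) -> str:
    """Classify an action index under the Discrete(5 + n_opponents) layout.

    Hypothetical helper for illustration only.
    """
    n_actions = 5 + n_opponents
    if not 0 <= action < n_actions:
        raise ValueError(f"action must be in [0, {n_actions - 1}], got {action}")
    if action < 5:
        # Indices 0-4 cover the four moves and the no-op
        return "move/no-op"
    # Indices 5 and above select an opponent to attack
    return f"attack opponent {action - 5}"


for a in range(7):
    print(a, describe_action(a))
```

Changing `n_opponents` widens the attack range accordingly, so the same check works for larger opponent teams.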
Environment Interface
| Method / Property | Description |
|---|---|
| reset() | Returns a list of 150-element observation vectors, one per agent |
| step(agents_action) | Returns (observations, rewards, dones, info) |
| get_agent_obs() | Computes a 5×5 local observation window per agent, encoded as a 150-element flat vector |
| action_space | MultiAgentActionSpace: a list of Discrete(7) spaces (for 2 opponents) |
| observation_space | MultiAgentObservationSpace: a list of Box spaces |
Action Space
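import gym

class MultiAgentActionSpace(list):
    def __init__(self, agents_action_space):
        for x in agents_action_space:
            assert isinstance(x, gym.spaces.space.Space)
        super().__init__(agents_action_space)
        # Keep a reference so sample() can iterate over the per-agent spaces
        self._agents_action_space = agents_action_space

    def sample(self):
        # Draw one random action per agent
        return [agent_action_space.sample() for agent_action_space in self._agents_action_space]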
Observation Space
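Each agent sees a 5×5 local grid centered on its position. Every occupied cell is described by 6 channels: team type (+1 for the agent team, −1 for an opponent), zero-based unit ID, health, a cooldown flag (±1), and the observing agent's own x- and y-coordinates, normalized by the grid size. Empty and out-of-bounds cells stay at zero. This yields 6 × 5 × 5 = 150 features per agent: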
def get_agent_obs(self):
    _obs = []
    for agent_i in range(self.n_agents):
        pos = self.agent_pos[agent_i]
        # 6 channels over a 5×5 window centered on the agent
        _agent_i_obs = np.zeros((6, 5, 5))
        for row in range(5):
            for col in range(5):
                # Map window coordinates to grid coordinates
                grid_row = row + (pos[0] - 2)
                grid_col = col + (pos[1] - 2)
                # Only in-bounds, occupied cells are encoded; the rest stay zero
                if self.is_valid([grid_row, grid_col]) and \
                        PRE_IDS['empty'] not in self._full_obs[grid_row][grid_col]:
                    x = self._full_obs[grid_row][grid_col]
                    _type = 1 if PRE_IDS['agent'] in x else -1
                    _id = int(x[1:]) - 1
                    _agent_i_obs[0][row][col] = _type
                    _agent_i_obs[1][row][col] = _id
                    _agent_i_obs[2][row][col] = (
                        self.agent_health[_id] if _type == 1 else self.opp_health[_id]
                    )
                    _agent_i_obs[3][row][col] = (
                        1 if (self._agent_cool[_id] if _type == 1 else self._opp_cool[_id]) else -1
                    )
                    # Observing agent's own position, normalized by grid size
                    _agent_i_obs[4][row][col] = pos[0] / self._grid_shape[0]
                    _agent_i_obs[5][row][col] = pos[1] / self._grid_shape[1]
        _obs.append(_agent_i_obs.flatten().tolist())
    return _obs
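Because the (6, 5, 5) array is flattened in row-major order, each of the 150 entries maps back to a (channel, row, column) triple. The sketch below shows that mapping in plain Python. The channel labels and the `flat_index` and `unflatten` helpers are names chosen for this illustration and do not appear in combat.py.

```python
N_CHANNELS, WINDOW = 6, 5
# Labels chosen for this sketch, following the channel order in get_agent_obs
CHANNEL_NAMES = ["team", "id", "health", "cooldown", "x", "y"]


def flat_index(channel: int, row: int, col: int) -> int:
    """Position of (channel, row, col) in the row-major flattened vector."""
    return channel * WINDOW * WINDOW + row * WINDOW + col


def unflatten(obs):
    """Split a 150-element observation into six named 5×5 grids."""
    if len(obs) != N_CHANNELS * WINDOW * WINDOW:
        raise ValueError(f"expected 150 features, got {len(obs)}")
    return {
        name: [
            [obs[flat_index(c, r, k)] for k in range(WINDOW)]
            for r in range(WINDOW)
        ]
        for c, name in enumerate(CHANNEL_NAMES)
    }


# Toy observation: only the observer's own health is set.
# The observer always sits at the window center, (row 2, col 2).
obs = [0.0] * 150
obs[flat_index(2, 2, 2)] = 3.0
channels = unflatten(obs)
print(channels["health"][2][2])  # 3.0
```

A view like this is handy for debugging, for example to check which neighbouring cells an agent actually sees before a fight.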
Bot Policy
The opponent team runs a hardcoded bot policy. Each bot:
- Picks the nearest enemy agent that is visible to the bot team (within a bot's 5×5 view).
- Attacks that agent if it is within the bot's 3×3 firing range, and otherwise moves toward it.
- Takes a random move or no-op if no enemy agent is visible.
@property
def opps_action(self):
    visible_agents = set()
    opp_agent_distance = {i: [] for i in range(self._n_opponents)}
    for opp_i, opp_pos in self.opp_pos.items():
        for agent_i, agent_pos in self.agent_pos.items():
            # Vision is shared: an agent seen by any bot is visible to all bots
            if self.agent_health[agent_i] > 0 and self.is_visible(opp_pos, agent_pos):
                visible_agents.add(agent_i)
            # Manhattan distance from this bot to every agent
            distance = abs(agent_pos[0] - opp_pos[0]) + abs(agent_pos[1] - opp_pos[1])
            opp_agent_distance[opp_i].append([distance, agent_i])

    opp_action_n = []
    for opp_i in range(self._n_opponents):
        action = None
        # Consider agents from nearest to farthest and act on the first visible one
        for _, agent_i in sorted(opp_agent_distance[opp_i]):
            if agent_i in visible_agents:
                if self.is_fireable(self.opp_pos[opp_i], self.agent_pos[agent_i]):
                    action = agent_i + 5  # attack
                else:
                    action = self.reduce_distance_move(
                        self.opp_pos[opp_i], self.agent_pos[agent_i]
                    )
                break
        # No visible agent: take a random move or no-op
        if action is None:
            action = random.choice(range(5))
        opp_action_n.append(action)
    return opp_action_n
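The bot policy relies on `is_visible` and `is_fireable`, whose bodies are not shown here. A minimal sketch follows, assuming both ranges are square windows centered on the bot (5×5 for vision, 3×3 for firing), as the description suggests. The functions below are standalone stand-ins written for this example, not the methods from combat.py.

```python
def within_window(source, target, radius: int) -> bool:
    """True if target lies in the (2 * radius + 1)-wide square centered on source."""
    return (abs(source[0] - target[0]) <= radius
            and abs(source[1] - target[1]) <= radius)


def is_fireable(source, target) -> bool:
    # Assumed 3×3 firing range: at most one cell away along each axis
    return within_window(source, target, radius=1)


def is_visible(source, target) -> bool:
    # Assumed 5×5 view: at most two cells away along each axis
    return within_window(source, target, radius=2)


bot = (7, 7)
for agent in [(8, 8), (5, 9), (7, 10)]:
    print(agent, "visible:", is_visible(bot, agent), "fireable:", is_fireable(bot, agent))
```

Under these assumptions the agent at (8, 8) can be attacked immediately, the one at (5, 9) is seen but must be approached first, and the one at (7, 10) is outside this bot's view.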
Rewards
- +1 for each successful hit on an opponent.
- −1 for each hit received from an opponent.
- A step_cost (default 0) may also be applied at each step.
- A +100 bonus is added in the training code on the step where the agent team wins an episode. On every other step, the training code subtracts 0.1 from each agent's reward.
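The training-time shaping can be isolated in a small function. This is a sketch under the assumption that the shaping matches the training loop later in this tutorial (+100 to every agent on a winning step, −0.1 otherwise). `shape_rewards` and its parameter names are hypothetical.

```python
def shape_rewards(rewards, win: bool, win_bonus: float = 100.0, step_penalty: float = 0.1):
    """Return a new per-agent reward list with training-time shaping applied."""
    if win:
        # Winning step: every agent on the team receives the bonus
        return [r + win_bonus for r in rewards]
    # Any other step: a small penalty discourages stalling
    return [r - step_penalty for r in rewards]


# Agent 0 landed a hit (+1), agent 1 was hit (-1), and the team won on this step
print(shape_rewards([1.0, -1.0], win=True))  # [101.0, 99.0]

# An uneventful step: both agents pay only the step penalty
print(shape_rewards([0.0, 0.0], win=False))
```

Returning a new list rather than mutating the environment's reward list keeps the raw rewards available for logging.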
Independent PPO Learner
A single PPO instance, with one policy network and one value network, is shared by both agents. Each agent’s 150-element observation is fed independently through the same policy network, producing a distribution over 7 actions (5 moves/no-op + 2 attack targets):
import random

import torch


class PPO:
    def __init__(self):
        # Policy network: 150-element observation -> probabilities over 7 actions
        self.model_action = torch.nn.Sequential(
            torch.nn.Linear(150, 64), torch.nn.ReLU(),
            torch.nn.Linear(64, 64), torch.nn.ReLU(),
            torch.nn.Linear(64, 7),  # 5 moves/no-op + 2 attack targets
            torch.nn.Softmax(dim=1),
        )
        # Value network: 150-element observation -> scalar state value
        self.model_value = torch.nn.Sequential(
            torch.nn.Linear(150, 64), torch.nn.ReLU(),
            torch.nn.Linear(64, 64), torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
        )
        self.optimizer_action = torch.optim.Adam(
            self.model_action.parameters(), lr=3e-4
        )
        self.optimizer_value = torch.optim.Adam(
            self.model_value.parameters(), lr=3e-3
        )
        self.mse_loss = torch.nn.MSELoss()

    def get_action(self, state):
        state = torch.FloatTensor(state).reshape(1, 150)
        # Sample an action index in proportion to the policy probabilities
        weights = self.model_action(state).squeeze(dim=0).tolist()
        action = random.choices(range(7), weights=weights, k=1)[0]
        return action

    def train(self, state, action, reward, next_state, over):
        # One-step TD target: r + 0.99 * V(s'), masked at terminal steps
        target = self.model_value(next_state) * 0.99 * (1 - over) + reward
        target = target.detach()
        value = self.model_value(state)
        # TD residuals, accumulated into advantages by _get_advantages
        delta = (target - value).squeeze(dim=1).tolist()
        advantages = self._get_advantages(delta)
        advantages = torch.FloatTensor(advantages).reshape(-1, 1)
        # Log-probabilities of the taken actions under the pre-update policy
        old_prob = self.model_action(state).gather(1, action).log().detach()

        # A single update epoch: the ratio is 1 on the first pass,
        # so the clip only takes effect if more epochs are used
        for _ in range(1):
            new_prob = self.model_action(state).gather(1, action).log()
            ratio = (new_prob - old_prob).exp()
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
            loss_action = -torch.min(surr1, surr2).mean()
            self.optimizer_action.zero_grad()
            loss_action.backward()
            self.optimizer_action.step()

            loss_value = self.mse_loss(self.model_value(state), target)
            self.optimizer_value.zero_grad()
            loss_value.backward()
            self.optimizer_value.step()

    def _get_advantages(self, deltas):
        # Discounted backward sum of TD residuals (gamma = 0.99, lambda = 0.97)
        advantages, s = [], 0.0
        for delta in reversed(deltas):
            s = 0.99 * 0.97 * s + delta
            advantages.append(s)
        advantages.reverse()
        return advantages


ppo = PPO()
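Two pieces of arithmetic inside `train` are easier to follow on small numbers: the backward accumulation in `_get_advantages` and the clipped surrogate term. The sketch below reimplements both in plain Python with the same constants (0.99, 0.97, and a clip range of 0.8 to 1.2). The function names `gae` and `clipped_surrogate` are chosen for this example only.

```python
def gae(deltas, gamma: float = 0.99, lam: float = 0.97):
    """Backward discounted sum of TD residuals, mirroring _get_advantages."""
    advantages, running = [], 0.0
    for delta in reversed(deltas):
        running = gamma * lam * running + delta
        advantages.append(running)
    advantages.reverse()
    return advantages


def clipped_surrogate(ratio: float, advantage: float, eps: float = 0.2) -> float:
    """Per-sample PPO objective: min(ratio * A, clip(ratio, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = min(max(ratio, 1.0 - eps), 1.0 + eps)
    return min(ratio * advantage, clipped_ratio * advantage)


# Three steps with a TD residual of 1 each: earlier steps accumulate more credit
print(gae([1.0, 1.0, 1.0]))  # approximately [2.882, 1.960, 1.0]

# Positive advantage: gains from raising the ratio above 1.2 are cut off
print(clipped_surrogate(1.5, 1.0))   # 1.2
# Negative advantage: the more pessimistic (clipped) value is kept
print(clipped_surrogate(0.5, -1.0))  # -0.8
```

The training loss is the negative mean of this per-sample objective over the batch.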
Data Collection and Training Loop
Both agents share the PPO instance. Each step, each agent’s observation is processed independently:
def get_data():
    # One trajectory buffer per agent
    data0 = {'state': [], 'action': [], 'reward': [], 'next_state': [], 'over': []}
    data1 = {'state': [], 'action': [], 'reward': [], 'next_state': [], 'over': []}
    state = env.reset()
    over = False
    while not over:
        action = [ppo.get_action(state[0]), ppo.get_action(state[1])]
        next_state, reward, over, info = env.step(action)
        win = info['win']

        # Reward shaping: big bonus for winning, small penalty otherwise
        if win:
            reward[0] += 100
            reward[1] += 100
        else:
            reward[0] -= 0.1
            reward[1] -= 0.1

        for data, i in [(data0, 0), (data1, 1)]:
            data['state'].append(state[i])
            data['action'].append(action[i])
            data['reward'].append(reward[i])
            data['next_state'].append(next_state[i])
            # The terminal flag is stored as a constant False,
            # so value targets always bootstrap from the next state
            data['over'].append(False)

        state = next_state
        # The episode ends once both agents report done
        over = over[0] and over[1]

    for data in [data0, data1]:
        data['state'] = torch.FloatTensor(data['state']).reshape(-1, 150)
        data['action'] = torch.LongTensor(data['action']).reshape(-1, 1)
        data['reward'] = torch.FloatTensor(data['reward']).reshape(-1, 1)
        data['next_state'] = torch.FloatTensor(data['next_state']).reshape(-1, 150)
        data['over'] = torch.LongTensor(data['over']).reshape(-1, 1)
    return data0, data1, win
wins = []
for i in range(200000):
    data0, data1, win = get_data()
    wins.append(win)
    ppo.train(**data0)
    ppo.train(**data1)
    if i % 10000 == 0:
        wins = wins[-100:]
        print(i, sum(wins) / len(wins))
        wins = []
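Note that each printed value covers only the last 100 episodes before the checkpoint, because `wins` is truncated with `wins[-100:]` and then reset. The sketch below isolates that calculation. `checkpoint_win_rate` is a hypothetical helper, and the episode outcomes are made up purely to show the windowing.

```python
def checkpoint_win_rate(wins, window: int = 100) -> float:
    """Win rate over the most recent `window` episodes, as logged at each checkpoint."""
    recent = wins[-window:]
    if not recent:
        raise ValueError("no episodes recorded yet")
    return sum(recent) / len(recent)


# Made-up outcomes: 500 early wins, then 75 losses and 25 wins.
# Only the final 100 episodes count toward the reported rate.
wins = [True] * 500 + [False] * 75 + [True] * 25
print(checkpoint_win_rate(wins))  # 0.25
```

With fewer than 100 recorded episodes, as at iteration 0, the rate is computed over whatever is available.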
Training output (selected iterations):
0 0.00
10000 0.10
20000 0.21
30000 0.27
120000 0.37
190000 0.37
The win rate climbs from 0% to around 37% against the bot team. The bots have shared vision (any agent visible to any bot is visible to all bots), which gives them a significant advantage, so a 37% win rate suggests that the agents have learned some degree of coordination.
Independent Learning vs. Centralized Training
Independent learning (used here) trains each agent’s policy without explicitly modeling the other agents. It is simple, but it treats the other agents as part of a non-stationary environment, which can cause training instability.
Centralized Training with Decentralized Execution (CTDE) architectures such as MADDPG or MAPPO address this by giving each critic access to all agents’ observations and actions during training, while keeping the actors decentralized at execution time.
| Approach | Pros | Cons |
|---|---|---|
| Independent learning | Simple, scalable | Non-stationarity, potential instability |
| CTDE (e.g., MADDPG) | Stable, better coordination | Requires centralized critic; harder to scale |
| Fully centralized | Best coordination | Exponential action space; impractical at scale |
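To make the comparison concrete for this environment, the sketch below builds the input a centralized critic might receive (every agent's observation plus a one-hot encoding of its action) and counts the joint actions a fully centralized policy would have to choose among. It illustrates the general idea only, not MADDPG or MAPPO as implemented in any particular library. `one_hot` and `centralized_critic_input` are hypothetical names.

```python
def one_hot(index: int, size: int):
    """One-hot encode a discrete action index."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec


def centralized_critic_input(observations, actions, n_actions: int = 7):
    """Concatenate each agent's observation and one-hot action into one vector."""
    features = []
    for obs, action in zip(observations, actions):
        features.extend(obs)
        features.extend(one_hot(action, n_actions))
    return features


# Two agents, each with a 150-element observation and one of 7 actions
observations = [[0.0] * 150, [0.0] * 150]
actions = [3, 6]
critic_input = centralized_critic_input(observations, actions)
print(len(critic_input))  # 314 = 2 * (150 + 7)

# A fully centralized policy picks a joint action: 7 choices per agent
for n_agents in (2, 4, 8):
    print(n_agents, "agents ->", 7 ** n_agents, "joint actions")
```

The critic input grows linearly with the number of agents, while the joint action space grows exponentially, which is why the table rates the fully centralized approach as impractical at scale.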