Multi-Agent Systems

The MultiAgent class coordinates multiple agents that interact in a shared environment. It is the entry point for multi-agent reinforcement learning, competitive and cooperative scenarios, and larger simulations.

MultiAgent Class

The MultiAgent class is defined in neurenix/agent/multi_agent.py. It holds a list of agents and one environment, and drives them together one step at a time.

Constructor

from neurenix.agent import Agent, Environment, MultiAgent

# Create agents and environment (MyAgent and MyEnvironment are your own Agent and Environment subclasses)
agents = [MyAgent(f"agent-{i}") for i in range(3)]
env = MyEnvironment()

# Create multi-agent system
mas = MultiAgent(agents, env)
Parameters:
  • agents (List[Agent]): List of agents in the system
  • environment (Environment): Shared environment for the agents

Properties

agents

The list of agents in the system.
print(f"Number of agents: {len(mas.agents)}")
for agent in mas.agents:
    print(agent.name)

environment

The shared environment for the agents.
env_state = mas.environment.state

step_count

The number of steps taken in the current episode.
print(f"Current step: {mas.step_count}")
Returns: int - Number of steps taken

Core Methods

step()

Perform a single step of the multi-agent system. This method:
  1. Gets observations for each agent from the environment
  2. Has each agent select an action based on its observation
  3. Applies all actions to the environment
  4. Returns the results
results = mas.step()

print(results["observations"])  # Observations for each agent
print(results["actions"])       # Actions taken by each agent
print(results["rewards"])       # Rewards for each agent
print(results["done"])          # Whether episode is complete
print(results["info"])          # Additional information
Returns: Dict[str, Any] - Dictionary containing:
  • observations (dict): Observations for each agent (keyed by agent.id)
  • actions (dict): Actions taken by each agent (keyed by agent.id)
  • rewards (dict): Rewards for each agent (keyed by agent.id)
  • done (bool): Whether the episode is complete
  • info (dict): Additional information from the environment
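
The four steps above can be sketched in plain Python. This is a minimal illustration of the control flow only, not the Neurenix implementation: StubAgent, StubEnvironment, and sketch_step are hypothetical stand-ins introduced here so the example runs without the library.

```python
from typing import Any, Dict, List


class StubAgent:
    """Hypothetical stand-in for an Agent subclass; only `id` and `act` are used."""

    def __init__(self, agent_id: str) -> None:
        self.id = agent_id

    def act(self, observation: Any) -> str:
        return "noop"


class StubEnvironment:
    """Hypothetical stand-in for an Environment with `observe` and `step`."""

    def observe(self, agent: StubAgent) -> Dict[str, Any]:
        return {"seen_by": agent.id}

    def step(self, actions: Dict[str, Any]) -> Dict[str, Any]:
        return {"rewards": {aid: 0.0 for aid in actions}, "done": False, "info": {}}


def sketch_step(agents: List[StubAgent], env: StubEnvironment) -> Dict[str, Any]:
    # 1. Gather one observation per agent, keyed by agent.id.
    observations = {agent.id: env.observe(agent) for agent in agents}
    # 2. Let each agent choose an action from its own observation.
    actions = {agent.id: agent.act(observations[agent.id]) for agent in agents}
    # 3. Apply all actions to the environment in a single call.
    outcome = env.step(actions)
    # 4. Bundle everything into the documented result shape.
    return {
        "observations": observations,
        "actions": actions,
        "rewards": outcome["rewards"],
        "done": outcome["done"],
        "info": outcome["info"],
    }


results = sketch_step([StubAgent("agent-0"), StubAgent("agent-1")], StubEnvironment())
```

Because every per-agent dictionary shares the same keys, a caller can look up an agent's observation, action, and reward with the same agent.id.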

reset()

Reset the multi-agent system. This method:
  1. Resets the environment
  2. Resets each agent
  3. Returns the initial observations
initial_observations = mas.reset()

for agent_id, obs in initial_observations.items():
    print(f"Agent {agent_id} initial observation: {obs}")
Returns: Dict[str, Any] - Dictionary containing initial observations for each agent (keyed by agent.id)
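
A small sketch of the reset contract described above, written without the library. EpisodeCounter is a hypothetical stand-in, and the assumption that reset also restarts step_count is inferred from step_count being defined per episode; check the source if your code depends on it.

```python
from typing import Any, Dict, List


class EpisodeCounter:
    """Hypothetical stand-in that mimics only the reset/step_count contract."""

    def __init__(self, agent_ids: List[str]) -> None:
        self.agent_ids = list(agent_ids)
        self.step_count = 0

    def step(self) -> None:
        self.step_count += 1

    def reset(self) -> Dict[str, Any]:
        # Assumption: a new episode restarts the step counter.
        self.step_count = 0
        # Initial observations are keyed by agent id, as documented above.
        return {agent_id: {"position": [0, 0]} for agent_id in self.agent_ids}


system = EpisodeCounter(["agent-0", "agent-1"])
system.step()
system.step()
steps_before_reset = system.step_count
initial_observations = system.reset()
```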

add_agent(agent)

Add a new agent to the system.
new_agent = MyAgent("agent-new")
mas.add_agent(new_agent)
Parameters:
  • agent (Agent): The agent to add
Returns: None

remove_agent(agent_id)

Remove an agent from the system.
removed_agent = mas.remove_agent("agent-1")
if removed_agent:
    print(f"Removed agent: {removed_agent.name}")
else:
    print("Agent not found")
Parameters:
  • agent_id (str): ID of the agent to remove
Returns: Optional[Agent] - The removed agent, or None if not found
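
The Optional[Agent] return value means callers should handle the not-found case explicitly. A minimal sketch of that lookup-and-remove pattern on a plain list; NamedAgent and remove_by_id are hypothetical names used for illustration and may not match how MultiAgent stores its agents internally.

```python
from typing import List, Optional


class NamedAgent:
    """Hypothetical stand-in exposing the `id` and `name` attributes used above."""

    def __init__(self, name: str) -> None:
        self.id = name
        self.name = name


def remove_by_id(agents: List[NamedAgent], agent_id: str) -> Optional[NamedAgent]:
    # Return the removed agent, or None when no agent has that id.
    for index, agent in enumerate(agents):
        if agent.id == agent_id:
            return agents.pop(index)
    return None


roster = [NamedAgent("agent-0"), NamedAgent("agent-1")]
removed = remove_by_id(roster, "agent-1")
missing = remove_by_id(roster, "agent-9")
```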

Examples

Basic Multi-Agent Simulation

Run a simple multi-agent simulation:
from neurenix.agent import Agent, Environment, MultiAgent

# Define custom agent
class SimpleAgent(Agent):
    def __init__(self, name):
        super().__init__(name)
        self.id = name  # Add id attribute for MultiAgent
    
    def act(self, observation):
        # Simple action selection
        return {"move": "forward"}
    
    def learn(self, experience):
        pass

# Define custom environment
class GridEnvironment(Environment):
    def __init__(self, size=10):
        super().__init__()
        self.size = size
    
    def step(self, actions):
        # Process all agent actions
        rewards = {}
        for agent_id, action in actions.items():
            rewards[agent_id] = self._compute_reward(agent_id, action)
        
        return {
            "rewards": rewards,
            "done": self._is_done(),
            "info": {}
        }
    
    def observe(self, agent):
        # Return observation for specific agent
        return {
            "position": self._state.get(f"{agent.id}_pos", [0, 0]),
            "grid": self._get_visible_grid(agent)
        }
    
    def _compute_reward(self, agent_id, action):
        return 0.0
    
    def _is_done(self):
        return False
    
    def _get_visible_grid(self, agent):
        return []

# Create and run multi-agent system
agents = [SimpleAgent(f"agent-{i}") for i in range(5)]
env = GridEnvironment(size=20)
mas = MultiAgent(agents, env)

# Run simulation
observations = mas.reset()
for step in range(100):
    results = mas.step()
    
    print(f"Step {mas.step_count}:")
    for agent_id, reward in results["rewards"].items():
        print(f"  {agent_id}: reward={reward}")
    
    if results["done"]:
        print("Episode complete!")
        break

Competitive Multi-Agent RL

Create a competitive multi-agent reinforcement learning scenario:
from neurenix.agent import Agent, Environment, MultiAgent
from neurenix.nn import Sequential, Linear, ReLU
from neurenix.tensor import Tensor
import numpy as np

class CompetitiveAgent(Agent):
    def __init__(self, name, state_dim, action_dim):
        super().__init__(name)
        self.id = name
        
        # Policy network
        self.policy = Sequential(
            Linear(state_dim, 64),
            ReLU(),
            Linear(64, action_dim)
        )
        
        self.experience_buffer = []
    
    def act(self, observation):
        state = Tensor(observation["state"])
        action_logits = self.policy.forward(state)
        
        # Epsilon-greedy exploration
        if np.random.random() < 0.1:
            return np.random.randint(0, len(action_logits.data))
        else:
            return np.argmax(action_logits.data)
    
    def learn(self, experience):
        self.experience_buffer.append(experience)
        
        # Batch learning every 32 experiences
        if len(self.experience_buffer) >= 32:
            self._update_policy()
            self.experience_buffer = []
    
    def _update_policy(self):
        # Implement policy update (e.g., PPO, DQN)
        pass

class CompetitiveEnvironment(Environment):
    def __init__(self, num_agents):
        super().__init__()
        self.num_agents = num_agents
        self.agent_positions = {}
        self.resources = []
    
    def reset(self):
        self._state = {
            "positions": {},
            "resources": self._generate_resources(),
            "scores": {agent_id: 0 for agent_id in self._agents.keys()}
        }
        return self._state
    
    def step(self, actions):
        rewards = {}
        
        # Process each agent's action
        for agent_id, action in actions.items():
            old_pos = self._state["positions"].get(agent_id, [0, 0])
            new_pos = self._compute_new_position(old_pos, action)
            self._state["positions"][agent_id] = new_pos
            
            # Check for resource collection
            reward = 0
            if new_pos in self._state["resources"]:
                reward = 10
                self._state["resources"].remove(new_pos)
                self._state["scores"][agent_id] += 1
            
            # Penalty for collision with other agents
            for other_id, other_pos in self._state["positions"].items():
                if other_id != agent_id and new_pos == other_pos:
                    reward -= 5
            
            rewards[agent_id] = reward
        
        # Episode ends when all resources collected
        done = len(self._state["resources"]) == 0
        
        return {
            "rewards": rewards,
            "done": done,
            "info": {"scores": self._state["scores"]}
        }
    
    def observe(self, agent):
        pos = self._state["positions"].get(agent.id, [0, 0])
        return {
            "state": self._create_observation_vector(agent.id, pos),
            "position": pos,
            "score": self._state["scores"].get(agent.id, 0)
        }
    
    def _generate_resources(self):
        return [[np.random.randint(0, 10), np.random.randint(0, 10)] for _ in range(20)]
    
    def _compute_new_position(self, pos, action):
        # 4 actions: up, down, left, right
        moves = [[0, 1], [0, -1], [-1, 0], [1, 0]]
        move = moves[action % 4]
        return [pos[0] + move[0], pos[1] + move[1]]
    
    def _create_observation_vector(self, agent_id, pos):
        # Create observation vector (position + nearby resources + other agents)
        obs = np.zeros(10)
        obs[0:2] = pos
        # Add more features...
        return obs

# Create competitive multi-agent system
num_agents = 4
agents = [CompetitiveAgent(f"agent-{i}", state_dim=10, action_dim=4) 
          for i in range(num_agents)]
env = CompetitiveEnvironment(num_agents)

# Register agents with environment
for agent in agents:
    env.register_agent(agent)

mas = MultiAgent(agents, env)

# Training loop
for episode in range(1000):
    observations = mas.reset()
    episode_rewards = {agent.id: 0 for agent in agents}
    
    while True:
        results = mas.step()
        
        # Update episode rewards
        for agent_id, reward in results["rewards"].items():
            episode_rewards[agent_id] += reward
        
        # Have agents learn from experience
        for agent in agents:
            experience = {
                "observation": results["observations"][agent.id],
                "action": results["actions"][agent.id],
                "reward": results["rewards"][agent.id],
                "done": results["done"]
            }
            agent.learn(experience)
        
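        # Stop when the episode ends, or after a step cap (500 is an arbitrary
        # choice) so an episode with unreachable resources cannot loop forever
        if results["done"] or mas.step_count >= 500:
            break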
    
    # Print episode statistics
    if episode % 100 == 0:
        print(f"Episode {episode}:")
        for agent_id, total_reward in episode_rewards.items():
            print(f"  {agent_id}: {total_reward}")
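
The epsilon-greedy rule in CompetitiveAgent.act picks a random action with probability 0.1 and the highest-scoring action otherwise. A minimal standalone sketch of that rule using only the standard library; the helper name epsilon_greedy and its rng parameter are illustrative and not part of Neurenix.

```python
import random
from typing import Optional, Sequence


def epsilon_greedy(
    scores: Sequence[float],
    epsilon: float = 0.1,
    rng: Optional[random.Random] = None,
) -> int:
    """Return an action index: random with probability epsilon, else the argmax."""
    rng = rng or random.Random()
    if rng.random() < epsilon:
        # Explore: any action index, chosen uniformly.
        return rng.randrange(len(scores))
    # Exploit: index of the largest score (first one wins on ties).
    return max(range(len(scores)), key=lambda i: scores[i])


greedy_choice = epsilon_greedy([0.1, 0.9, 0.3], epsilon=0.0)
explored_choice = epsilon_greedy([0.1, 0.9, 0.3], epsilon=1.0, rng=random.Random(0))
```

Passing a seeded random.Random makes exploration reproducible, which helps when comparing training runs.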

Cooperative Multi-Agent Task

Create agents that cooperate to achieve a shared goal:
from neurenix.agent import Agent, MultiAgent

class CooperativeAgent(Agent):
    def __init__(self, name, role):
        super().__init__(name)
        self.id = name
        self.role = role  # "explorer", "collector", "builder"
    
    def act(self, observation):
        # Role-specific behavior
        if self.role == "explorer":
            return self._explore_action(observation)
        elif self.role == "collector":
            return self._collect_action(observation)
        else:
            return self._build_action(observation)
    
    def _explore_action(self, obs):
        # Explore unknown areas
        return {"type": "move", "direction": "random"}
    
    def _collect_action(self, obs):
        # Collect discovered resources
        return {"type": "collect", "target": obs.get("nearest_resource")}
    
    def _build_action(self, obs):
        # Build structures with collected resources
        return {"type": "build", "structure": "base"}
    
    def learn(self, experience):
        # Learn to coordinate better
        pass

# Create cooperative team
team = [
    CooperativeAgent("explorer-1", "explorer"),
    CooperativeAgent("explorer-2", "explorer"),
    CooperativeAgent("collector-1", "collector"),
    CooperativeAgent("builder-1", "builder")
]

# CooperativeEnvironment is your own Environment subclass (not shown here)
env = CooperativeEnvironment()
mas = MultiAgent(team, env)

# Run cooperative task
observations = mas.reset()
while True:
    results = mas.step()
    
    # Check if team goal is achieved
    if results["info"].get("goal_achieved"):
        print("Team succeeded!")
        break
    
    if results["done"]:
        print("Team failed.")
        break

Dynamic Agent Management

Add and remove agents dynamically during simulation:
mas = MultiAgent([], env)

# Start with one agent
initial_agent = MyAgent("agent-0")
mas.add_agent(initial_agent)

for step in range(1000):
    results = mas.step()
    
    # Add new agent every 100 steps
    if step % 100 == 0 and len(mas) < 10:
        new_agent = MyAgent(f"agent-{len(mas)}")
        mas.add_agent(new_agent)
        print(f"Added {new_agent.name}. Total agents: {len(mas)}")
    
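    # Remove poorly performing agents. Iterate over a copy in case
    # remove_agent() mutates the list returned by mas.agents, and use .get()
    # because an agent added after this step has no reward yet.
    for agent in list(mas.agents):
        if results["rewards"].get(agent.id, 0) < -10:
            mas.remove_agent(agent.id)
            print(f"Removed {agent.name}")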
    
    if results["done"]:
        break
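
Removing items from a collection while looping over it is a common source of skipped elements in Python. A minimal sketch, using plain lists and dicts rather than the Neurenix classes, of deciding which agents to remove first and mutating afterwards; the reward values are made up for illustration.

```python
# Made-up rewards from one step; "agent-3" joined late and has no reward yet.
rewards = {"agent-0": -20.0, "agent-1": -15.0, "agent-2": 3.0}
agent_ids = ["agent-0", "agent-1", "agent-2", "agent-3"]

# Decide first: .get() treats agents without a reward as neutral (0.0).
to_remove = [aid for aid in agent_ids if rewards.get(aid, 0.0) < -10]

# Mutate afterwards, so the loop never runs over a list that is changing.
for aid in to_remove:
    agent_ids.remove(aid)
```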

Best Practices

1. Agents Need an id Attribute

The MultiAgent class uses agent.id to identify agents. Make sure your agents have this attribute:
class MyAgent(Agent):
    def __init__(self, name):
        super().__init__(name)
        self.id = name  # Important for MultiAgent!
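
Since results are keyed by agent.id, a missing or duplicated id would make per-agent entries ambiguous. A small pre-flight check, sketched under the assumption that ids should be unique; find_id_problems and the two demo classes are hypothetical helpers, not library functions.

```python
from typing import Iterable, List


def find_id_problems(agents: Iterable[object]) -> List[str]:
    """Return human-readable problems; an empty list means all ids look fine."""
    problems: List[str] = []
    seen = set()
    for index, agent in enumerate(agents):
        agent_id = getattr(agent, "id", None)
        if agent_id is None:
            problems.append(f"agent at index {index} has no id")
        elif agent_id in seen:
            problems.append(f"duplicate id: {agent_id}")
        else:
            seen.add(agent_id)
    return problems


class WithId:
    def __init__(self, agent_id: str) -> None:
        self.id = agent_id


class WithoutId:
    pass


problems = find_id_problems([WithId("a"), WithoutId(), WithId("a")])
```

Running a check like this before constructing the MultiAgent surfaces configuration mistakes early instead of as a KeyError mid-episode.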

2. Environment Must Support Multi-Agent

Your environment should handle actions from multiple agents:
class MultiAgentEnvironment(Environment):
    def step(self, actions):
        # actions is a dict: {agent_id: action}
        rewards = {}
        for agent_id, action in actions.items():
            rewards[agent_id] = self._process_action(agent_id, action)
        
        return {
            "rewards": rewards,  # Dict of rewards per agent
            "done": False,
            "info": {}
        }

3. Register Agents with Environment

If your environment needs to track agents, register them before you create the MultiAgent:
for agent in agents:
    env.register_agent(agent)

mas = MultiAgent(agents, env)

4. Handle Episode Termination

Decide when episodes end based on your use case:
def step(self, actions):
    # Episode ends when:
    # - All agents reach goal (cooperative)
    # - One agent wins (competitive)
    # - Maximum steps reached
    # - All resources depleted
    
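    rewards = {agent_id: 0.0 for agent_id in actions}  # placeholder; compute real rewards here
    done = self._check_termination_condition()  # your own helper implementing the rules above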
    
    return {
        "rewards": rewards,
        "done": done,
        "info": {}
    }
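
The termination rules listed in the comments above can be combined into one predicate. A minimal sketch, assuming the environment tracks which agents reached the goal, how many resources remain, and the step count; check_termination, its parameters, and the default max_steps of 500 are illustrative choices, not part of the Neurenix API.

```python
from typing import Dict


def check_termination(
    goal_reached: Dict[str, bool],
    resources_left: int,
    step_count: int,
    max_steps: int = 500,
    cooperative: bool = True,
) -> bool:
    """Return True when any of the documented end conditions holds."""
    if step_count >= max_steps:
        return True  # Maximum steps reached
    if resources_left == 0:
        return True  # All resources depleted
    if not goal_reached:
        return False  # No agents tracked, so no goal condition can fire
    if cooperative:
        return all(goal_reached.values())  # All agents reach the goal
    return any(goal_reached.values())  # One agent wins


team_done = check_termination({"a": True, "b": False}, resources_left=3, step_count=10)
duel_done = check_termination(
    {"a": True, "b": False}, resources_left=3, step_count=10, cooperative=False
)
```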

API Reference

MultiAgent

Source: neurenix/agent/multi_agent.py:10
class MultiAgent:
    def __init__(self, agents: List[Agent], environment: Environment)
    
    @property
    def step_count(self) -> int
    
    def step(self) -> Dict[str, Any]
    def reset(self) -> Dict[str, Any]
    def add_agent(self, agent: Agent) -> None
    def remove_agent(self, agent_id: str) -> Optional[Agent]
    def __len__(self) -> int
