Environments

The Environment class defines the world in which agents operate. It provides observations to agents, processes their actions, manages state, and determines rewards. Creating custom environments is essential for training and evaluating agents.

Environment Class

The Environment class is defined in neurenix/agent/environment.py and provides the base interface for all environments.

Constructor

from neurenix.agent import Environment

class MyEnvironment(Environment):
    def __init__(self, config):
        super().__init__()
        self.config = config
        # Initialize your environment

env = MyEnvironment(config={"size": 10})
Parameters: None (base class)

Properties

state

Get the current state of the environment (returns a copy).
current_state = env.state
print(current_state)
Returns: Dict[str, Any] - Copy of the current environment state

agents

Get the agents registered with the environment (returns a copy).
registered_agents = env.agents
for agent_id, agent in registered_agents.items():
    print(f"Agent {agent_id}: {agent.name}")
Returns: Dict[str, Any] - Dictionary mapping agent IDs to agents

Core Methods

reset()

Reset the environment to its initial state.
initial_state = env.reset()
print(f"Environment reset to: {initial_state}")
Returns: Dict[str, Any] - Initial state of the environment

step(actions)

Apply actions to the environment and update its state. This method must be implemented by subclasses.
class MyEnvironment(Environment):
    def step(self, actions):
        # Process actions from all agents
        rewards = {}
        for agent_id, action in actions.items():
            rewards[agent_id] = self._compute_reward(agent_id, action)
            self._update_state(agent_id, action)
        
        done = self._check_if_done()
        
        return {
            "rewards": rewards,
            "done": done,
            "info": {"extra_info": "value"}
        }

actions = {"agent-1": action1, "agent-2": action2}
results = env.step(actions)
Parameters:
  • actions (Dict[str, Any]): Dictionary mapping agent IDs to their actions
Returns: Dict[str, Any] - Dictionary containing:
  • rewards (dict): Dictionary mapping agent IDs to their rewards
  • done (bool): Whether the episode is complete
  • info (dict): Additional information
Raises: NotImplementedError if not overridden in subclass
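
The return contract above is easy to break by accident, for example by forgetting a reward for one agent. The following is a minimal sketch of a checker for it. `validate_step_result` is a hypothetical helper, not part of Neurenix, and the requirement that every acting agent receives a reward is an assumption drawn from the examples on this page.

```python
from typing import Any, Dict, Iterable


def validate_step_result(result: Dict[str, Any], agent_ids: Iterable[str]) -> None:
    """Raise ValueError if `result` does not match the documented step() shape."""
    # All three documented keys must be present.
    missing = {"rewards", "done", "info"} - set(result)
    if missing:
        raise ValueError(f"step() result is missing keys: {sorted(missing)}")
    if not isinstance(result["rewards"], dict):
        raise ValueError("'rewards' must map agent IDs to rewards")
    if not isinstance(result["done"], bool):
        raise ValueError("'done' must be a bool")
    if not isinstance(result["info"], dict):
        raise ValueError("'info' must be a dict")
    # Assumption: every agent that acted should receive a reward entry.
    unrewarded = set(agent_ids) - set(result["rewards"])
    if unrewarded:
        raise ValueError(f"no reward for agents: {sorted(unrewarded)}")


result = {"rewards": {"agent-1": -1.0}, "done": False, "info": {}}
validate_step_result(result, ["agent-1"])  # passes silently
```

Calling it on the result of `env.step(actions)` with `actions.keys()` during development can catch contract violations early.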

observe(agent)

Get an observation of the environment for a specific agent. This method must be implemented by subclasses.
class MyEnvironment(Environment):
    def observe(self, agent):
        agent_position = self._state.get(f"{agent.id}_position", [0, 0])
        
        return {
            "position": agent_position,
            "visible_objects": self._get_visible_objects(agent),
            "nearby_agents": self._get_nearby_agents(agent)
        }

observation = env.observe(agent)
Parameters:
  • agent (Any): The agent requesting the observation
Returns: Dict[str, Any] - Observation for the agent
Raises: NotImplementedError if not overridden in subclass

register_agent(agent)

Register an agent with the environment.
agent = MyAgent("agent-1")
env.register_agent(agent)
Parameters:
  • agent (Any): The agent to register
Returns: None

unregister_agent(agent_id)

Unregister an agent from the environment.
env.unregister_agent("agent-1")
Parameters:
  • agent_id (str): ID of the agent to unregister
Returns: None
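
The core methods are typically combined into an episode loop: reset, then repeatedly observe, act, and step until `done` is true. The sketch below shows that loop. To stay runnable on its own it uses `StandInEnvironment`, a hypothetical stand-in for the real base class, and `FixedAgent` with a hypothetical `act()` method; the real Neurenix classes may behave differently.

```python
from typing import Any, Dict


class StandInEnvironment:
    """Hypothetical stand-in for neurenix.agent.Environment (not the real class)."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}
        self._agents: Dict[str, Any] = {}

    def _get_initial_state(self) -> Dict[str, Any]:
        return {}

    def reset(self) -> Dict[str, Any]:
        # Assumption: reset() rebuilds the state from _get_initial_state().
        self._state = self._get_initial_state()
        return dict(self._state)

    def register_agent(self, agent: Any) -> None:
        self._agents[agent.id] = agent


class CounterEnvironment(StandInEnvironment):
    """Toy environment: each agent adds its action to a shared counter."""

    def _get_initial_state(self):
        return {"total": 0}

    def step(self, actions):
        rewards = {}
        for agent_id, action in actions.items():
            self._state["total"] += action
            rewards[agent_id] = float(action)
        return {"rewards": rewards, "done": self._state["total"] >= 5, "info": {}}

    def observe(self, agent):
        return {"total": self._state["total"]}


class FixedAgent:
    """Hypothetical agent exposing the `id` attribute the examples rely on."""

    def __init__(self, agent_id: str, increment: int) -> None:
        self.id = agent_id
        self.increment = increment

    def act(self, observation):
        return self.increment  # ignores the observation on purpose


def run_episode(env, agents, max_steps=100):
    """Reset, then alternate observe -> act -> step until done or max_steps."""
    env.reset()
    for agent in agents:
        env.register_agent(agent)
    totals = {agent.id: 0.0 for agent in agents}
    for _ in range(max_steps):
        actions = {agent.id: agent.act(env.observe(agent)) for agent in agents}
        result = env.step(actions)
        for agent_id, reward in result["rewards"].items():
            totals[agent_id] += reward
        if result["done"]:
            break
    return totals


totals = run_episode(CounterEnvironment(), [FixedAgent("agent-1", 1), FixedAgent("agent-2", 2)])
print(totals)  # {'agent-1': 2.0, 'agent-2': 4.0}
```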

Creating Custom Environments

Grid World Environment

Create a simple grid-based environment:
from neurenix.agent import Environment
import numpy as np

class GridWorld(Environment):
    def __init__(self, width=10, height=10, num_obstacles=5):
        super().__init__()
        self.width = width
        self.height = height
        self.num_obstacles = num_obstacles
        self.goal_position = None
        self.obstacles = []
    
    def _get_initial_state(self):
        """Define the initial state of the environment."""
        # Place goal and obstacles
        self.goal_position = [self.width - 1, self.height - 1]
        self.obstacles = [
            [np.random.randint(0, self.width), np.random.randint(0, self.height)]
            for _ in range(self.num_obstacles)
        ]
        
        return {
            "goal": self.goal_position,
            "obstacles": self.obstacles,
            "agent_positions": {},
            "step_count": 0
        }
    
    def step(self, actions):
        """Process agent actions and return results."""
        rewards = {}
        
        for agent_id, action in actions.items():
            # Get current position
            current_pos = self._state["agent_positions"].get(agent_id, [0, 0])
            
            # Compute new position based on action
            # Actions: 0=up, 1=down, 2=left, 3=right
            new_pos = self._compute_new_position(current_pos, action)
            
            # Check if new position is valid
            if self._is_valid_position(new_pos):
                self._state["agent_positions"][agent_id] = new_pos
                
                # Compute reward
                if new_pos == self.goal_position:
                    rewards[agent_id] = 100.0  # Goal reached!
                elif new_pos in self.obstacles:
                    rewards[agent_id] = -10.0  # Hit obstacle
                else:
                    rewards[agent_id] = -1.0   # Step penalty
            else:
                # Invalid move (out of bounds)
                rewards[agent_id] = -5.0
        
        self._state["step_count"] += 1
        
        # Episode ends when any agent reaches goal or max steps
        done = (any(r == 100.0 for r in rewards.values()) or 
                self._state["step_count"] >= 100)
        
        return {
            "rewards": rewards,
            "done": done,
            "info": {"step_count": self._state["step_count"]}
        }
    
    def observe(self, agent):
        """Return observation for a specific agent."""
        agent_pos = self._state["agent_positions"].get(agent.id, [0, 0])
        
        # Create grid observation (local view)
        view_range = 3
        local_grid = self._get_local_grid(agent_pos, view_range)
        
        return {
            "position": agent_pos,
            "goal_direction": self._compute_direction(agent_pos, self.goal_position),
            "local_grid": local_grid,
            "distance_to_goal": self._compute_distance(agent_pos, self.goal_position)
        }
    
    def _compute_new_position(self, pos, action):
        """Compute new position based on action."""
        moves = [[0, -1], [0, 1], [-1, 0], [1, 0]]  # up, down, left, right
        move = moves[action]
        return [pos[0] + move[0], pos[1] + move[1]]
    
    def _is_valid_position(self, pos):
        """Check if position is within bounds."""
        return (0 <= pos[0] < self.width and 0 <= pos[1] < self.height)
    
    def _get_local_grid(self, center, range_size):
        """Get local grid around position."""
        grid = np.zeros((range_size * 2 + 1, range_size * 2 + 1))
        
        for i in range(-range_size, range_size + 1):
            for j in range(-range_size, range_size + 1):
                pos = [center[0] + i, center[1] + j]
                
                if not self._is_valid_position(pos):
                    grid[i + range_size][j + range_size] = -1  # Out of bounds
                elif pos in self.obstacles:
                    grid[i + range_size][j + range_size] = 1   # Obstacle
                elif pos == self.goal_position:
                    grid[i + range_size][j + range_size] = 2   # Goal
        
        return grid.tolist()
    
    def _compute_direction(self, from_pos, to_pos):
        """Compute direction vector from one position to another."""
        return [to_pos[0] - from_pos[0], to_pos[1] - from_pos[1]]
    
    def _compute_distance(self, pos1, pos2):
        """Compute Manhattan distance between positions."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])

# Use the environment
env = GridWorld(width=10, height=10)
state = env.reset()
print(f"Initial state: {state}")
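
GridWorld draws each obstacle independently, so obstacles can repeat or land on the goal or on the default start cell `[0, 0]`. If that matters for your task, one option is to sample without replacement from the free cells. The following is a sketch only; `sample_obstacles` and its parameters are hypothetical names, and it uses the standard library `random` module rather than NumPy.

```python
import random
from typing import List, Optional, Sequence


def sample_obstacles(
    width: int,
    height: int,
    count: int,
    reserved: Sequence[Sequence[int]],
    rng: Optional[random.Random] = None,
) -> List[List[int]]:
    """Pick `count` distinct cells, skipping any cell listed in `reserved`."""
    rng = rng or random.Random()
    reserved_cells = {tuple(cell) for cell in reserved}
    free = [
        [x, y]
        for x in range(width)
        for y in range(height)
        if (x, y) not in reserved_cells
    ]
    if count > len(free):
        raise ValueError("not enough free cells for the requested obstacles")
    # random.sample draws without replacement, so no cell appears twice.
    return rng.sample(free, count)


obstacles = sample_obstacles(10, 10, 5, reserved=[[0, 0], [9, 9]], rng=random.Random(0))
print(len(obstacles))  # 5
```

Inside `_get_initial_state()`, the result could replace the list comprehension that builds `self.obstacles`.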

Continuous Environment

Create an environment with continuous state and action spaces:
from neurenix.agent import Environment
import numpy as np

class ContinuousEnvironment(Environment):
    def __init__(self, state_dim=4, action_dim=2):
        super().__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.dt = 0.01  # Time step
    
    def _get_initial_state(self):
        """Empty initial state; agents start at the origin with zero velocity."""
        return {
            "positions": {},
            "velocities": {},
            "time": 0.0
        }
    
    def step(self, actions):
        """Continuous dynamics update."""
        rewards = {}
        
        for agent_id, action in actions.items():
            # Get current state
            pos = self._state["positions"].get(agent_id, np.zeros(2))
            vel = self._state["velocities"].get(agent_id, np.zeros(2))
            
            # Apply action (force/acceleration)
            force = np.clip(action, -1.0, 1.0)
            
            # Update velocity and position (simple Euler integration)
            vel = vel + force * self.dt
            pos = pos + vel * self.dt
            
            # Apply damping
            vel = vel * 0.99
            
            # Update state
            self._state["positions"][agent_id] = pos
            self._state["velocities"][agent_id] = vel
            
            # Compute reward (example: distance to target)
            target = np.array([1.0, 1.0])
            distance = np.linalg.norm(pos - target)
            rewards[agent_id] = -distance
            
            # Bonus for reaching target
            if distance < 0.1:
                rewards[agent_id] += 100.0
        
        self._state["time"] += self.dt
        
        # Episode ends after time limit or target reached
        done = (self._state["time"] >= 10.0 or 
                any(r > 50.0 for r in rewards.values()))
        
        return {
            "rewards": rewards,
            "done": done,
            "info": {"time": self._state["time"]}
        }
    
    def observe(self, agent):
        """Return continuous state observation."""
        pos = self._state["positions"].get(agent.id, np.zeros(2))
        vel = self._state["velocities"].get(agent.id, np.zeros(2))
        
        return {
            "state": np.concatenate([pos, vel]),
            "position": pos,
            "velocity": vel
        }

# Use the environment
env = ContinuousEnvironment()
state = env.reset()
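
To make the dynamics in `step()` concrete, here is the same per-agent update as a standalone function in plain Python. `euler_step` is a hypothetical name used only for illustration. Note the ordering: velocity is updated first and the new velocity is then used for the position, which is the semi-implicit variant of Euler integration.

```python
from typing import List, Sequence, Tuple


def euler_step(
    pos: Sequence[float],
    vel: Sequence[float],
    force: Sequence[float],
    dt: float = 0.01,
    damping: float = 0.99,
) -> Tuple[List[float], List[float]]:
    """One update: clip force, integrate velocity, integrate position, damp."""
    clipped = [max(-1.0, min(1.0, f)) for f in force]
    new_vel = [v + f * dt for v, f in zip(vel, clipped)]
    new_pos = [p + v * dt for p, v in zip(pos, new_vel)]  # uses updated velocity
    new_vel = [v * damping for v in new_vel]  # damping applied after the move
    return new_pos, new_vel


# A force of 2.0 is clipped to 1.0 before it is applied.
pos, vel = euler_step([0.0, 0.0], [0.0, 0.0], [2.0, -0.5])
print(pos, vel)
```

Starting from rest, the first component ends the step at position 1e-4 with velocity 9.9e-3 (up to floating-point rounding).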

Multi-Agent Resource Collection

Create an environment where agents collect resources:
from neurenix.agent import Environment
import numpy as np

class ResourceEnvironment(Environment):
    def __init__(self, grid_size=20, num_resources=50, respawn_rate=0.1):
        super().__init__()
        self.grid_size = grid_size
        self.num_resources = num_resources
        self.respawn_rate = respawn_rate
    
    def _get_initial_state(self):
        """Initialize with random resource positions."""
        resources = [
            [np.random.randint(0, self.grid_size), 
             np.random.randint(0, self.grid_size)]
            for _ in range(self.num_resources)
        ]
        
        return {
            "resources": resources,
            "agent_positions": {},
            "agent_inventories": {},
            "total_collected": 0
        }
    
    def step(self, actions):
        """Process agent actions for resource collection."""
        rewards = {}
        
        for agent_id, action in actions.items():
            # Get current position and inventory
            pos = self._state["agent_positions"].get(agent_id, [0, 0])
            inventory = self._state["agent_inventories"].get(agent_id, 0)
            
            # Parse action
            action_type = action.get("type", "move")
            
            if action_type == "move":
                # Move to new position
                direction = action.get("direction", 0)
                new_pos = self._move(pos, direction)
                self._state["agent_positions"][agent_id] = new_pos
                rewards[agent_id] = -0.1  # Small movement cost
                
            elif action_type == "collect":
                # Try to collect resource at current position
                if pos in self._state["resources"]:
                    self._state["resources"].remove(pos)
                    inventory += 1
                    self._state["agent_inventories"][agent_id] = inventory
                    self._state["total_collected"] += 1
                    rewards[agent_id] = 10.0  # Reward for collection
                else:
                    rewards[agent_id] = -1.0  # Penalty for failed collection
            
            elif action_type == "deposit":
                # Deposit resources at base (position [0,0])
                if pos == [0, 0] and inventory > 0:
                    rewards[agent_id] = inventory * 5.0  # Reward for deposit
                    self._state["agent_inventories"][agent_id] = 0
                else:
                    rewards[agent_id] = -1.0
        
        # Respawn resources randomly
        if np.random.random() < self.respawn_rate:
            new_resource = [
                np.random.randint(0, self.grid_size),
                np.random.randint(0, self.grid_size)
            ]
            self._state["resources"].append(new_resource)
        
        # Episode ends once 100 resources have been collected in total
        done = self._state["total_collected"] >= 100
        
        return {
            "rewards": rewards,
            "done": done,
            "info": {
                "total_collected": self._state["total_collected"],
                "resources_remaining": len(self._state["resources"])
            }
        }
    
    def observe(self, agent):
        """Return observation including nearby resources."""
        pos = self._state["agent_positions"].get(agent.id, [0, 0])
        inventory = self._state["agent_inventories"].get(agent.id, 0)
        
        # Find nearby resources
        view_range = 5
        nearby_resources = [
            r for r in self._state["resources"]
            if abs(r[0] - pos[0]) <= view_range and abs(r[1] - pos[1]) <= view_range
        ]
        
        # Find nearby agents
        nearby_agents = []
        for other_id, other_pos in self._state["agent_positions"].items():
            if other_id != agent.id:
                if abs(other_pos[0] - pos[0]) <= view_range and abs(other_pos[1] - pos[1]) <= view_range:
                    nearby_agents.append(other_pos)
        
        return {
            "position": pos,
            "inventory": inventory,
            "nearby_resources": nearby_resources,
            "nearby_agents": nearby_agents,
            "distance_to_base": abs(pos[0]) + abs(pos[1])
        }
    
    def _move(self, pos, direction):
        """Move in specified direction."""
        moves = [[0, 1], [0, -1], [-1, 0], [1, 0]]  # up, down, left, right (y grows upward here, unlike GridWorld)
        move = moves[direction % 4]
        new_pos = [pos[0] + move[0], pos[1] + move[1]]
        
        # Clamp to grid bounds
        new_pos[0] = max(0, min(self.grid_size - 1, new_pos[0]))
        new_pos[1] = max(0, min(self.grid_size - 1, new_pos[1]))
        
        return new_pos
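
ResourceEnvironment expects each action to be a dict with a `type` key and, for moves, a `direction` index. Small builder functions can keep those dicts consistent. The names below (`move`, `collect`, `deposit`, and the direction constants) are hypothetical helpers, not part of Neurenix. Note that `step()` as written assigns no reward for an unrecognized `type`, so building actions through helpers like these avoids silently missing reward entries.

```python
from typing import Any, Dict

# Indices into the `moves` list used by ResourceEnvironment._move.
UP, DOWN, LEFT, RIGHT = 0, 1, 2, 3


def move(direction: int) -> Dict[str, Any]:
    """Action that moves one cell in the given direction."""
    return {"type": "move", "direction": direction}


def collect() -> Dict[str, Any]:
    """Action that tries to pick up a resource at the current position."""
    return {"type": "collect"}


def deposit() -> Dict[str, Any]:
    """Action that tries to deposit the inventory at the base."""
    return {"type": "deposit"}


# One action per agent ID, in the shape step() expects.
actions = {"agent-1": move(RIGHT), "agent-2": collect()}
print(actions)
```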

Best Practices

1. Override _get_initial_state()

Define your environment’s initial state:
class MyEnvironment(Environment):
    def _get_initial_state(self):
        return {
            "positions": {},
            "resources": self._generate_resources(),
            "time": 0
        }

2. Return Proper Step Results

Always return a dictionary with rewards, done, and info:
def step(self, actions):
    # Process actions...
    
    return {
        "rewards": {
            agent_id: self._compute_reward(agent_id, action)  # hypothetical helper
            for agent_id, action in actions.items()
        },
        "done": self._is_episode_done(),
        "info": {"extra": "information"}
    }

3. Agent-Specific Observations

Provide observations tailored to each agent’s perspective:
def observe(self, agent):
    # Return only what this agent can see
    return {
        "local_view": self._get_agent_view(agent),
        "agent_state": self._get_agent_state(agent)
    }

4. Handle Agent Registration

Register agents if your environment needs to track them:
for agent in agents:
    env.register_agent(agent)
    # Initialize agent-specific state
    env._state["agent_positions"][agent.id] = [0, 0]
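
An alternative to writing to `env._state` from outside is to override `register_agent()` so the environment sets up per-agent state itself. The sketch below assumes the base class stores agents by `agent.id`; `StandInEnvironment` is a hypothetical stand-in used only to keep the example runnable, not the real Neurenix class.

```python
from types import SimpleNamespace
from typing import Any, Dict


class StandInEnvironment:
    """Hypothetical stand-in for neurenix.agent.Environment (not the real class)."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {"agent_positions": {}}
        self._agents: Dict[str, Any] = {}

    def register_agent(self, agent: Any) -> None:
        self._agents[agent.id] = agent

    def unregister_agent(self, agent_id: str) -> None:
        self._agents.pop(agent_id, None)


class TrackingEnvironment(StandInEnvironment):
    """Keeps per-agent state in sync with registration."""

    def register_agent(self, agent: Any) -> None:
        super().register_agent(agent)
        # Give every newly registered agent a starting position.
        self._state["agent_positions"].setdefault(agent.id, [0, 0])

    def unregister_agent(self, agent_id: str) -> None:
        super().unregister_agent(agent_id)
        # Drop the agent's state so observe() no longer reports it.
        self._state["agent_positions"].pop(agent_id, None)


env = TrackingEnvironment()
env.register_agent(SimpleNamespace(id="agent-1"))
print(env._state["agent_positions"])  # {'agent-1': [0, 0]}
```

If `reset()` replaces the state dict, per-agent entries would need to be recreated there as well.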

5. Maintain State Immutability

The state property returns a copy for safety:
# Users get a copy
state_copy = env.state
state_copy["key"] = "value"  # Doesn't affect environment

# Inside the environment class, modify the internal state directly
self._state["key"] = "value"  # This modifies environment
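
This page does not say whether the copy is shallow or deep. If it is shallow (an assumption, not confirmed here), top-level keys are protected but nested containers are still shared with the environment. The sketch below illustrates that difference with a hypothetical `ShallowStateHolder` class and shows `copy.deepcopy` as a defensive option.

```python
import copy


class ShallowStateHolder:
    """Hypothetical class whose `state` property returns a shallow copy."""

    def __init__(self) -> None:
        self._state = {"agent_positions": {"agent-1": [0, 0]}}

    @property
    def state(self):
        return self._state.copy()  # new outer dict, shared inner objects


holder = ShallowStateHolder()

snapshot = holder.state
snapshot["new_key"] = 1                        # top level: internal state unaffected
snapshot["agent_positions"]["agent-1"][0] = 7  # nested: leaks into internal state

safe = copy.deepcopy(holder.state)
safe["agent_positions"]["agent-1"][0] = 99     # fully independent copy

print("new_key" in holder._state)                    # False
print(holder._state["agent_positions"]["agent-1"])   # [7, 0]
```

When in doubt, deep-copying the returned state before mutating nested values is the safer choice.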

API Reference

Environment

Source: neurenix/agent/environment.py:7
class Environment:
    def __init__(self)
    
    @property
    def state(self) -> Dict[str, Any]
    
    @property
    def agents(self) -> Dict[str, Any]
    
    def reset(self) -> Dict[str, Any]
    def step(self, actions: Dict[str, Any]) -> Dict[str, Any]
    def observe(self, agent: Any) -> Dict[str, Any]
    def register_agent(self, agent: Any) -> None
    def unregister_agent(self, agent_id: str) -> None
