Documentation Index
Fetch the complete documentation index at: https://mintlify.com/kortix-ai/suna/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Kortix provides comprehensive support for the Model Context Protocol (MCP), enabling agents to interact with external tools and services through standardized MCP servers. The MCP integration supports multiple connection types and includes advanced features like schema caching, JIT loading, and connection pooling.
MCP Architecture
Kortix’s MCP system consists of several key components:
MCP Registry
The MCPRegistry (core/agentpress/mcp_registry.py) manages all MCP tool discovery and activation:
class MCPRegistry:
    """Central registry for MCP tools with status tracking and caching."""

    def __init__(self):
        """Initialize empty indexes; tools are registered later during discovery."""
        # tool name -> full metadata record (status, schema, usage stats)
        self._tools: Dict[str, MCPToolInfo] = {}
        # toolkit slug -> names of the tools that toolkit provides
        self._toolkit_mapping: Dict[str, Set[str]] = {}
        # status -> names of tools currently in that status (reverse index)
        self._status_index: Dict[MCPToolStatus, Set[str]] = {}
        # schema cache keyed by slug -- presumably toolkit slug; confirm in full source
        self._schema_cache: Dict[str, Dict[str, Any]] = {}
class MCPToolStatus(Enum):
    """Lifecycle states a registered MCP tool moves through."""
    DISCOVERED = "discovered"  # Tool found but not loaded
    LOADING = "loading"        # Currently being loaded
    ACTIVE = "active"          # Ready to use
    FAILED = "failed"          # Loading/execution failed
    DISABLED = "disabled"      # Manually disabled
MCP Tool Wrapper
The MCPToolWrapper (core/tools/mcp_tool_wrapper.py) provides the interface for using MCP tools:
@tool_metadata(
    display_name="MCP Tool Wrapper",
    description="Internal wrapper for MCP external tool integration",
    visible=False  # internal plumbing: hidden from user-facing tool listings
)
class MCPToolWrapper(Tool):
    """Adapter that exposes configured MCP servers' tools through one Tool."""

    def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None,
                 use_cache: bool = True, account_id: Optional[str] = None):
        """
        Args:
            mcp_configs: Server connection configs (see "Supported MCP Types").
            use_cache: Reuse Redis-cached schemas when available.
            account_id: Owner account, forwarded to the custom-server handler.
        """
        # `mcp_service` is a module-level singleton -- assumed imported at file scope
        self.mcp_manager = mcp_service
        self.mcp_configs = mcp_configs or []
        self.connection_manager = MCPConnectionManager()
        self.custom_handler = CustomMCPHandler(self.connection_manager, account_id)
Supported MCP Types
Kortix supports four types of MCP server connections:
1. SSE (Server-Sent Events)
For HTTP-based streaming connections:
# Note: configs are Python dicts, so booleans are True/False (not JSON true/false).
{
    "name": "my_sse_server",
    "isCustom": True,
    "customType": "sse",
    "config": {
        "url": "https://mcp.example.com/sse",
        "headers": {  # Optional
            "Authorization": "Bearer token"
        }
    },
    "enabledTools": ["tool1", "tool2"]  # Optional filter
}
Implementation (custom_mcp_handler.py):
async def _initialize_sse_mcp(self, server_name: str,
                              server_config: Dict[str, Any],
                              enabled_tools: List[str]):
    """Connect to an SSE-based MCP server and register its tools.

    Opens a short-lived session purely to list the server's tools; the
    discovered tools are handed to ``_register_custom_tools`` for filtering
    against ``enabled_tools`` and registration under ``server_name``.
    """
    from mcp.client.sse import sse_client
    from mcp import ClientSession

    endpoint = server_config.get('url')
    extra_headers = server_config.get('headers', {})

    async with sse_client(endpoint, headers=extra_headers) as (rx, tx):
        async with ClientSession(rx, tx) as session:
            await session.initialize()
            listing = await session.list_tools()
            self._register_custom_tools(
                listing.tools, server_name, enabled_tools, 'sse', server_config)
2. HTTP (Streamable HTTP)
For standard HTTP-based MCP servers:
{
    "name": "my_http_server",
    "isCustom": True,
    "customType": "http",
    "config": {
        "url": "https://mcp.example.com"
    },
    "enabledTools": []  # Empty = all tools
}
Implementation:
async def _initialize_http_mcp(self, server_name: str,
                               server_config: Dict[str, Any],
                               enabled_tools: List[str]):
    """Connect to a streamable-HTTP MCP server and register its tools.

    Mirrors the SSE path: open a session, list the tools, then delegate
    filtering and registration to ``_register_custom_tools``.
    """
    from mcp.client.streamable_http import streamablehttp_client
    from mcp import ClientSession

    endpoint = server_config.get('url')

    # The third element of the client's context tuple is unused here.
    async with streamablehttp_client(endpoint) as (rx, tx, _):
        async with ClientSession(rx, tx) as session:
            await session.initialize()
            listing = await session.list_tools()
            self._register_custom_tools(
                listing.tools, server_name, enabled_tools, 'http', server_config)
3. JSON/stdio (Process-based)
For local MCP servers running as processes:
{
    "name": "my_stdio_server",
    "isCustom": True,
    "customType": "json",
    "config": {
        "command": "node",  # executable to spawn
        "args": ["/path/to/server.js"],
        "env": {  # Optional environment variables
            "API_KEY": "xxx"
        }
    }
}
Implementation:
async def _initialize_json_mcp(self, server_name: str,
                               server_config: Dict[str, Any],
                               enabled_tools: List[str]):
    """Spawn a stdio-based MCP server process and register its tools.

    Builds ``StdioServerParameters`` from the config's command/args/env,
    lists the process's tools over a temporary session, and delegates
    registration to ``_register_custom_tools``.
    """
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    params = StdioServerParameters(
        command=server_config.get('command'),
        args=server_config.get('args', []),
        env=server_config.get('env', {}),
    )

    async with stdio_client(params) as (rx, tx):
        async with ClientSession(rx, tx) as session:
            await session.initialize()
            listing = await session.list_tools()
            self._register_custom_tools(
                listing.tools, server_name, enabled_tools, 'json', server_config)
4. Composio MCP
For Composio-managed MCP servers (see Composio Integration):
{
    "name": "gmail_integration",
    "isCustom": True,
    "customType": "composio",
    "config": {
        # Composio connection profile id (see the Composio Integration page)
        "profile_id": "prof_xxx"
    }
}
Schema Caching
Kortix implements Redis-based schema caching for fast MCP tool loading:
Cache Configuration
class MCPRegistry:
    # Cached schemas expire after a day so stale tool definitions self-heal.
    SCHEMA_CACHE_TTL_HOURS = 24
    # Redis key namespace for cached schemas -- presumably prefix + toolkit slug.
    SCHEMA_CACHE_KEY_PREFIX = "mcp_schema:"
Cache Flow
async def _load_schemas_from_mcp(self, tool_names: List[str],
                                 account_id: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
    """Load tool schemas, preferring the Redis cache over a live MCP fetch."""
    # NOTE(review): excerpt -- `toolkit_slug`, `custom_type` and `config` are
    # derived from `tool_names` in code elided here; confirm against the full
    # implementation in core/agentpress/mcp_registry.py.
    # 1. Check cache first
    cached_schemas = await self._get_cached_toolkit_schemas(toolkit_slug)
    if cached_schemas:
        logger.info(f"⚡ Cache HIT for {toolkit_slug}")
        return cached_schemas
    # 2. Load from MCP server (cache miss)
    toolkit_schemas = await self._load_custom_mcp_schemas(custom_type, config)
    # 3. Cache the result for subsequent startups
    await self._cache_toolkit_schemas(toolkit_slug, toolkit_schemas)
    return toolkit_schemas
Benefits
- Instant startup: Cached schemas load in milliseconds
- Reduced API calls: No need to reconnect to MCP servers
- Improved reliability: Works even if MCP server is temporarily down
- Lower latency: Cached schemas reduce tool activation time
JIT (Just-In-Time) Loading
MCP tools use JIT loading to minimize startup time:
# Discovery is lazy: tools are registered first, schemas fetched on demand.
async def get_discovery_info(self, filter_pattern: Optional[str] = None,
                             load_schemas: bool = True) -> Dict[str, Any]:
    """Return the schemas currently known, optionally fetching missing ones.

    Cached schemas are returned immediately; tools without a schema are
    loaded from their MCP servers only when ``load_schemas`` is true.
    """
    known = {name: info.schema for name, info in self._tools.items() if info.schema}
    pending = [name for name, info in self._tools.items() if not info.schema]

    if load_schemas and pending:
        known.update(await self._load_schemas_from_mcp(pending))

    return {"available_tools": known, "total_count": len(known)}
Auto-Activation
async def execute_tool(self, tool_name: str, args: Dict[str, Any],
                       context: MCPExecutionContext) -> ToolResult:
    """Run ``tool_name`` with ``args``, activating it on first use.

    Inactive tools are transparently activated before dispatch (JIT
    loading); per-tool call statistics are updated after the invocation.
    """
    # Activate lazily on first call rather than eagerly at startup.
    if not self.is_tool_active(tool_name):
        logger.info(f"🔄 Auto-activating {tool_name}")
        if not await self._auto_activate_tool(tool_name, context):
            return self._fail_response(f"Failed to activate {tool_name}")

    record = self._tools[tool_name]
    handler = getattr(record.instance, tool_name)
    outcome = await handler(**args)

    # Bookkeeping consumed by the monitoring/statistics API.
    record.call_count += 1
    record.last_used_ms = time.time() * 1000
    return outcome
Initialize MCP Wrapper
from core.tools.mcp_tool_wrapper import MCPToolWrapper

# Example configs: one streamable-HTTP server and one SSE server.
# `notion_token` and `user_account_id` are placeholders supplied by your app.
mcp_configs = [
    {
        "name": "github_mcp",
        "isCustom": True,
        "customType": "http",
        "config": {"url": "https://mcp.github.com"}
    },
    {
        "name": "notion_mcp",
        "isCustom": True,
        "customType": "sse",
        "config": {
            "url": "https://mcp.notion.com/sse",
            "headers": {"Authorization": f"Bearer {notion_token}"}
        }
    }
]

mcp_wrapper = MCPToolWrapper(
    mcp_configs=mcp_configs,
    use_cache=True,  # reuse Redis-cached schemas when available
    account_id=user_account_id
)
# Must run inside an async context (top-level await shown for brevity).
await mcp_wrapper.initialize_and_register_tools()
Dynamic Method Access
# MCP tools become methods on the wrapper
result = await mcp_wrapper.github_search_repositories(
    query="machine learning",
    limit=10
)

# Or use the generic execute method (same call, explicit tool name)
result = await mcp_wrapper._execute_mcp_tool(
    tool_name="github_search_repositories",
    arguments={"query": "machine learning", "limit": 10}
)
from core.agentpress.mcp_registry import get_mcp_registry, init_mcp_registry_from_loader

# Initialize the registry from a loader
init_mcp_registry_from_loader(mcp_loader)

# Get the shared registry instance
registry = get_mcp_registry()

# Check tool availability before use
if registry.is_tool_available('github_create_issue'):
    tool_info = registry.get_tool_info('github_create_issue')
    print(f"Status: {tool_info.status}")
    print(f"Toolkit: {tool_info.toolkit_slug}")
Connection Management
The MCPConnectionManager handles connection pooling:
class MCPConnectionManager:
    """Pools live MCP connections and their client sessions by server name."""

    def __init__(self):
        # server name -> open connection handle
        self.connections: Dict[str, Any] = {}
        # server name -> active client session
        self.sessions: Dict[str, Any] = {}

    async def connect_sse_server(self, server_name: str,
                                 config: Dict[str, Any]) -> Dict[str, Any]:
        """Open an SSE connection, pool it, and return server info with tools."""
        # Establish SSE connection
        # Store in connection pool
        # Return server info with tools

    async def connect_http_server(self, server_name: str,
                                  config: Dict[str, Any]) -> Dict[str, Any]:
        """Open a streamable-HTTP connection, pool it, and return server info with tools."""
        # Establish HTTP connection
        # Store in connection pool
        # Return server info with tools
Error Handling
async def _initialize_single_custom_mcp(self, config: Dict[str, Any]):
    """Initialize one custom MCP server, reporting failure instead of raising.

    Args:
        config: Server config dict (``name``, ``customType``, ``config`` ...).

    Returns:
        Dict with ``tools``, ``type`` and ``success``; on failure ``tools``
        is empty and ``error``/``config_name`` describe what went wrong.
    """
    # Resolve the name outside the try so the except handler can always use it
    # (the original defined it inside the try block).
    config_name = config.get('name', 'Unknown')
    try:
        logger.debug(f"Initializing custom MCP: {config_name}")
        # Fix: capture the handler's result (`custom_tools` was referenced
        # below but never assigned). Assumes the handler returns the
        # registered tools mapping -- TODO confirm against CustomMCPHandler.
        custom_tools = await self.custom_handler._initialize_single_custom_mcp(config)
        return {'tools': custom_tools, 'type': 'custom', 'success': True}
    except Exception as e:
        logger.error(f"✗ Failed to initialize custom MCP {config_name}: {e}")
        # Return error info instead of raising - allows execution to continue
        return {
            'tools': {},
            'type': 'custom',
            'success': False,
            'error': str(e),
            'config_name': config_name
        }
Execution Errors
async def execute_tool(self, tool_name: str, args: Dict[str, Any],
                       context: MCPExecutionContext) -> ToolResult:
    """Excerpt: the error-tracking path of MCP tool execution."""
    try:
        # Execute tool (`method` is resolved from the tool instance in code
        # elided from this excerpt)
        result = await method(**args)
        return result
    except Exception as e:
        # Record the failure on the tool's registry entry so monitoring and
        # the status index stay accurate.
        tool_info = self._tools.get(tool_name)
        if tool_info:
            tool_info.last_error = str(e)
            tool_info.error_count += 1
            self._update_tool_status(tool_name, MCPToolStatus.FAILED)
        logger.error(f"❌ {tool_name} failed: {e}")
        return self._fail_response(f"MCP tool execution error: {str(e)}")
Parallel Initialization
async def _initialize_servers(self):
    """Initialize all configured MCP servers concurrently.

    Failures are isolated per server: ``asyncio.gather`` runs with
    ``return_exceptions=True`` so one bad server cannot abort the rest.
    """
    started_at = time.monotonic()

    # Queue one initialization coroutine per configured server.
    initialization_tasks = [
        self._initialize_single_custom_mcp(config) for config in self.mcp_configs
    ]

    # Execute in parallel. Fix: gather the queued coroutines themselves --
    # the original gathered an undefined `tasks` name and had stored
    # (kind, config, task) tuples, which asyncio.gather cannot await.
    results = await asyncio.gather(*initialization_tasks, return_exceptions=True)

    # Fix: `successful`/`elapsed_time` were referenced but never computed.
    successful = sum(
        1 for r in results if isinstance(r, dict) and r.get('success')
    )
    elapsed_time = time.monotonic() - started_at
    logger.debug(f"⚡ Initialized {successful} servers in {elapsed_time:.2f}s")
Schema Pre-warming
async def prewarm_schemas(self, account_id: Optional[str] = None) -> int:
    """Populate in-memory tool schemas from the Redis cache at startup.

    Returns:
        Number of tools whose schema was restored from cache.
    """
    warmed = 0
    for slug in self.get_available_toolkits():
        cached = await self._get_cached_toolkit_schemas(slug)
        if not cached:
            continue
        for name, schema in cached.items():
            # Only warm tools that are actually registered.
            if name in self._tools:
                self._tools[name].schema = schema
                warmed += 1
    logger.info(f"✅ Pre-warmed {warmed} schemas from Redis")
    return warmed
Monitoring and Statistics
# Get aggregate registry statistics
stats = registry.get_registry_stats()
print(f"Total tools: {stats['total_tools']}")
print(f"Active tools: {stats['active_tools']}")
print(f"Failed tools: {stats['failed_tools']}")
print(f"Toolkits: {stats['toolkits']}")
print(f"Status breakdown: {stats['status_breakdown']}")

# Get per-tool info (fields updated by execute_tool)
tool_info = registry.get_tool_info('github_create_issue')
print(f"Call count: {tool_info.call_count}")
print(f"Last used: {tool_info.last_used_ms}")
print(f"Error count: {tool_info.error_count}")
Best Practices
- Enable Caching: Always use `use_cache=True` in production
- Filter Tools: Use `enabledTools` to load only the tools you need
- Handle Failures: MCP servers may be unavailable - design for resilience
- Monitor Performance: Track activation times and cache hit rates
- Batch Operations: Initialize multiple MCP servers in parallel
- Error Recovery: Implement retry logic for transient failures
Next Steps