Documentation Index — fetch the complete documentation index at: https://mintlify.com/microsoft/mcp-for-beginners/llms.txt
Use this file to discover all available pages before exploring further.
Objectives
By the end of this lab you will be able to:
Apply performance optimization techniques for MCP servers and databases
Implement comprehensive security hardening measures
Design scalable architecture patterns for production environments
Establish monitoring, maintenance, and operational procedures
Optimize costs while maintaining performance and reliability
Contribute to the MCP community and ecosystem
Prerequisites
Completed all previous labs (00–11)
MCP server deployed to a staging or production environment
Monitoring and alerting configured
Connection pool tuning
import os
from multiprocessing import cpu_count
# Connection-pool tuning for asyncpg's create_pool().
_CPU_COUNT = cpu_count()
_MAX_POOL_SIZE = min(20, _CPU_COUNT * 4)

POOL_CONFIG = {
    # Size: scale with available CPU cores.
    # min_size is clamped to max_size — on hosts with more than 20 cores
    # the original max(2, cpu_count()) exceeded the 20-connection cap and
    # inverted the pool bounds, which asyncpg rejects.
    "min_size": min(_MAX_POOL_SIZE, max(2, _CPU_COUNT)),
    "max_size": _MAX_POOL_SIZE,

    # Connection lifecycle.
    "max_inactive_connection_lifetime": 300,  # seconds (5 minutes)
    "command_timeout": 30,                    # seconds per statement
    "max_queries": 50000,  # Rotate connections after this many queries

    # PostgreSQL per-session settings (applied via SET on connect).
    "server_settings": {
        "application_name": "mcp-server-prod",
        "jit": "off",  # Disable JIT for predictable latency
        "work_mem": "8MB",
        # NOTE: "shared_preload_libraries" was removed here — it is a
        # postmaster-level parameter that cannot be set per session, so
        # including it made every connection attempt fail.  Configure
        # pg_stat_statements in postgresql.conf (or the managed server's
        # parameter page) instead.
        # NOTE(review): the two log_* settings below normally require
        # superuser / parameter privileges — confirm the app role has them.
        "log_statement": "mod",  # Log only modifications
        "log_min_duration_statement": "1s",  # Log queries over 1 second
    },
}
Query optimization with caching
import hashlib
import json
import time
from typing import Any, Optional, Callable
import redis
class SmartCache:
    """Multi-level cache: memory (L1) → Redis (L2).

    L1 is an in-process dict with per-entry TTL and eviction of the
    oldest entry once ``max_memory_items`` is reached.  L2 (Redis) is
    optional; every Redis failure is logged and treated as a cache miss
    so cache trouble never fails the request path.
    """

    def __init__(self, redis_url: Optional[str] = None):
        # key -> {'value': ..., 'timestamp': insert time, 'expires': deadline}
        self.memory_cache = {}
        self.redis_client = redis.Redis.from_url(redis_url) if redis_url else None
        self.max_memory_items = 1000

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on a miss."""
        # L1: memory
        entry = self.memory_cache.get(key)
        if entry:
            if time.time() < entry['expires']:
                return entry['value']
            # Fix: drop expired entries immediately.  Previously they
            # lingered until LRU eviction, wasting L1 slots.
            del self.memory_cache[key]
        # L2: Redis
        if self.redis_client:
            try:
                import pickle
                cached = self.redis_client.get(key)
                if cached:
                    # pickle is acceptable only because we exclusively read
                    # data this service wrote; never share this Redis
                    # keyspace with untrusted writers.
                    value = pickle.loads(cached)
                    self._set_memory(key, value, 60)  # Promote to L1 for 1 minute
                    return value
            except Exception as e:
                logger.warning(f"Redis get failed: {e}")
        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        """Store *value* in L1 and (best-effort) in L2 for *ttl* seconds."""
        self._set_memory(key, value, ttl)
        if self.redis_client:
            try:
                import pickle
                self.redis_client.setex(key, ttl, pickle.dumps(value))
            except Exception as e:
                logger.warning(f"Redis set failed: {e}")

    def _set_memory(self, key: str, value: Any, ttl: int):
        """Insert into L1, evicting the oldest entry when the cache is full."""
        # Simple LRU eviction when cache is full (oldest insertion wins).
        if len(self.memory_cache) >= self.max_memory_items:
            oldest = min(self.memory_cache.keys(),
                         key=lambda k: self.memory_cache[k].get('timestamp', 0))
            del self.memory_cache[oldest]
        self.memory_cache[key] = {
            'value': value,
            'timestamp': time.time(),
            'expires': time.time() + ttl,
        }
def generate_cache_key(query: str, user_context: str, params: dict = None) -> str:
    """Consistent, stable cache key for a query + context combination.

    The query is whitespace-trimmed and lowercased, and params are
    serialized with sorted keys, so equivalent requests map to the
    same SHA-256 digest.
    """
    normalized_query = query.strip().lower()
    serialized_params = json.dumps(params, sort_keys=True) if params else ""
    raw_key = "|".join([normalized_query, user_context, serialized_params])
    return hashlib.sha256(raw_key.encode()).hexdigest()
Async batch processing with circuit breaker
import asyncio
from asyncio import Semaphore
class AsyncOptimizer:
    """Async patterns for high-throughput MCP operations."""

    def __init__(self, max_concurrent: int = 10):
        # Bounds how many process_func calls may run at once.
        self.semaphore = Semaphore(max_concurrent)
        self.circuit_breaker = CircuitBreaker()

    async def batch_process(self, items, process_func, batch_size: int = 100):
        """Process items in bounded concurrent batches.

        Returns one result per item in input order; exceptions raised by
        *process_func* are returned in-place rather than propagated
        (``return_exceptions=True``).
        """
        async def process_one(item):
            # Fix: acquire the semaphore per item.  The original acquired
            # it once around an entire batch while batches were awaited
            # sequentially, so max_concurrent never limited anything.
            async with self.semaphore:
                return await process_func(item)

        results = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            results.extend(await asyncio.gather(
                *[process_one(item) for item in batch],
                return_exceptions=True,
            ))
            if start + batch_size < len(items):
                await asyncio.sleep(0.1)  # Yield to other coroutines between batches
        return results
class CircuitBreaker:
    """Prevent cascading failures by stopping calls to an unhealthy dependency.

    States: CLOSED (normal) → OPEN (fail fast) → HALF_OPEN (single probe
    after recovery_timeout) → CLOSED again on a successful probe.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.recovery_timeout = recovery_timeout    # seconds before a probe is allowed
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED → OPEN → HALF_OPEN → CLOSED

    async def call(self, func, *args, **kwargs):
        """Invoke *func* through the breaker; raise immediately while OPEN."""
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"  # allow one probe call through
            else:
                raise Exception("Circuit breaker is OPEN — dependency unhealthy")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # Fix: a failed HALF_OPEN probe reopens immediately; in CLOSED
            # the breaker opens once the consecutive-failure threshold hits.
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                logger.error(f"Circuit breaker opened after {self.failure_count} failures")
            raise
        else:
            # Fix: any success clears the failure streak.  The original only
            # reset in HALF_OPEN, so intermittent failures accumulated across
            # successes and eventually tripped a healthy dependency.
            self.state = "CLOSED"
            self.failure_count = 0
            return result
Security hardening
import re
from typing import List
class InputValidator:
    """SQL injection prevention and input sanitization."""

    # Patterns that must never appear in user-provided SQL.
    # Reconstructed: rendering had injected spaces into every pattern
    # (e.g. "( DROP | DELETE )" required literal spaces, "\s *" matched
    # \s then literal spaces), so none of them matched the attacks they
    # were written to block.
    # NOTE: regex filtering is defense-in-depth only; the real protection
    # is parameterized queries plus the SELECT-only policy below.
    FORBIDDEN_PATTERNS = [
        r";\s*(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE)\s+",  # stacked write statements
        r"--.*",           # SQL line comments
        r"/\*.*\*/",       # Block comments
        r"xp_cmdshell",    # SQL Server RCE
        r"sp_executesql",  # dynamic SQL execution
        r"EXEC\s*\(",      # EXEC(...) dynamic execution
    ]

    # Allow-list of tables user queries may reference.
    VALID_TABLES = {
        "retail.stores", "retail.customers", "retail.products",
        "retail.sales_transactions", "retail.sales_transaction_items",
        "retail.product_categories", "retail.product_embeddings",
    }

    @classmethod
    def validate_sql_query(cls, query: str) -> bool:
        """Return False if the query contains any forbidden patterns or is not a SELECT."""
        for pattern in cls.FORBIDDEN_PATTERNS:
            if re.search(pattern, query, re.IGNORECASE):
                logger.warning(f"Blocked query with pattern: {pattern}")
                return False
        # Read-only policy: anything that is not a SELECT is rejected outright.
        if not query.strip().upper().startswith("SELECT"):
            return False
        return True

    @classmethod
    def sanitize_table_name(cls, table_name: str) -> str:
        """Validate and return the table name, raising ValueError if invalid."""
        if not re.match(r"^[a-zA-Z0-9_.]+$", table_name):
            raise ValueError("Invalid table name format")
        if table_name not in cls.VALID_TABLES:
            raise ValueError(f"Table '{table_name}' is not in the allowed list")
        return table_name
Data protection utilities
from cryptography.fernet import Fernet
import hashlib
import os
class DataProtection:
    """Encryption and data masking utilities."""

    def __init__(self):
        # Fernet requires a 32-byte urlsafe-base64 key (Fernet.generate_key()).
        self.cipher_suite = Fernet(self._get_encryption_key())

    def _get_encryption_key(self) -> bytes:
        """Return the encryption key from the DEV_ENCRYPTION_KEY env variable.

        The original docstring claimed Azure Key Vault retrieval, but only
        the environment variable is read here — wire up Key Vault before
        relying on this in production.
        """
        key = os.getenv("DEV_ENCRYPTION_KEY")
        if not key:
            raise ValueError("No encryption key configured")
        return key.encode()

    def encrypt(self, data: str) -> str:
        """Encrypt *data* and return the Fernet token as a UTF-8 string."""
        return self.cipher_suite.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt a token produced by encrypt(); raises on tampering or wrong key."""
        return self.cipher_suite.decrypt(encrypted_data.encode()).decode()

    @staticmethod
    def mask_sensitive_logs(log_data: dict) -> dict:
        """Return a copy of *log_data* with sensitive field values masked.

        Values longer than 4 characters keep their first and last two
        characters; shorter values become "***".  The input dict is not
        mutated.
        """
        sensitive_keys = ['password', 'token', 'secret', 'key', 'authorization',
                          'x-api-key', 'client_secret', 'connection_string']
        masked = log_data.copy()
        for key in list(masked.keys()):
            if any(s in key.lower() for s in sensitive_keys):
                value = str(masked[key])
                # Parenthesized for clarity: the whole masked form is the
                # true branch of the conditional expression.
                masked[key] = (value[:2] + "*" * max(0, len(value) - 4) + value[-2:]) if len(value) > 4 else "***"
        return masked
Production deployment checklist
Environment validation
class ProductionConfig:
    """Validate all required production settings on startup."""

    # Environment variables that must be present before serving traffic.
    REQUIRED_SETTINGS = [
        "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET", "AZURE_TENANT_ID",
        "PROJECT_ENDPOINT", "AZURE_OPENAI_ENDPOINT",
        "POSTGRES_HOST", "POSTGRES_PASSWORD",
        "APPLICATIONINSIGHTS_CONNECTION_STRING",
    ]

    def validate_production_requirements(self):
        """Raise EnvironmentError listing every missing required setting."""
        missing = [s for s in self.REQUIRED_SETTINGS if not os.getenv(s)]
        if missing:
            raise EnvironmentError(f"Missing required production settings: {missing}")

    def setup_production_logging(self):
        """Configure stdout + rotating-file logging and quiet noisy libraries."""
        # Fix: sys was used (sys.stdout) but never imported in this module's
        # visible code — import it locally so the method is self-contained.
        import sys
        import logging.handlers
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.handlers.RotatingFileHandler(
                    '/var/log/mcp-server.log',
                    maxBytes=50 * 1024 * 1024,  # 50 MB per file
                    backupCount=5,              # keep 5 rotated files
                ),
            ],
        )
        # Reduce noise from third-party libraries
        logging.getLogger('azure').setLevel(logging.WARNING)
        logging.getLogger('urllib3').setLevel(logging.WARNING)
Recommended indexes for production
-- Core business query patterns.
-- NOTE: CREATE INDEX CONCURRENTLY cannot run inside a transaction block;
-- execute each statement on its own outside BEGIN/COMMIT.
-- NOTE(review): these statements target retail.orders, retail.order_items,
-- and retail.product_description_embeddings, while the application
-- allow-list uses retail.sales_transactions, retail.sales_transaction_items,
-- and retail.product_embeddings — confirm which names match the live schema.
CREATE INDEX CONCURRENTLY idx_orders_store_date
    ON retail.orders (store_id, order_date DESC);

CREATE INDEX CONCURRENTLY idx_order_items_product
    ON retail.order_items (product_id);

CREATE INDEX CONCURRENTLY idx_customers_store_email
    ON retail.customers (store_id, email);

-- Analytics aggregation indexes.
CREATE INDEX CONCURRENTLY idx_orders_date_amount
    ON retail.orders (order_date, total_amount);

CREATE INDEX CONCURRENTLY idx_products_category_price
    ON retail.products (category_id, unit_price);

-- Vector search (IVFFlat for large catalogs, HNSW for smaller).
CREATE INDEX CONCURRENTLY idx_embeddings_vector
    ON retail.product_description_embeddings
    USING ivfflat (description_embedding vector_cosine_ops)
    WITH (lists = 100);
Cost optimization
class CostOptimizer:
    """Dynamic resource adjustment to control Azure spending."""

    async def optimize_database_connections(self, current_load: float):
        """Scale the connection pool up or down based on current load.

        Light load (< 0.3) shrinks toward a 2-connection floor, moderate
        load (< 0.7) holds a mid-sized pool, and heavy load grows the pool
        up to a hard cap of 20 connections.
        """
        if current_load >= 0.7:
            pool_target = min(20, int(current_load * 25))
        elif current_load >= 0.3:
            pool_target = max(5, int(current_load * 15))
        else:
            pool_target = max(2, int(current_load * 10))
        await db_provider.adjust_pool_size(pool_target)
        logger.info(f"Connection pool adjusted to {pool_target} for load {current_load:.2f}")

    def estimate_monthly_costs(self) -> dict:
        """Return estimated Azure resource costs for capacity planning."""
        estimates = {}
        estimates["container_apps"] = "$20–80 (scales to zero when idle)"
        estimates["postgresql_flexible_server"] = "$150–300 (Standard_D4s_v3)"
        estimates["azure_openai_embeddings"] = "$5–20 (text-embedding-3-small @ $0.00002/1K tokens)"
        estimates["application_insights"] = "$5–15 (first 5 GB/month free)"
        estimates["container_registry"] = "$5 (Basic tier)"
        return estimates
Comprehensive health monitoring
class OperationalHealth:
    """Scheduled comprehensive health checks with alert integration."""

    async def comprehensive_health_check(self) -> dict:
        """Run every component check and return an aggregate health report."""
        report = {
            "timestamp": datetime.utcnow().isoformat(),
            "overall_status": "healthy",
            "components": {},
        }
        # check_ai_service_health / check_system_resources are defined
        # elsewhere on this class — presumably sibling methods; confirm.
        component_checks = (
            ("database", self.check_database_health),
            ("ai_service", self.check_ai_service_health),
            ("system", self.check_system_resources),
        )
        for component_name, run_check in component_checks:
            report["components"][component_name] = await run_check()

        unhealthy = [
            name for name, result in report["components"].items()
            if result.get("status") != "healthy"
        ]
        if unhealthy:
            report["overall_status"] = "unhealthy"
            report["failed_components"] = unhealthy

        # Feed the database status into alerting so threshold rules can fire.
        await alert_manager.evaluate_metrics({
            "database_status": report["components"].get("database", {}).get("status")
        })
        return report

    async def check_database_health(self) -> dict:
        """Probe connectivity; report latency and active connection count."""
        try:
            started_at = time.time()
            async with db_provider.get_connection() as conn:
                await conn.fetchval("SELECT 1")
                active_connections = await conn.fetchval(
                    "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
                )
            return {
                "status": "healthy",
                "response_time_ms": (time.time() - started_at) * 1000,
                "active_connections": active_connections,
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
Final project: production-ready assessment
Before considering this learning path complete, verify your implementation covers:
Multi-tenant schema with store_id on every transactional table
RLS policies enabled and tested for all tables
pgvector HNSW index on product embeddings
Full-text search index on product names and descriptions
Audit log table with appropriate indexes
Automated backup script with Azure Storage upload
FastMCP with all required tools registered
RLS context set per-request from x-rls-user-id header
Query validator blocking dangerous SQL patterns
Connection pool with configurable min/max sizes
Health endpoints (/health, /health/ready, /health/live)
Graceful startup/shutdown with resource cleanup
Azure Entra ID JWT validation with JWKS caching
Role-based authorization with permission inheritance
Input validation on all tool parameters
Sensitive values masked in all log output
Security audit log with monitoring views
Application Insights configured with OpenTelemetry
Structured JSON logging for all key operations
Custom metrics for requests, queries, tools, and errors
Alert rules with cooldown periods for all critical conditions
Azure Monitor dashboard with KQL queries
Multi-stage Dockerfile with non-root user
Docker Compose for local development
Bicep templates for Azure Container Apps and PostgreSQL
GitHub Actions CI/CD with automated tests and staged deployment
Auto-scaling rules based on HTTP concurrency and CPU
Community and next steps
Contributing to MCP
Follow PEP 8 for Python code style
Maintain test coverage above 90%
Use type hints throughout the codebase
Write docstrings for all public functions and classes
Report security vulnerabilities privately before public disclosure
Advanced learning paths
MCP Architecture Patterns — Advanced server architectures for complex use cases
Multi-Model Integration — Combining different AI models within a single MCP server
Enterprise Scale — Large-scale MCP deployments with thousands of concurrent users
Custom Tool Development — Building specialized MCP tools for domain-specific workflows
Key takeaways from the full learning path
RLS + shared schema is the right multi-tenancy model for most retail analytics workloads
FastMCP reduces server boilerplate while preserving full protocol compliance
Azure OpenAI + pgvector provides production-quality semantic search without a separate vector database
Structured logging + distributed tracing dramatically reduces mean time to diagnosis
Multi-stage Docker builds and non-root users are non-negotiable in production
CI/CD with staged deployments enables zero-downtime releases and safe rollbacks
Circuit breakers and connection pool sizing prevent cascading failures under load
Return to Overview — review the full lab structure and learning paths.
Sample Repository — explore the complete working implementation on GitHub.