Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/microsoft/mcp-for-beginners/llms.txt

Use this file to discover all available pages before exploring further.

Overview

This guide covers advanced MCP protocol features that go beyond basic tool and resource handling. Mastering these features helps you build more robust, user-friendly, and production-ready MCP servers.

Progress notifications

Report progress for long-running operations

Request cancellation

Allow clients to cancel in-flight requests

Resource templates

Dynamic resource URIs with parameters

Server lifecycle

Proper initialization and shutdown handling

Logging control

Server-side logging level configuration

Error handling

Consistent, structured error responses

1. Progress notifications

For long-running operations, progress notifications keep users informed while the work is still in flight, instead of leaving them waiting silently for the final result.
Client ──► Server: tools/call (long operation)
Server ──► Client: notification: progress 10%
Server ──► Client: notification: progress 50%
Server ──► Client: notification: progress 90%
Server ──► Client: result (complete)
from mcp.server import Server
from mcp.types import ProgressNotification
import asyncio

# Server instance; the @app.tool() handlers in this snippet register on it.
app = Server("progress-server")

@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
    """Process a large file with progress updates.

    Reads the file in 8 KiB chunks, hands each chunk to process_chunk(),
    and sends a ProgressNotification (a percentage of the file size)
    after every chunk.

    Args:
        file_path: Path of the file to read.
        ctx: Request context; supplies request_id and send_notification().

    Returns:
        A summary string containing the file size in bytes.
    """
    # Imported locally: the snippet's import block does not bring in `os`.
    import os

    chunk_size = 8192
    file_size = os.path.getsize(file_path)
    processed = 0

    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            await process_chunk(chunk)
            processed += len(chunk)

            # Clamp to 100 (and avoid dividing by zero) in case the file
            # changed size after getsize() was called.
            if file_size:
                progress = min(100.0, (processed / file_size) * 100)
            else:
                progress = 100.0

            await ctx.send_notification(
                ProgressNotification(
                    progressToken=ctx.request_id,
                    progress=progress,
                    total=100,
                    message=f"Processing: {progress:.1f}%"
                )
            )

    return f"Processed {file_size} bytes"


@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
    """Process multiple items with per-item progress.

    Each item is passed to process_item(); after it finishes, a
    ProgressNotification carrying the 1-based count of completed items
    is sent.

    Args:
        items: Items to process, in order.
        ctx: Request context; supplies request_id and send_notification().

    Returns:
        A summary string with the number of items.
    """
    total = len(items)

    # The per-item results are not part of the return value, so they are
    # not accumulated.
    for done, item in enumerate(items, start=1):
        await process_item(item)

        await ctx.send_notification(
            ProgressNotification(
                progressToken=ctx.request_id,
                progress=done,
                total=total,
                message=f"Processed {done}/{total}: {item}"
            )
        )

    return f"Completed {total} items"

Client-side progress handling

async def handle_progress(notification):
    """Print one line describing a progress notification.

    Output format: ``Progress: <progress>[/<total>][ - <message>]``.
    The total and the message are only printed when present, so a
    notification without them does not render the text "None".

    Args:
        notification: Object whose ``params`` has ``progress``, ``total``
            and ``message`` attributes.
    """
    params = notification.params

    line = f"Progress: {params.progress}"
    if params.total is not None:
        line += f"/{params.total}"
    if params.message:
        # Separate the message from the counter instead of fusing them.
        line += f" - {params.message}"
    print(line)

# Register handle_progress for "notifications/progress" before starting the call.
session.on_notification("notifications/progress", handle_progress)
# NOTE(review): `await` at this level needs an enclosing async function
# (or an async REPL) - confirm the intended context.
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})

2. Request cancellation

Allow clients to cancel requests that are no longer needed or have timed out.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio

# Server instance; the @app.tool() handler in this snippet registers on it.
app = Server("cancellable-server")

@app.tool()
async def long_running_search(query: str, ctx) -> str:
    """Search up to 100 pages, stopping early once the request is cancelled.

    Before each page the context's is_cancelled flag is checked; when it
    is set, a CancelledError is raised and turned into a partial-result
    summary.

    Args:
        query: Search text forwarded to search_page().
        ctx: Request context exposing ``is_cancelled``.

    Returns:
        A summary string with the number of results collected.
    """
    results = []

    try:
        for page_number in range(100):
            if ctx.is_cancelled:
                raise CancelledError("Search cancelled by user")

            results.extend(await search_page(query, page_number))
            # Short pause so other tasks (including cancellation) can run
            await asyncio.sleep(0.1)
    except CancelledError:
        return f"Cancelled. Found {len(results)} results before cancellation."
    else:
        return f"Found {len(results)} total results"

Client-side cancellation with timeout

async def search_with_timeout(session, query, timeout=30):
    """Call the long_running_search tool and give up after *timeout* seconds.

    Args:
        session: Client session providing call_tool() and send_notification().
        query: Search text sent as the tool's "query" argument.
        timeout: Seconds passed to asyncio.wait_for().

    Returns:
        The tool result, or the string "Search timed out" after a timeout.
    """
    task = asyncio.create_task(
        session.call_tool("long_running_search", {"query": query})
    )

    try:
        result = await asyncio.wait_for(task, timeout=timeout)
        return result
    except asyncio.TimeoutError:
        # Tell the server the request is no longer wanted.
        # NOTE(review): a plain asyncio.Task does not define `request_id`;
        # confirm how the session exposes the id of the in-flight request.
        await session.send_notification({
            "method": "notifications/cancelled",
            "params": {"requestId": task.request_id, "reason": "Timeout"}
        })
        return "Search timed out"

3. Resource templates

Resource templates enable dynamic URI construction with parameters — useful for parameterized access to databases or APIs.
from mcp.server import Server
from mcp.types import ResourceTemplate

# Server instance; the resource handlers in this snippet register on it.
app = Server("template-server")

@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    """Return the resource templates this server advertises.

    Returns:
        Three templates: user profile, weather data and file content.
    """
    # Columns: uriTemplate, name, description, mimeType
    template_specs = (
        (
            "db://users/{user_id}",
            "User Profile",
            "Fetch user profile by ID",
            "application/json",
        ),
        (
            "api://weather/{city}/{date}",
            "Weather Data",
            "Historical weather for city and date",
            "application/json",
        ),
        (
            "file://{path}",
            "File Content",
            "Read file at given path",
            "text/plain",
        ),
    )

    templates = []
    for uri_template, name, description, mime_type in template_specs:
        templates.append(
            ResourceTemplate(
                uriTemplate=uri_template,
                name=name,
                description=description,
                mimeType=mime_type,
            )
        )
    return templates

@app.read_resource()
async def read_resource(uri: str) -> str:
    """Resolve a resource URI to its content.

    Supported forms:
        db://users/{user_id}         -> fetch_user(user_id)
        api://weather/{city}/{date}  -> fetch_weather(city, date)
        file://{path}                -> read_file(path)

    Args:
        uri: The requested resource URI.

    Returns:
        Whatever the matching fetch/read helper returns.

    Raises:
        ValueError: If the scheme is unknown, or the URI does not supply
            the parameters its template requires.
    """
    if uri.startswith("db://users/"):
        # Strip only the leading prefix; the remainder must be a single,
        # non-empty path segment.
        user_id = uri.removeprefix("db://users/")
        if not user_id or "/" in user_id:
            raise ValueError(f"Malformed user URI: {uri}")
        return await fetch_user(user_id)

    if uri.startswith("api://weather/"):
        parts = uri.removeprefix("api://weather/").split("/")
        # Exactly {city}/{date}, both non-empty; otherwise report the bad
        # URI instead of failing with an IndexError.
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"Malformed weather URI: {uri}")
        city, date = parts
        return await fetch_weather(city, date)

    if uri.startswith("file://"):
        # NOTE(review): the path comes straight from the request and is not
        # restricted to any directory here - confirm read_file() enforces
        # an allowed root before exposing this to untrusted clients.
        path = uri.removeprefix("file://")
        return await read_file(path)

    raise ValueError(f"Unknown resource URI: {uri}")

4. Server lifecycle events

Proper startup and shutdown handling ensures clean resource management and prevents connection leaks.
from mcp.server import Server
from contextlib import asynccontextmanager

# Module-level handles; lifespan() assigns them at startup.
# They stay None until then.
db_connection = None
cache         = None

@asynccontextmanager
async def lifespan(server: Server):
    """Open the shared database and cache clients for the server's lifetime.

    On entry, assigns the module-level ``db_connection`` and ``cache``.
    On exit, closes both, including when the server body raised.

    Args:
        server: The Server instance this lifespan is attached to (unused here).
    """
    global db_connection, cache

    # Startup
    print("Server starting...")
    db_connection = await create_database_connection()
    try:
        cache = await create_cache_client()
    except BaseException:
        # Cache setup failed: do not leak the connection opened just above.
        await db_connection.close()
        raise

    try:
        yield  # Server runs here
    finally:
        # Shutdown - runs on normal exit and when the server body raises.
        print("Server shutting down...")
        try:
            await db_connection.close()
        finally:
            # Close the cache even if closing the connection failed.
            await cache.close()

# Server instance; lifespan() is passed in to wrap the server's run.
app = Server("lifecycle-server", lifespan=lifespan)

@app.tool()
async def query_database(sql: str) -> str:
    """Execute *sql* on the module-level connection and return the result as text.

    Args:
        sql: Statement passed unchanged to ``db_connection.execute()``.

    Returns:
        ``str()`` of whatever ``execute()`` returned.
    """
    # NOTE(review): the statement is forwarded as-is - confirm that callers
    # of this tool are trusted.
    return str(await db_connection.execute(sql))

5. Logging control

MCP supports server-side logging levels that clients can control at runtime.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging

# Server instance; the logging handler and tool in this snippet register on it.
app = Server("logging-server")

# Maps LoggingLevel members to the stdlib logging constants.
# Only these four levels are listed; set_logging_level() falls back to
# logging.INFO for any level not present here.
LEVEL_MAP = {
    LoggingLevel.DEBUG:   logging.DEBUG,
    LoggingLevel.INFO:    logging.INFO,
    LoggingLevel.WARNING: logging.WARNING,
    LoggingLevel.ERROR:   logging.ERROR,
}

# Logger used by the handlers in this snippet.
logger = logging.getLogger("mcp-server")

@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
    """Apply the requested level to the module logger.

    Args:
        level: Requested level; looked up in LEVEL_MAP, with logging.INFO
            used when it has no entry.
    """
    logger.setLevel(LEVEL_MAP.get(level, logging.INFO))
    logger.info(f"Logging level set to {level}")

@app.tool()
async def debug_operation(data: str, ctx) -> str:
    """Run process() on *data*, logging locally and to the client.

    Args:
        data: Input handed to process().
        ctx: Request context; supplies send_log().

    Returns:
        The value returned by process().

    Raises:
        Exception: Any failure raised during processing is logged at
            error level and re-raised unchanged.
    """
    logger.debug(f"Processing data: {data}")

    # The client gets its own copy of the start message
    start_message = f"Starting operation with input: {data}"
    await ctx.send_log(level="info", message=start_message)

    try:
        result = process(data)
        logger.info(f"Successfully processed: {result}")
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise
    return result

6. Error handling patterns

Structured error classes

from mcp.types import McpError, ErrorCode

class ValidationError(McpError):
    """McpError carrying ErrorCode.INVALID_PARAMS and the given message."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INVALID_PARAMS, message)

class NotFoundError(McpError):
    """McpError for a missing resource, reported with ErrorCode.INVALID_REQUEST."""

    def __init__(self, resource: str):
        detail = f"Not found: {resource}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)

class PermissionError(McpError):
    """McpError for a denied action, reported with ErrorCode.INVALID_REQUEST.

    Note: within this module the name shadows Python's builtin
    PermissionError.
    """

    def __init__(self, action: str):
        detail = f"Permission denied: {action}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)

class InternalError(McpError):
    """McpError carrying ErrorCode.INTERNAL_ERROR and the given message."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INTERNAL_ERROR, message)


@app.tool()
async def safe_operation(input: str) -> str:
    """Validate *input*, check permission, run the operation, and map failures to MCP errors.

    Args:
        input: The value to operate on; must be non-empty and at most
            10000 characters.

    Returns:
        The non-None result of perform_operation().

    Raises:
        ValidationError: *input* is empty or longer than 10000 characters.
        PermissionError: check_permission() returned a falsy value (this is
            the McpError subclass defined in this module, not the builtin).
        NotFoundError: perform_operation() returned None.
        InternalError: A ConnectionError or TimeoutError was raised while
            checking permission or performing the operation; the original
            exception is attached as ``__cause__``.
    """
    max_chars = 10000

    if not input:
        raise ValidationError("Input cannot be empty")

    if len(input) > max_chars:
        raise ValidationError(f"Input too large: {len(input)} chars (max {max_chars})")

    try:
        if not await check_permission(input):
            raise PermissionError(f"read {input}")

        result = await perform_operation(input)

        if result is None:
            raise NotFoundError(input)

        return result

    # Chain with `from e` so the low-level cause stays on the traceback.
    except ConnectionError as e:
        raise InternalError(f"Database connection failed: {e}") from e
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}") from e
TypeScript error handling

import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";

server.setRequestHandler(CallToolSchema, async (request) => {
    // Validate the arguments, run the operation, and return the result as
    // a single text content item. Every failure leaves as an McpError.
    try {
        const args = request.params.arguments;
        validateInput(args);

        const result = await performOperation(args);
        const text = JSON.stringify(result);
        return { content: [{ type: "text", text }] };

    } catch (error) {
        // Already an McpError: pass it through unchanged.
        if (error instanceof McpError) {
            throw error;
        }

        // NotFoundError is not imported in this snippet - presumably it is
        // defined elsewhere in the project.
        if (error instanceof NotFoundError) {
            throw new McpError(ErrorCode.InvalidRequest, error.message);
        }

        // Anything else: log the details locally, return a generic message.
        console.error("Unexpected error:", error);
        throw new McpError(ErrorCode.InternalError, "An unexpected error occurred");
    }
});

Experimental features (MCP 2025-11-25)

The following features are marked experimental in the MCP specification and may change before stabilization.

Tasks — long-running operations

@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
    """Train a model for 100 epochs, reporting status after each one.

    Args:
        model_id: Identifier forwarded to train_epoch().
        data_path: Training data location forwarded to train_epoch().
        ctx: Task context; supplies report_status().

    Returns:
        A success message naming the model.
    """
    await ctx.report_status("running", "Initializing training...")

    for epoch in range(100):
        await train_epoch(model_id, data_path, epoch)

        # Epochs are reported 1-based
        epochs_done = epoch + 1
        status_text = f"Training epoch {epoch + 1}/100"
        await ctx.report_status("running", status_text, progress=epochs_done, total=100)

    await ctx.report_status("completed", "Training finished")
    return f"Model {model_id} trained successfully"

Tool annotations

# NOTE(review): the MCP specification names its tool-annotation fields
# readOnlyHint / destructiveHint / idempotentHint / openWorldHint - confirm
# that the keys below are what this server API expects.
@app.tool(
    annotations={
        "destructive":        False,  # Does not modify data
        "idempotent":         True,   # Safe to retry
        "timeout_seconds":    30,     # Expected max duration
        "requires_approval":  False   # No user approval needed
    }
)
async def safe_query(query: str) -> str:
    """A read-only database query tool.

    Args:
        query: Query text passed unchanged to execute_read_query().

    Returns:
        Whatever execute_read_query() returns.
    """
    return await execute_read_query(query)

Build docs developers (and LLMs) love