The Plugin Tutorial covers the basics of registering a model and implementing execute(). This page goes further, covering the patterns you’ll need when building production-quality plugins for real model APIs — including API key management, async support, structured output, tools, attachments, rich streaming events, and token tracking.

Lazily loading expensive dependencies

If your plugin depends on a large library like PyTorch, don’t import it at the top level of your module. Top-level imports run every time llm starts — including for simple commands like llm --help — and can add multiple seconds of startup latency. Move expensive imports inside the methods that need them:
class MyModel(llm.Model):
    model_id = "my-model"

    def execute(self, prompt, stream, response, conversation):
        # Import happens only when the model is actually used
        import torch
        ...
This change to llm-sentence-transformers shaved 1.8 seconds off llm --help startup time.
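One way to keep the deferred import in a single place is a small cached loader. This is a minimal sketch, not part of LLM: load_backend and LazyModel are hypothetical names, and the standard-library statistics module stands in for a heavy dependency such as torch so the snippet runs anywhere.

```python
import importlib
from functools import lru_cache


@lru_cache(maxsize=None)
def load_backend(module_name="statistics"):
    """Import the dependency on first call; later calls reuse the cached module."""
    # "statistics" is a stand-in for a heavy library such as torch.
    return importlib.import_module(module_name)


class LazyModel:
    """Hypothetical stand-in for an llm.Model subclass."""

    model_id = "my-model"

    def execute(self, numbers):
        # The import is deferred until execute() actually runs.
        backend = load_backend()
        return backend.mean(numbers)
```

Because the loader is cached, several methods can call it without repeating the import logic, and commands that never reach execute() never trigger the import.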

Models that accept API keys

Models that call external API providers should subclass llm.KeyModel instead of llm.Model. This wires your model into LLM’s key management system, which reads keys stored with llm keys set, keys in environment variables, and keys passed with the --key CLI flag.
import llm

class HostedModel(llm.KeyModel):
    needs_key = "hosted"       # Required: the name used with llm keys set
    key_env_var = "HOSTED_API_KEY"  # Optional: environment variable fallback
Add a key= parameter to execute() — LLM populates it automatically:
    def execute(self, prompt, stream, response, conversation, key=None):
        # key is the resolved API key
        client = MySDK(api_key=key)
        ...
LLM resolves key by checking, in order: the --key CLI option, the key registry entry for needs_key, and then the key_env_var environment variable.
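The lookup order can be pictured as a simple fall-through. The sketch below only illustrates the documented order; it is not LLM's actual implementation, and resolve_key, stored_keys, and environ are hypothetical names introduced for the example.

```python
import os


def resolve_key(explicit_key, stored_keys, needs_key, key_env_var, environ=None):
    """Return the first key found, following the order described above."""
    environ = os.environ if environ is None else environ
    if explicit_key:  # 1. value passed with --key
        return explicit_key
    if needs_key in stored_keys:  # 2. entry saved with `llm keys set`
        return stored_keys[needs_key]
    return environ.get(key_env_var)  # 3. environment variable, or None


env = {"HOSTED_API_KEY": "from-env"}
print(resolve_key(None, {"hosted": "from-store"}, "hosted", "HOSTED_API_KEY", env))
# from-store
```

The practical consequence is that a stored key shadows the environment variable, so clearing the stored entry is the way to test the environment fallback.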

Async models

Plugins can provide an async model for use with Python’s asyncio. The async class subclasses llm.AsyncModel and implements execute() as an async generator:
from typing import AsyncGenerator
import llm

class MyAsyncModel(llm.AsyncModel):
    model_id = "my-model-id"

    async def execute(
        self, prompt, stream, response, conversation=None
    ) -> AsyncGenerator[str, None]:
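        # client (an async SDK client) and messages (built from prompt and
        # conversation) are assumed to be created elsewhere in the plugin.
        if stream:
            completion = await client.chat.completions.create(
                model=self.model_id,
                messages=messages,
                stream=True,
            )
            async for chunk in completion:
                content = chunk.choices[0].delta.content
                # Some chunks carry no text (for example the final one)
                if content is not None:
                    yield content
        else:
            completion = await client.chat.completions.create(
                model=self.model_id,
                messages=messages,
                stream=False,
            )
            if completion.choices[0].message.content is not None:
                yield completion.choices[0].message.content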
For async models that need API keys, subclass llm.AsyncKeyModel:
class MyAsyncModel(llm.AsyncKeyModel):
    ...
    async def execute(
        self, prompt, stream, response, conversation=None, key=None
    ) -> AsyncGenerator[str, None]:
        ...
Register both the sync and async instances in register_models():
@llm.hookimpl
def register_models(register):
    register(
        MyModel(), MyAsyncModel(), aliases=("my-model-aliases",)
    )
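To see how an async generator of this shape is consumed, here is a self-contained sketch that uses only asyncio. fake_execute is a hypothetical stand-in for an async execute() method; it involves no LLM classes and no network calls.

```python
import asyncio


async def fake_execute(chunks):
    """Hypothetical stand-in for an async execute(): yields only non-empty text."""
    for chunk in chunks:
        await asyncio.sleep(0)  # hand control back to the event loop, as a network read would
        if chunk:  # streaming APIs often send None or "" for non-text deltas
            yield chunk


async def collect(chunks):
    """Join everything the async generator yields into one string."""
    return "".join([piece async for piece in fake_execute(chunks)])


result = asyncio.run(collect(["Hel", None, "lo", ""]))
print(result)  # Hello
```

Filtering empty chunks inside the generator keeps callers from having to handle None values.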

Supporting schemas

If your model supports structured JSON output, declare this with supports_schema = True and check for prompt.schema in execute():
class MyModel(llm.KeyModel):
    ...
    supports_schema = True

    def execute(self, prompt, stream, response, conversation, key=None):
        kwargs = {}
        if prompt.schema:
            # prompt.schema is always a dict (JSON schema),
            # even if the user passed a Pydantic model class
            kwargs["response_format"] = {"type": "json_schema", "schema": prompt.schema}
        ...
prompt.schema is always a Python dictionary representing a JSON schema, regardless of what format the caller used.

Supporting tools

Adding tools support involves several coordinated steps:
1. Declare support
class MyModel(llm.Model):
    supports_tools = True
2. Pass tools to the API
If prompt.tools is populated, convert each llm.Tool object into your provider’s format and include it in the API request.
3. Capture tool calls from the response
For each tool call in the provider’s response, call response.add_tool_call(). Pass the provider’s own ID if one is available:
response.add_tool_call(
    llm.ToolCall(
        tool_call_id=tool_id,    # omit to let LLM generate a tc_-prefixed id
        name=tool_name,
        arguments=parsed_args,
    )
)
4. Forward tool results
If prompt.tool_results is populated, include those llm.ToolResult objects in the messages sent to the API.
5. Handle prompts without text
Some prompts carry only tool results, so prompt.prompt may be None. Your code must handle that case.
6. Include tools in conversation history
When building the messages array for a multi-turn conversation, include prompt.tools, prompt.tool_results, and tool calls from previous turns.
The llm-gemini tools commit is a real-world example of this pattern.
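The "pass tools to the API" step might look like the sketch below, which targets an OpenAI-style function list. FakeTool is a stand-in for llm.Tool, and its attribute names (name, description, input_schema) are assumptions made for this example, as is the helper name to_provider_tools. Other providers expect different shapes.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTool:
    """Stand-in for llm.Tool; the three attribute names are assumed here."""

    name: str
    description: str = ""
    input_schema: dict = field(default_factory=dict)


def to_provider_tools(tools):
    """Convert tool definitions into an OpenAI-style "function" list."""
    return [
        {
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": tool.input_schema,
            },
        }
        for tool in tools
    ]


weather = FakeTool(
    name="get_weather",
    description="Look up the weather for a city",
    input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
)
payload = to_provider_tools([weather])
```

Keeping the conversion in one function means the same code can serve both the current prompt and any earlier turns replayed from conversation history.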

Attachments for multi-modal models

Models that accept images, audio, or other binary content declare the MIME types they support via an attachment_types class attribute:
class NewModel(llm.Model):
    model_id = "new-model"
    attachment_types = {
        "image/png",
        "image/jpeg",
        "image/webp",
        "image/gif",
    }
MP3 files are detected as audio/mpeg, not audio/mp3.

Working with Attachment objects

Inside execute(), the prompt.attachments list contains Attachment instances. Each has:
  • url — the original URL, if provided
  • path — the resolved file path, if provided
  • type — the declared content type, if provided
  • content — raw bytes, if provided directly
Use these methods to access content safely:
  • attachment.resolve_type() — returns the content type, guessing from bytes if necessary
  • attachment.content_bytes() — returns binary content, fetching from a URL or reading from disk as needed
  • attachment.base64_content() — returns the content as a base64-encoded string
Here is how the OpenAI plugin handles attachments, including the case where prompt.prompt is None:
if not prompt.attachments:
    messages.append({"role": "user", "content": prompt.prompt})
else:
    attachment_message = []
    if prompt.prompt:
        attachment_message.append({"type": "text", "text": prompt.prompt})
    for attachment in prompt.attachments:
        attachment_message.append(_attachment(attachment))
    messages.append({"role": "user", "content": attachment_message})


def _attachment(attachment):
    url = attachment.url
    base64_content = ""
    if not url or attachment.resolve_type().startswith("audio/"):
        base64_content = attachment.base64_content()
        url = f"data:{attachment.resolve_type()};base64,{base64_content}"
    if attachment.resolve_type().startswith("image/"):
        return {"type": "image_url", "image_url": {"url": url}}
    else:
        format_ = "wav" if attachment.resolve_type() == "audio/wav" else "mp3"
        return {
            "type": "input_audio",
            "input_audio": {
                "data": base64_content,
                "format": format_,
            },
        }
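The data: URL built in _attachment() is a MIME type followed by base64-encoded bytes. A standalone sketch using only the standard library (to_data_url is a hypothetical helper name, not an LLM function):

```python
import base64


def to_data_url(mime_type, content):
    """Encode raw bytes as a data: URL, the form used when no URL is available."""
    encoded = base64.b64encode(content).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


print(to_data_url("image/png", b"abc"))  # data:image/png;base64,YWJj
```

In a plugin, attachment.resolve_type() and attachment.base64_content() supply the two halves directly, so a helper like this is mainly useful for tests.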

Structured messages and streaming events

Rather than yielding plain strings, execute() can yield StreamEvent objects for richer structured output. This enables separate handling of text, reasoning tokens, tool calls, and server-side tool results.
Plain string yields still work unchanged. Each string is internally wrapped as StreamEvent(type="text", chunk=...). You only need StreamEvent when you need to emit non-text content types.

Yielding StreamEvent objects

from llm.parts import StreamEvent

def execute(self, prompt, stream, response, conversation, key=None):
    messages = self.build_messages(prompt, conversation)
    ...
    for chunk in provider_sdk.stream(...):
        if chunk.type == "text":
            yield StreamEvent(type="text", chunk=chunk.text)
        elif chunk.type == "thinking":
            yield StreamEvent(type="reasoning", chunk=chunk.text)
A StreamEvent has these primary fields:
| Field | Description |
| --- | --- |
| type | One of "text", "reasoning", "tool_call_name", "tool_call_args", "tool_result" |
| chunk | The text fragment for this event |
| tool_call_id | Provider’s ID for the tool call, used to group tool events |
| provider_metadata | Optional dict[str, dict] for opaque provider data |
| server_executed | Set True for server-side tool calls (e.g. Anthropic web search) |
| tool_name | Set on tool_result events |
| part_index | Override event grouping (leave None for automatic) |

How events group into Parts

The framework groups consecutive events into Part objects automatically:
| Event stream | Resulting Parts |
| --- | --- |
| text × N | one TextPart |
| reasoning × N, then text × N | ReasoningPart, TextPart |
| text, tool call events, text | TextPart, ToolCallPart, TextPart |
| Parallel tool calls (interleaved by id) | one ToolCallPart per distinct tool_call_id |
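The first two rows of the table can be illustrated with itertools.groupby. This is a deliberately simplified sketch, not the framework's implementation: events are plain (type, chunk) tuples rather than StreamEvent objects, and it ignores tool_call_id and part_index.

```python
from itertools import groupby


def group_events(events):
    """Merge runs of consecutive same-type events into (type, text) pairs."""
    return [
        (event_type, "".join(chunk for _, chunk in run))
        for event_type, run in groupby(events, key=lambda event: event[0])
    ]


events = [
    ("reasoning", "Let me "),
    ("reasoning", "think."),
    ("text", "The answer "),
    ("text", "is 4."),
]
print(group_events(events))
# [('reasoning', 'Let me think.'), ('text', 'The answer is 4.')]
```

Because grouping depends on adjacency, a text event that arrives after a tool call starts a new group instead of extending the earlier one, which matches the third row of the table.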

Reasoning tokens

Yield StreamEvent(type="reasoning", ...) for thinking tokens:
yield StreamEvent(type="reasoning", chunk=text_chunk)
Respect prompt.hide_reasoning — when True, do not request reasoning summaries from providers that require an explicit opt-in:
kwargs = {}
if not prompt.hide_reasoning:
    kwargs["reasoning"] = {"summary": "auto"}
If your provider always emits reasoning blocks, keep yielding them — LLM’s display layer will hide them from the user when prompt.hide_reasoning is True.

Tool call events

Emit a pair of events sharing a tool_call_id for each tool call:
yield StreamEvent(
    type="tool_call_name",
    chunk=tool_name,
    tool_call_id=tool_call_id,
)
yield StreamEvent(
    type="tool_call_args",
    chunk=partial_json_fragment,
    tool_call_id=tool_call_id,
)
For client-side tool calls that LLM should execute locally, also call response.add_tool_call():
response.add_tool_call(
    llm.ToolCall(
        tool_call_id=tool_id,
        name=tool_name,
        arguments=parsed_args,
    )
)
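When arguments arrive as partial JSON fragments, they have to be joined per tool_call_id before parsing. The sketch below shows one way to do that under stated assumptions: events are plain dicts standing in for StreamEvent objects, and assemble_tool_calls is a hypothetical helper, not an LLM API.

```python
import json
from collections import defaultdict


def assemble_tool_calls(events):
    """Collect name and argument fragments per tool_call_id, then parse the JSON."""
    names = {}
    fragments = defaultdict(list)
    for event in events:
        if event["type"] == "tool_call_name":
            names[event["tool_call_id"]] = event["chunk"]
        elif event["type"] == "tool_call_args":
            fragments[event["tool_call_id"]].append(event["chunk"])
    return [
        {
            "tool_call_id": call_id,
            "name": name,
            # A call with no argument fragments is treated as empty arguments
            "arguments": json.loads("".join(fragments[call_id]) or "{}"),
        }
        for call_id, name in names.items()
    ]


events = [
    {"type": "tool_call_name", "chunk": "search", "tool_call_id": "a"},
    {"type": "tool_call_name", "chunk": "lookup", "tool_call_id": "b"},
    {"type": "tool_call_args", "chunk": '{"q": ', "tool_call_id": "a"},
    {"type": "tool_call_args", "chunk": '{"id": 7}', "tool_call_id": "b"},
    {"type": "tool_call_args", "chunk": '"cats"}', "tool_call_id": "a"},
]
calls = assemble_tool_calls(events)
```

The parsed name and arguments for each entry are what a plugin would then pass to llm.ToolCall for client-side tools.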

Server-side tool calls

For tools the API runs internally (like Anthropic web search), set server_executed=True:
yield StreamEvent(
    type="tool_call_name",
    chunk="web_search",
    tool_call_id=tool_id,
    server_executed=True,
)
yield StreamEvent(
    type="tool_call_args",
    chunk=json.dumps(query_args),
    tool_call_id=tool_id,
    server_executed=True,
)
yield StreamEvent(
    type="tool_result",
    chunk=human_readable_summary,
    tool_call_id=tool_id,
    server_executed=True,
    tool_name="web_search",
    provider_metadata={"myprovider": {"raw_content": full_payload}},
)
Do not call response.add_tool_call() for server-side tool calls.

Opaque provider metadata

Some providers require you to echo back fields on subsequent requests for multi-turn continuity:
yield StreamEvent(
    type="reasoning",
    chunk="",
    provider_metadata={"anthropic": {"signature": sig}},
)
Namespace metadata under your provider name so transcripts mixing providers don’t collide. Use JSON-safe primitives only (string, int, bool, dict, list); use base64 encoding for binary data.
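A small helper can enforce both rules at once: namespace by provider and base64-encode binary values. This is a sketch only; namespaced_metadata and the "myprovider" name are hypothetical.

```python
import base64
import json


def namespaced_metadata(provider, fields):
    """Wrap fields under the provider name, base64-encoding any bytes values."""
    safe = {
        key: base64.b64encode(value).decode("ascii") if isinstance(value, bytes) else value
        for key, value in fields.items()
    }
    return {provider: safe}


metadata = namespaced_metadata("myprovider", {"signature": b"\x00\x01", "turn": 2})
print(json.dumps(metadata))  # {"myprovider": {"signature": "AAE=", "turn": 2}}
```

Running the result through json.dumps() during development is a quick check that nothing non-serializable slipped in.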

Consuming prompt.messages in build_messages

prompt.messages is the complete input chain for the current turn, whether supplied explicitly via model.prompt(messages=[...]), assembled from keyword arguments, or built by a Conversation. Do not also walk conversation.responses: history is already included in prompt.messages, and iterating the conversation would duplicate it.
from llm.parts import (
    TextPart,
    ReasoningPart,
    ToolCallPart,
    ToolResultPart,
    AttachmentPart,
)

def build_messages(self, prompt, conversation):
    messages = []
    for msg in prompt.messages:
        if msg.role == "system":
            continue  # Handle separately if your API uses a system kwarg
        self._append_message(messages, msg)
    return messages

def _append_message(self, out, msg):
    role = self._provider_role(msg.role)
    parts = []
    for part in msg.parts:
        if isinstance(part, TextPart):
            parts.append({"type": "text", "text": part.text})
        elif isinstance(part, ReasoningPart):
            if part.redacted or not part.text:
                continue
            block = {"type": "thinking", "thinking": part.text}
            sig = (part.provider_metadata or {}).get("anthropic", {}).get("signature")
            if sig:
                block["signature"] = sig
            parts.append(block)
        elif isinstance(part, ToolCallPart):
            parts.append({
                "type": "tool_use",
                "id": part.tool_call_id,
                "name": part.name,
                "input": part.arguments,
            })
        elif isinstance(part, ToolResultPart):
            parts.append({
                "type": "tool_result",
                "tool_use_id": part.tool_call_id,
                "content": part.output,
            })
        elif isinstance(part, AttachmentPart) and part.attachment:
            parts.append(self._attachment_block(part.attachment))
    if out and out[-1]["role"] == role:
        out[-1]["content"].extend(parts)
    else:
        out.append({"role": role, "content": parts})

Restoring opaque metadata on subsequent requests

When resuming a conversation, extract provider-specific metadata from Part.provider_metadata and fold it back into your outgoing request body:
if isinstance(part, ReasoningPart):
    block = {"type": "thinking", "thinking": part.text}
    pm = (part.provider_metadata or {}).get("anthropic", {})
    if "signature" in pm:
        block["signature"] = pm["signature"]
    parts.append(block)

if isinstance(part, ToolCallPart):
    fc_part = {"function_call": {"name": part.name, "args": part.arguments}}
    pm = (part.provider_metadata or {}).get("gemini", {})
    if "thoughtSignature" in pm:
        fc_part["thoughtSignature"] = pm["thoughtSignature"]
    parts.append(fc_part)
If a key is missing (an older transcript from before your plugin added metadata support), fall through gracefully rather than raising an error.

Tracking token usage

Call response.set_usage() at the end of execute() to record token counts. The data is stored in the SQLite log and available through the Python API:
response.set_usage(input=15, output=340, details={"cached": 37})
All three arguments are optional and keyword-only:
  • input — integer count of input tokens
  • output — integer count of output tokens
  • details — dict for extra breakdown (e.g. cached tokens, reasoning tokens)
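Providers report usage under their own key names, so a plugin usually maps them onto these three arguments. The sketch below assumes a provider dict with prompt_tokens, completion_tokens, and cached_tokens keys (assumed names, which vary by API) and uses RecordingResponse as a stand-in for LLM's response object so it can run without LLM installed.

```python
class RecordingResponse:
    """Stand-in for LLM's response object; records what set_usage() receives."""

    def __init__(self):
        self.usage = None

    def set_usage(self, *, input=None, output=None, details=None):
        self.usage = {"input": input, "output": output, "details": details}


def record_usage(response, provider_usage):
    """Map a provider-style usage dict onto set_usage(); key names are assumed."""
    details = {}
    cached = provider_usage.get("cached_tokens")
    if cached:
        details["cached"] = cached
    response.set_usage(
        input=provider_usage.get("prompt_tokens"),
        output=provider_usage.get("completion_tokens"),
        details=details or None,  # omit the breakdown when there is nothing to report
    )


response = RecordingResponse()
record_usage(response, {"prompt_tokens": 15, "completion_tokens": 340, "cached_tokens": 37})
print(response.usage)
# {'input': 15, 'output': 340, 'details': {'cached': 37}}
```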

Tracking resolved model names

Some providers expose aliases like model-latest that resolve to different underlying models over time. If the API response includes the actual model used, record it:
response.set_resolved_model(resolved_model_id)
The string is written to responses.resolved_model in the log and shown in llm logs output.

LLM_RAISE_ERRORS

During plugin development, set the LLM_RAISE_ERRORS environment variable to make LLM raise exceptions instead of catching and logging them:
LLM_RAISE_ERRORS=1 python -i -m llm ...
The -i flag drops Python into an interactive shell if an error occurs. From there you can open a post-mortem debugger:
import pdb; pdb.pm()
