DeerFlow’s agent system uses a custom ThreadState schema that extends LangChain’s AgentState with domain-specific fields for sandbox management, file tracking, and user interactions. State is persisted per thread by a LangGraph checkpointer, and custom reducers control how updates to specific fields (artifacts, viewed_images) are merged into the existing state.
ThreadState Schema
Defined in backend/src/agents/thread_state.py:
```python
from typing import Annotated, NotRequired, TypedDict

from langchain.agents import AgentState

# merge_artifacts and merge_viewed_images are defined earlier in this module
# (see the artifacts and viewed_images sections below).


class SandboxState(TypedDict):
    sandbox_id: NotRequired[str | None]


class ThreadDataState(TypedDict):
    workspace_path: NotRequired[str | None]
    uploads_path: NotRequired[str | None]
    outputs_path: NotRequired[str | None]


class ViewedImageData(TypedDict):
    base64: str
    mime_type: str


class ThreadState(AgentState):
    sandbox: NotRequired[SandboxState | None]
    thread_data: NotRequired[ThreadDataState | None]
    title: NotRequired[str | None]
    artifacts: Annotated[list[str], merge_artifacts]
    todos: NotRequired[list | None]
    uploaded_files: NotRequired[list[dict] | None]
    viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images]
```
State Fields
sandbox
Type: SandboxState | None
Purpose: Tracks the active sandbox environment for isolated tool execution.
Structure:
```python
{
    "sandbox_id": "local"  # or Docker container ID
}
```
Lifecycle:
- Set by SandboxMiddleware in the before_agent hook
- Persisted across turns within the same thread (the sandbox is not released between turns)
- Used by the sandbox tools (bash, read_file, write_file, str_replace, ls)
Example:
```python
state["sandbox"] = {"sandbox_id": "local"}

# Tools check sandbox_id to determine the execution mode
from src.sandbox import is_local_sandbox

if is_local_sandbox(state["sandbox"]["sandbox_id"]):
    # Local filesystem execution with virtual-to-physical path translation
    real_path = replace_virtual_path(virtual_path, thread_id)
else:
    # Docker container execution through the configured sandbox provider
    sandbox = provider.get(state["sandbox"]["sandbox_id"])
    result = sandbox.execute_command(command)
```
thread_data
Type: ThreadDataState | None
Purpose: Provides path mappings between virtual (agent-visible) and physical (host) paths.
Structure:
```json
{
  "workspace_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/workspace",
  "uploads_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/uploads",
  "outputs_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/outputs"
}
```
Virtual Path Mappings:
- Agent sees: /mnt/user-data/workspace, /mnt/user-data/uploads, /mnt/user-data/outputs
- Physical: backend/.deer-flow/threads/{thread_id}/user-data/{workspace,uploads,outputs}
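The mapping above amounts to swapping the /mnt/user-data prefix for the thread's host directory. Below is a minimal sketch of that translation, assuming a hypothetical helper named to_physical_path and a base_dir that points at backend/.deer-flow; DeerFlow's own translation (the replace_virtual_path call in the sandbox example) may differ, for instance in how it validates paths.

```python
from pathlib import Path

# Hypothetical helper, not DeerFlow's API: maps an agent-visible path to the
# host path under <base_dir>/threads/<thread_id>/user-data/.
VIRTUAL_ROOT = "/mnt/user-data"


def to_physical_path(virtual_path: str, thread_id: str, base_dir: Path) -> Path:
    """Translate /mnt/user-data/... into <base_dir>/threads/<thread_id>/user-data/..."""
    if virtual_path != VIRTUAL_ROOT and not virtual_path.startswith(VIRTUAL_ROOT + "/"):
        raise ValueError(f"Not a virtual user-data path: {virtual_path}")
    relative = virtual_path[len(VIRTUAL_ROOT):].lstrip("/")
    user_data = base_dir / "threads" / thread_id / "user-data"
    physical = (user_data / relative).resolve()
    # Reject paths that escape the thread's user-data directory (e.g. via "..")
    if not physical.is_relative_to(user_data.resolve()):
        raise ValueError(f"Path escapes user-data directory: {virtual_path}")
    return physical
```

Checking the resolved path against the thread's own directory is what keeps one thread from reading another thread's files through `..` segments.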
Lifecycle:
- Set by ThreadDataMiddleware in the before_agent hook
- With lazy_init=True (default): paths are computed, but directories are created on demand
- With lazy_init=False: directories are created eagerly in the middleware
title
Type: str | None
Purpose: Human-readable thread title for UI display.
Lifecycle:
- Set by TitleMiddleware after the first complete user-assistant exchange
- Generated by a single LLM call (thinking disabled) from the first user message and the first assistant response
- Persisted by the LangGraph checkpointer
Generation (backend/src/agents/middlewares/title_middleware.py:46-81):
```python
def _generate_title(self, state):
    config = get_title_config()  # max_words, max_chars, prompt_template
    model = create_chat_model(thinking_enabled=False)
    messages = state["messages"]
    # First user message and first assistant reply in the thread
    user_msg = next(m.content for m in messages if m.type == "human")
    assistant_msg = next(m.content for m in messages if m.type == "ai")
    prompt = config.prompt_template.format(
        max_words=config.max_words,
        user_msg=user_msg[:500],  # truncate to keep the prompt short
        assistant_msg=assistant_msg[:500],
    )
    try:
        response = model.invoke(prompt)
        title = response.content.strip()[: config.max_chars]
        return title
    except Exception:
        # Fallback: first 50 chars of the user message
        return user_msg[:50].rstrip() + "..."
```
Configuration (config.yaml):
```yaml
title:
  enabled: true
  max_words: 8
  max_chars: 80
  prompt_template: "Generate a concise title (max {max_words} words) for this conversation: {user_msg}\n{assistant_msg}"
```
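The exact condition TitleMiddleware uses to decide when to run is not shown on this page. The sketch below is one plausible reading of "after the first complete user-assistant exchange", combined with the enabled flag from config.yaml; should_generate_title and the Msg stand-in are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    # Hypothetical stand-in for a LangChain message; only `.type` is used here
    type: str
    content: str


def should_generate_title(state: dict, enabled: bool = True) -> bool:
    """Sketch: generate a title once, after the first user-assistant exchange."""
    if not enabled or state.get("title"):
        return False  # feature disabled, or thread already titled
    messages = state.get("messages", [])
    human = sum(1 for m in messages if m.type == "human")
    ai = sum(1 for m in messages if m.type == "ai")
    return human == 1 and ai >= 1
```

Checking for an existing title first ensures the title is generated once per thread, so later turns do not pay for another LLM call.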
artifacts
Type: Annotated[list[str], merge_artifacts]
Purpose: Tracks files presented to user via present_files tool.
Custom Reducer (backend/src/agents/thread_state.py:21-28):
```python
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
    """Reducer for artifacts list - merges and deduplicates artifacts."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    # Use dict.fromkeys to deduplicate while preserving order
    return list(dict.fromkeys(existing + new))
```
Behavior:
- Maintains insertion order (first occurrence preserved)
- Automatically deduplicates paths
- Survives across turns (cumulative)
Usage in Tools:
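```python
# present_files tool (backend/src/tools/builtins/present_files.py), simplified
from typing import Annotated

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId
from langgraph.types import Command


def present_files(
    file_paths: list[str],
    runtime: ToolRuntimeContext,
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    # Validate that every file lives in the outputs directory
    for path in file_paths:
        if not path.startswith("/mnt/user-data/outputs/"):
            raise ValueError(f"Files must be in /mnt/user-data/outputs: {path}")
    # Tools update state by returning a Command (triggers the merge_artifacts reducer)
    return Command(update={
        "artifacts": file_paths,
        "messages": [ToolMessage("Files presented", tool_call_id=tool_call_id)],
    })
```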
todos
Type: list | None
Purpose: Stores task list when TodoListMiddleware is enabled (is_plan_mode=True).
Structure:
```json
[
  {"content": "Read configuration files", "status": "completed"},
  {"content": "Implement authentication logic", "status": "in_progress"},
  {"content": "Write unit tests", "status": "pending"}
]
```
Task States:
- pending - Not yet started
- in_progress - Currently being worked on (normally one at a time; several if tasks run in parallel)
- completed - Finished successfully
Managed By: TodoListMiddleware (LangChain built-in) with custom prompts
Tool: write_todos (injected by middleware)
uploaded_files
Type: list[dict] | None
Purpose: Tracks newly uploaded files for current turn.
Structure:
```python
[
    {
        "filename": "report.pdf",
        "size": 1048576,  # bytes
        "path": "/mnt/user-data/uploads/report.pdf",
        "extension": ".pdf"
    }
]
```
Lifecycle:
- Set by UploadsMiddleware in the before_agent hook
- Only includes files NOT already shown in previous messages (deduplication)
- Cleared on the next turn (not cumulative)
Deduplication Logic (backend/src/agents/middlewares/uploads_middleware.py:110-136):
```python
import re

from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage


class UploadsMiddleware(AgentMiddleware):
    def _extract_files_from_message(self, content: str) -> set[str]:
        """Extract filenames from the <uploaded_files> tag in a message."""
        match = re.search(r"<uploaded_files>([\s\S]*?)</uploaded_files>", content)
        if not match:
            return set()
        files_content = match.group(1)
        filenames = set()
        for line in files_content.split("\n"):
            # Match lines of the form: - filename.ext (size)
            file_match = re.match(r"^-\s+(.+?)\s*\(", line.strip())
            if file_match:
                filenames.add(file_match.group(1).strip())
        return filenames

    def before_agent(self, state, runtime):
        # Scan all previous messages (except the last) for files already shown
        shown_files = set()
        for msg in state["messages"][:-1]:
            # Only plain-text content is scanned; list (multimodal) content is skipped
            if isinstance(msg, HumanMessage) and isinstance(msg.content, str):
                shown_files.update(self._extract_files_from_message(msg.content))
        # List only newly uploaded files (resolving thread_id and building
        # updated_messages, which adds an <uploaded_files> block, are omitted here)
        new_files = self._list_newly_uploaded_files(thread_id, shown_files)
        return {"uploaded_files": new_files, "messages": updated_messages}
```
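The helper _list_newly_uploaded_files is called above but not shown. The sketch below shows what it needs to produce, assuming it receives the physical uploads directory (rather than thread_id) and matches files by name, as _extract_files_from_message does; the function name and signature are hypothetical. Each entry follows the uploaded_files structure shown earlier.

```python
from pathlib import Path


def list_newly_uploaded_files(uploads_dir: Path, shown_files: set[str]) -> list[dict]:
    """Hypothetical sketch: describe uploads not already announced in earlier messages."""
    new_files: list[dict] = []
    if not uploads_dir.is_dir():
        return new_files  # with lazy init, the directory may not exist yet
    for path in sorted(uploads_dir.iterdir()):
        if not path.is_file() or path.name in shown_files:
            continue  # skip subdirectories and files already shown to the agent
        new_files.append({
            "filename": path.name,
            "size": path.stat().st_size,  # bytes
            "path": f"/mnt/user-data/uploads/{path.name}",  # agent-visible virtual path
            "extension": path.suffix,
        })
    return new_files
```

Because shown_files comes from earlier messages, a file announced in turn 1 is not announced again in turn 2, which is why uploaded_files is not cumulative.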
viewed_images
Type: Annotated[dict[str, ViewedImageData], merge_viewed_images]
Purpose: Tracks images loaded via view_image tool for vision model analysis.
Structure:
```json
{
  "/mnt/user-data/uploads/diagram.png": {
    "base64": "iVBORw0KGgoAAAANSUhEUgA...",
    "mime_type": "image/png"
  },
  "/mnt/user-data/workspace/screenshot.jpg": {
    "base64": "/9j/4AAQSkZJRgABAQEAY...",
    "mime_type": "image/jpeg"
  }
}
```
Custom Reducer (backend/src/agents/thread_state.py:31-45):
```python
def merge_viewed_images(
    existing: dict[str, ViewedImageData] | None,
    new: dict[str, ViewedImageData] | None
) -> dict[str, ViewedImageData]:
    """Reducer for viewed_images dict - merges image dictionaries.

    Special case: If new is an empty dict {}, it clears the existing images.
    This allows middlewares to clear the viewed_images state after processing.
    """
    if existing is None:
        return new or {}
    if new is None:
        return existing
    # Special case: empty dict means clear all viewed images
    if len(new) == 0:
        return {}
    # Merge dictionaries, new values override existing ones for same keys
    return {**existing, **new}
```
Behavior:
- Normal updates: merge dictionaries (new keys are added, existing keys are overwritten)
- Empty dict {}: clears all viewed images (reset)
- Used by ViewImageMiddleware to inject images before the LLM call
Lifecycle:
- Agent calls the view_image tool → the tool returns base64 data in a ToolMessage
- The tool also updates state: {"viewed_images": {path: {base64, mime_type}}}
- ViewImageMiddleware detects completed view_image tool calls in before_model
- The middleware injects a HumanMessage with multimodal content (text + images)
- The LLM analyzes the images
- The middleware clears state with {"viewed_images": {}} after processing
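The injection step turns each viewed_images entry into an image content block. build_image_message_content below is a hypothetical helper: it assumes OpenAI-style image_url blocks carrying a data URL, a format LangChain chat models commonly accept. The block format and wording ViewImageMiddleware actually uses may differ.

```python
def build_image_message_content(viewed_images: dict[str, dict]) -> list[dict]:
    """Hypothetical sketch: turn viewed_images into multimodal content blocks."""
    content: list[dict] = [
        {"type": "text", "text": "Here are the images you requested to view:"}
    ]
    for path, image in viewed_images.items():
        # Label each image with its virtual path so the model can refer to it
        content.append({"type": "text", "text": f"Image: {path}"})
        content.append({
            "type": "image_url",
            # Data URL carrying the base64 payload and its MIME type
            "image_url": {"url": f"data:{image['mime_type']};base64,{image['base64']}"},
        })
    return content
```

The resulting list would be passed as the content of the injected HumanMessage, i.e. HumanMessage(content=build_image_message_content(state["viewed_images"])).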
State Management Patterns
1. Middleware State Updates
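Middleware hooks return state updates as dictionaries (or None for no update):
```python
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage


# MyState: an AgentState subclass that declares field1 and field2
class MyMiddleware(AgentMiddleware[MyState]):
    def before_agent(self, state, runtime):
        return {
            "field1": "value1",
            "field2": ["item1", "item2"],
            "messages": [HumanMessage(content="injected message")],
        }
```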
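Merge Behavior:
- Fields without custom reducers: the new value replaces the existing one
- Fields with custom reducers: the reducer is called with (existing, new)
- messages: uses LangGraph’s add_messages reducer (appends new messages; a message whose ID matches an existing one replaces it)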
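2. Tool State Updates
Tools update state by returning a Command whose update is merged the same way as a middleware update (a plain dict return value only becomes the ToolMessage content):
```python
from typing import Annotated

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId
from langgraph.types import Command


def my_tool(arg: str, runtime: ToolRuntimeContext,
            tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
    # Access current state
    current_artifacts = runtime.state.get("artifacts", [])
    # Return a state update (merge_artifacts deduplicates on merge)
    return Command(update={
        "artifacts": current_artifacts + ["/new/file.txt"],
        "messages": [ToolMessage("Created /new/file.txt", tool_call_id=tool_call_id)],
    })
```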
3. State Persistence
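State is persisted by a LangGraph checkpointer:
```python
import os

from langchain.agents import create_agent
from langgraph.checkpoint.postgres import PostgresSaver

# Configure in langgraph.json or programmatically, as here.
# PostgresSaver.from_conn_string returns a context manager; use the agent
# inside the block, since the connection closes on exit.
with PostgresSaver.from_conn_string(os.environ["POSTGRES_URI"]) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first use
    agent = create_agent(
        model=model,
        tools=tools,
        middleware=middlewares,
        state_schema=ThreadState,
        checkpointer=checkpointer,  # enables state persistence
    )
```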
Persisted Fields:
- All ThreadState fields
- Full message history
- Checkpoints created after each agent step
Retrieval:
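```python
config = {"configurable": {"thread_id": thread_id}}

# Get the latest state snapshot for a thread
snapshot = agent.get_state(config)
artifacts = snapshot.values.get("artifacts", [])

# Get state at a specific checkpoint
snapshot = agent.get_state(
    {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}
)

# List all checkpoints for the thread (newest first)
checkpoints = list(agent.get_state_history(config))
```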
4. Thread Isolation
Each thread maintains independent state:
```python
from langchain_core.messages import HumanMessage

# Thread 1
agent.invoke(
    {"messages": [HumanMessage(content="Hello")]},
    config={"configurable": {"thread_id": "thread-1"}},
)
# Thread 2 (completely isolated)
agent.invoke(
    {"messages": [HumanMessage(content="Hello")]},
    config={"configurable": {"thread_id": "thread-2"}},
)
```
Physical Isolation:
- Separate directories: backend/.deer-flow/threads/{thread_id}/
- Separate sandboxes (if using the Docker provider)
- Separate checkpoint history
Custom Reducer Implementation
When to Use Custom Reducers
- Deduplication - Remove duplicates while merging (like merge_artifacts)
- Merging Dicts - Merge dictionaries key by key (like merge_viewed_images)
- Reset Semantics - Support clearing values (an empty dict resets viewed_images)
- Aggregation - Accumulate values with custom logic
Creating Custom Reducers
```python
from typing import Annotated

from langchain.agents import AgentState


def merge_my_field(existing: list[str] | None, new: list[str] | None) -> list[str]:
    """Custom reducer for my_field."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    # Custom merge logic here
    combined = existing + new
    return sorted(set(combined))  # Deduplicate and sort


class MyState(AgentState):
    my_field: Annotated[list[str], merge_my_field]
```
Reducer Contract:
- Takes two arguments: existing (current state) and new (update)
- Either argument can be None
- Returns a merged value of the same type
- Pure function (no side effects; does not mutate its inputs)
Testing Reducers
```python
import pytest
from src.agents.thread_state import merge_artifacts, merge_viewed_images


def test_merge_artifacts_deduplication():
    existing = ["file1.txt", "file2.txt"]
    new = ["file2.txt", "file3.txt"]
    result = merge_artifacts(existing, new)
    assert result == ["file1.txt", "file2.txt", "file3.txt"]


def test_merge_viewed_images_clear():
    existing = {"img1.png": {"base64": "data", "mime_type": "image/png"}}
    new = {}  # Empty dict should clear
    result = merge_viewed_images(existing, new)
    assert result == {}


def test_merge_viewed_images_update():
    existing = {"img1.png": {"base64": "old", "mime_type": "image/png"}}
    new = {"img1.png": {"base64": "new", "mime_type": "image/png"}}
    result = merge_viewed_images(existing, new)
    assert result["img1.png"]["base64"] == "new"  # New value wins
```
State Debugging
Inspecting Current State
```python
import json

from langchain.agents.middleware import AgentMiddleware


# In middleware
class DebugMiddleware(AgentMiddleware[ThreadState]):
    def before_agent(self, state, runtime):
        print("Current state:", json.dumps({
            "sandbox": state.get("sandbox"),
            "thread_data": state.get("thread_data"),
            "artifacts": state.get("artifacts"),
            # Print only image paths; the base64 payloads are large
            "viewed_images": list((state.get("viewed_images") or {}).keys()),
        }, indent=2))
        return None  # no state update
```
State Size Monitoring
```python
import sys

from langchain.agents.middleware import AgentMiddleware


class StateSizeMonitor(AgentMiddleware[ThreadState]):
    def after_agent(self, state, runtime):
        # Rough estimate: in-memory size of each field's string representation
        total_size = sum([
            sys.getsizeof(str(state.get("messages", []))),
            sys.getsizeof(str(state.get("viewed_images", {}))),
            sys.getsizeof(str(state.get("artifacts", []))),
        ])
        print(f"Total state size: {total_size / 1024:.2f} KB")
        if total_size > 1_000_000:  # ~1 MB
            print("WARNING: State size exceeds 1 MB, consider summarization")
        return None
```
Memory Usage
- messages: Grows without bound unless summarized (use SummarizationMiddleware)
- viewed_images: Stores base64 image data (can be large; clear after processing)
- artifacts: Small (file paths only)
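The size of viewed_images follows from how base64 works: standard (padded) base64 emits 4 characters for every 3 input bytes, so an image grows by about a third once it is stored in state. The short check below confirms the formula against Python's base64 module; base64_size is just a local helper name.

```python
import base64
import math


def base64_size(num_bytes: int) -> int:
    """Length of the standard (padded) base64 encoding of num_bytes bytes."""
    return 4 * math.ceil(num_bytes / 3)


raw = bytes(3 * 1024 * 1024)  # a 3 MiB payload (zeros, for illustration)
encoded = base64.b64encode(raw)
assert len(encoded) == base64_size(len(raw))
print(f"{len(raw) / 2**20:.0f} MiB raw -> {len(encoded) / 2**20:.0f} MiB base64")
```

A single 3 MiB screenshot therefore adds 4 MiB of text to the state and is persisted in every checkpoint until the middleware clears it.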
Database Size
- Each checkpoint persists the full state to the database
- With PostgresSaver: one row per checkpoint
- Periodically clean up old threads
State Transfer
- State is serialized/deserialized on every agent step
- Keep the state schema simple (avoid deeply nested structures)
- Use NotRequired for optional fields (reduces serialization overhead)