The DeerFlow agent system routes every agent invocation through a chain of up to 11 middleware components: seven are always present and four are added conditionally. Each middleware runs at specific lifecycle hooks (before_agent, after_agent, before_model, after_model, wrap_model_call, wrap_tool_call) to augment agent behavior without modifying core logic.
Execution Order
Middlewares execute in the strict order defined in backend/src/agents/lead_agent/agent.py:217-250:
middlewares = [
    ThreadDataMiddleware(),
    UploadsMiddleware(),
    SandboxMiddleware(),
    DanglingToolCallMiddleware(),
    SummarizationMiddleware(),    # only if summarization is enabled
    TodoListMiddleware(),         # only if is_plan_mode
    TitleMiddleware(),
    MemoryMiddleware(),
    ViewImageMiddleware(),        # only if the model supports vision
    SubagentLimitMiddleware(),    # only if subagent_enabled
    ClarificationMiddleware(),    # must be last
]
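The conditional entries can be read as a small assembly function. The sketch below is only an illustration: it uses plain strings in place of the real middleware classes, and the helper name `build_middleware_names` and its flag parameters are hypothetical, not the code in agent.py.

```python
def build_middleware_names(
    summarization_enabled: bool = False,
    is_plan_mode: bool = False,
    supports_vision: bool = False,
    subagent_enabled: bool = False,
) -> list[str]:
    """Return middleware names in execution order (hypothetical helper)."""
    names = [
        "ThreadDataMiddleware",
        "UploadsMiddleware",
        "SandboxMiddleware",
        "DanglingToolCallMiddleware",
    ]
    if summarization_enabled:
        names.append("SummarizationMiddleware")
    if is_plan_mode:
        names.append("TodoListMiddleware")
    names += ["TitleMiddleware", "MemoryMiddleware"]
    if supports_vision:
        names.append("ViewImageMiddleware")
    if subagent_enabled:
        names.append("SubagentLimitMiddleware")
    names.append("ClarificationMiddleware")  # always appended last
    return names


minimal = build_middleware_names()
full = build_middleware_names(True, True, True, True)
```

With every flag off the chain has seven entries; with every flag on it has all eleven, and ClarificationMiddleware stays last in both cases.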
Middleware Components
1. ThreadDataMiddleware
Purpose: Creates per-thread isolated directory structure for workspace, uploads, and output files.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/thread_data_middleware.py):
class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
    def __init__(self, base_dir: str | None = None, lazy_init: bool = True):
        # lazy_init=True: Only compute paths, defer directory creation
        # lazy_init=False: Eagerly create directories
        self._paths = Paths(base_dir) if base_dir else get_paths()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        if self._lazy_init:
            paths = self._get_thread_paths(thread_id)
        else:
            paths = self._create_thread_directories(thread_id)
        return {
            "thread_data": {
                "workspace_path": str(paths["workspace_path"]),
                "uploads_path": str(paths["uploads_path"]),
                "outputs_path": str(paths["outputs_path"]),
            }
        }
Directory Structure Created:
backend/.deer-flow/threads/{thread_id}/user-data/
├── workspace/   # Agent's working directory
├── uploads/     # User-uploaded files
└── outputs/     # Files presented to user via present_files tool
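The difference between lazy and eager initialization can be sketched with pathlib alone. This is a minimal illustration under stated assumptions: the function names `thread_paths` and `create_thread_directories` are hypothetical stand-ins for the private helpers shown above, and the base directory is a temporary folder rather than backend/.deer-flow.

```python
import tempfile
from pathlib import Path


def thread_paths(base_dir: Path, thread_id: str) -> dict[str, Path]:
    """Lazy variant: compute the three per-thread paths without touching disk."""
    root = base_dir / "threads" / thread_id / "user-data"
    return {
        "workspace_path": root / "workspace",
        "uploads_path": root / "uploads",
        "outputs_path": root / "outputs",
    }


def create_thread_directories(base_dir: Path, thread_id: str) -> dict[str, Path]:
    """Eager variant: compute the paths and create any that are missing."""
    paths = thread_paths(base_dir, thread_id)
    for path in paths.values():
        path.mkdir(parents=True, exist_ok=True)
    return paths


with tempfile.TemporaryDirectory() as tmp:
    lazy = thread_paths(Path(tmp), "thread-1")
    lazy_exists = lazy["workspace_path"].exists()        # nothing created yet
    eager = create_thread_directories(Path(tmp), "thread-1")
    eager_exists = all(p.is_dir() for p in eager.values())
```

In the lazy case something else has to create the directories before the first write, which is the trade-off for skipping filesystem work on threads that never use them.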
2. UploadsMiddleware
Purpose: Injects uploaded file information into the conversation, tracking new uploads across turns.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/uploads_middleware.py:139-220):
class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        messages = list(state.get("messages", []))
        if not messages:
            return None
        # Track previously shown files from message history
        shown_files = self._extract_files_from_previous_messages(messages[:-1])
        # List only newly uploaded files
        new_files = self._list_newly_uploaded_files(thread_id, shown_files)
        if not new_files:
            return None
        # Prepend file list to last human message
        original_content = messages[-1].content
        files_message = self._create_files_message(new_files)
        messages[-1] = HumanMessage(
            content=f"{files_message}\n\n{original_content}"
        )
        return {"uploaded_files": new_files, "messages": messages}
Key Features:
- Deduplicates files already shown in previous turns
- Formats file list with size and virtual path: /mnt/user-data/uploads/{filename}
- Supports filenames with spaces via regex r"^-\s+(.+?)\s*\("
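To see how the regex recovers filenames that contain spaces, the sketch below applies it to a sample listing. The exact line format (`- name (size)`) and the helper name `extract_filenames` are assumptions made for illustration; only the pattern itself comes from the text above.

```python
import re

# Pattern quoted in the feature list: dash, whitespace, lazy name, then "(".
FILE_LINE = re.compile(r"^-\s+(.+?)\s*\(")


def extract_filenames(listing: str) -> list[str]:
    """Collect the filename from every line that looks like '- name (size)'."""
    names = []
    for line in listing.splitlines():
        match = FILE_LINE.match(line)
        if match:
            names.append(match.group(1))
    return names


sample_listing = (
    "- quarterly report.pdf (1.2 MB)\n"
    "  Path: /mnt/user-data/uploads/quarterly report.pdf\n"
    "- data.csv (4 KB)\n"
)
found = extract_filenames(sample_listing)
```

Because the capture group is lazy, it stops at the first opening parenthesis, so a filename that itself contains " (" would be cut short under this pattern.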
3. SandboxMiddleware
Purpose: Acquires and manages isolated execution environments for agent tool calls.
Lifecycle: before_agent
Implementation (backend/src/sandbox/middleware.py:18-61):
class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):
    def __init__(self, lazy_init: bool = True):
        # lazy_init=True: Acquire on first tool call
        # lazy_init=False: Acquire in before_agent()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        if self._lazy_init:
            return None  # Defer acquisition
        if "sandbox" not in state or state["sandbox"] is None:
            thread_id = runtime.context["thread_id"]
            sandbox_id = self._acquire_sandbox(thread_id)
            return {"sandbox": {"sandbox_id": sandbox_id}}
        return None  # Sandbox already present in state
Sandbox Lifecycle:
- Sandbox reused across turns within same thread (not released after each call)
- Cleanup occurs at application shutdown via SandboxProvider.shutdown()
- Supports local filesystem (LocalSandboxProvider) and Docker (AioSandboxProvider)
4. DanglingToolCallMiddleware
Purpose: Fixes message history gaps caused by interrupted tool calls (e.g., user cancellation).
Lifecycle: wrap_model_call
Implementation (backend/src/agents/middlewares/dangling_tool_call_middleware.py:28-111):
class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
    def wrap_model_call(self, request, handler):
        # Scan for AIMessages with tool_calls that lack ToolMessage responses
        patched = self._build_patched_messages(request.messages)
        if patched is not None:
            request = request.override(messages=patched)
        return handler(request)

    def _build_patched_messages(self, messages):
        existing_tool_msg_ids = {
            msg.tool_call_id for msg in messages if isinstance(msg, ToolMessage)
        }
        patched = []
        for msg in messages:
            patched.append(msg)
            if getattr(msg, "type", None) == "ai":
                for tc in getattr(msg, "tool_calls", None) or []:
                    if tc["id"] not in existing_tool_msg_ids:
                        # Inject placeholder ToolMessage right after the dangling AIMessage
                        patched.append(ToolMessage(
                            content="[Tool call was interrupted and did not return a result.]",
                            tool_call_id=tc["id"],
                            status="error",
                        ))
        # Return None when nothing was injected so the request is left untouched
        return patched if len(patched) > len(messages) else None
Why wrap_model_call instead of before_model: Ensures patches are inserted immediately after each dangling AIMessage, not appended to the end (which before_model + add_messages reducer would do).
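The ordering difference is easy to see with plain dictionaries standing in for message objects. The sketch below is a simplified model, not the middleware itself: the helper names and the dict-based message shape are assumptions, and `patch_by_appending` only imitates what an append-style reducer would produce.

```python
PLACEHOLDER = "[Tool call was interrupted and did not return a result.]"


def _answered_ids(messages):
    """IDs of tool calls that already have a tool result."""
    return {m["tool_call_id"] for m in messages if m["type"] == "tool"}


def _placeholder(call_id):
    return {"type": "tool", "tool_call_id": call_id, "content": PLACEHOLDER}


def patch_in_place(messages):
    """Insert each placeholder directly after the message that made the call."""
    answered = _answered_ids(messages)
    patched = []
    for msg in messages:
        patched.append(msg)
        for call in msg.get("tool_calls", []):
            if call["id"] not in answered:
                patched.append(_placeholder(call["id"]))
    return patched


def patch_by_appending(messages):
    """Imitate an append-only update: placeholders land at the end."""
    answered = _answered_ids(messages)
    missing = [
        call["id"]
        for msg in messages
        for call in msg.get("tool_calls", [])
        if call["id"] not in answered
    ]
    return messages + [_placeholder(call_id) for call_id in missing]


history = [
    {"type": "human", "content": "Search for recent papers"},
    {"type": "ai", "content": "", "tool_calls": [{"id": "call_1", "name": "web_search"}]},
    {"type": "human", "content": "Never mind, summarize what you have"},
]
in_place_types = [m["type"] for m in patch_in_place(history)]
appended_types = [m["type"] for m in patch_by_appending(history)]
```

In the in-place result the tool message directly follows the AI message that requested it. In the appended result a human message sits between them, which is the kind of sequence that chat APIs expecting adjacent call and result pairs tend to reject.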
5. SummarizationMiddleware (Optional)
Purpose: Automatic context reduction when approaching token limits.
Lifecycle: before_model, after_model
Configuration (backend/src/config/summarization_config.py):
class SummarizationConfig(BaseModel):
    enabled: bool = False
    model_name: str | None = None  # None = use lightweight model
    trigger: ContextSize | list[ContextSize] | None = None
    keep: ContextSize = ContextSize(type="messages", value=20)
    trim_tokens_to_summarize: int | None = 4000
    summary_prompt: str | None = None
Trigger Types:
{"type": "fraction", "value": 0.8} - 80% of model's max input tokens
{"type": "tokens", "value": 4000} - 4000 tokens
{"type": "messages", "value": 50} - 50 messages
Keep Policies: Same types as triggers, defines how much context to preserve after summarization.
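As a rough mental model of how the three trigger types could be evaluated, consider the sketch below. It is an assumption-laden illustration rather than the middleware's actual logic: the function names, the use of `>=` as the threshold comparison, and treating a list of triggers as "any one is enough" are all choices made here for clarity.

```python
def trigger_met(
    trigger: dict,
    *,
    message_count: int,
    token_count: int,
    max_input_tokens: int,
) -> bool:
    """Check a single trigger dict against the current context size."""
    kind, value = trigger["type"], trigger["value"]
    if kind == "messages":
        return message_count >= value
    if kind == "tokens":
        return token_count >= value
    if kind == "fraction":
        return token_count >= value * max_input_tokens
    raise ValueError(f"unknown trigger type: {kind!r}")


def should_summarize(triggers, **usage) -> bool:
    """Accept None, one trigger, or a list; a list fires if any entry is met."""
    if triggers is None:
        return False
    if isinstance(triggers, dict):
        triggers = [triggers]
    return any(trigger_met(t, **usage) for t in triggers)


usage = {"message_count": 10, "token_count": 90_000, "max_input_tokens": 100_000}
by_fraction = should_summarize({"type": "fraction", "value": 0.8}, **usage)
by_messages = should_summarize({"type": "messages", "value": 50}, **usage)
by_either = should_summarize(
    [{"type": "messages", "value": 50}, {"type": "tokens", "value": 4000}], **usage
)
```

Here the conversation is short in message count but large in tokens, so the fraction and token triggers fire while the message trigger does not.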
Creation (backend/src/agents/lead_agent/agent.py:41-80):
def _create_summarization_middleware():
    config = get_summarization_config()
    if not config.enabled:
        return None
    # Convert config to middleware parameters
    if isinstance(config.trigger, list):
        trigger = [t.to_tuple() for t in config.trigger]
    elif config.trigger is not None:
        trigger = config.trigger.to_tuple()
    else:
        trigger = None
    keep = config.keep.to_tuple()
    model = config.model_name or create_chat_model(thinking_enabled=False)
    return SummarizationMiddleware(
        model=model,
        trigger=trigger,
        keep=keep,
        trim_tokens_to_summarize=config.trim_tokens_to_summarize,
    )
6. TodoListMiddleware (Optional)
Purpose: Provides write_todos tool for structured task tracking in complex multi-step workflows.
Lifecycle: Tool injection + state management
Activation: Enabled when config.configurable.is_plan_mode = True
Custom Configuration (backend/src/agents/lead_agent/agent.py:83-195):
def _create_todo_list_middleware(is_plan_mode: bool):
    if not is_plan_mode:
        return None
    system_prompt = """
<todo_list_system>
**CRITICAL RULES:**
- Mark todos as completed IMMEDIATELY after finishing each step
- Keep EXACTLY ONE task as `in_progress` at any time
- Update in REAL-TIME - gives users visibility
- DO NOT use for simple tasks (< 3 steps)
</todo_list_system>
"""
    tool_description = """Use for complex tasks (3+ steps) only..."""
    return TodoListMiddleware(
        system_prompt=system_prompt,
        tool_description=tool_description,
    )
Task States:
pending - Not started
in_progress - Currently working (one at a time, or multiple if parallel)
completed - Finished successfully
7. TitleMiddleware
Purpose: Auto-generates thread title after first complete user-assistant exchange.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/title_middleware.py:19-94):
class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
    def after_agent(self, state, runtime):
        if self._should_generate_title(state):
            title = self._generate_title(state)
            return {"title": title}
        return None

    def _should_generate_title(self, state):
        config = get_title_config()
        if not config.enabled or state.get("title"):
            return False
        messages = state.get("messages", [])
        user_messages = [m for m in messages if m.type == "human"]
        assistant_messages = [m for m in messages if m.type == "ai"]
        # Generate after first complete exchange
        return len(user_messages) == 1 and len(assistant_messages) >= 1

    def _generate_title(self, state):
        config = get_title_config()
        messages = state.get("messages", [])
        model = create_chat_model(thinking_enabled=False)  # Lightweight model
        # First human and first AI message of the thread
        user_msg = next(m.content for m in messages if m.type == "human")
        assistant_msg = next(m.content for m in messages if m.type == "ai")
        prompt = config.prompt_template.format(
            max_words=config.max_words,
            user_msg=user_msg[:500],
            assistant_msg=assistant_msg[:500],
        )
        response = model.invoke(prompt)
        title = response.content.strip()[:config.max_chars]
        return title
Fallback: If LLM fails, uses first 50 characters of user message.
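The fallback path is not visible in the excerpt above, so here is one way it could look. This is a hedged sketch, not the middleware's code: `generate_title`, the `ask_model` callable, and the default `max_chars=60` are hypothetical; only the 50-character fallback length comes from the text.

```python
from typing import Callable


def generate_title(
    user_msg: str,
    ask_model: Callable[[str], str],
    max_chars: int = 60,       # assumed limit, stands in for config.max_chars
    fallback_chars: int = 50,  # fallback length stated above
) -> str:
    """Ask the model for a title; fall back to the start of the user message."""
    try:
        return ask_model(user_msg).strip()[:max_chars]
    except Exception:
        # Any model failure degrades to a truncated copy of the user message.
        return user_msg[:fallback_chars].strip()


def broken_model(prompt: str) -> str:
    """Stand-in for an LLM call that fails."""
    raise RuntimeError("model unavailable")


question = "How do I configure the sandbox provider for Docker in DeerFlow?"
title_ok = generate_title(question, lambda prompt: "  Docker sandbox setup  ")
title_fallback = generate_title(question, broken_model)
```

Catching the exception inside the title step keeps a title-generation failure from surfacing as an agent error, since the title is cosmetic.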
8. MemoryMiddleware
Purpose: Queues conversation for asynchronous memory extraction and updates.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/memory_middleware.py:53-117):
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
    def __init__(self, agent_name: str | None = None):
        # agent_name: If provided, uses per-agent memory storage
        self._agent_name = agent_name

    def after_agent(self, state, runtime):
        config = get_memory_config()
        if not config.enabled:
            return None
        thread_id = runtime.context.get("thread_id")
        messages = state.get("messages", [])
        # Filter to user inputs + final assistant responses (no tool calls)
        filtered_messages = _filter_messages_for_memory(messages)
        # Queue for debounced background processing
        queue = get_memory_queue()
        queue.add(
            thread_id=thread_id,
            messages=filtered_messages,
            agent_name=self._agent_name,
        )
        return None  # No state changes
Message Filtering (backend/src/agents/middlewares/memory_middleware.py:19-50):
def _filter_messages_for_memory(messages):
    filtered = []
    for msg in messages:
        if msg.type == "human":
            filtered.append(msg)  # Always keep user messages
        elif msg.type == "ai" and not getattr(msg, "tool_calls", None):
            filtered.append(msg)  # Only keep final AI responses
    return filtered
Memory Workflow:
- Middleware queues conversation after agent completes
- Queue debounces (30s default) and batches updates
- Background thread invokes LLM to extract facts and context
- Updates stored atomically in backend/.deer-flow/memory.json
- Next interaction injects top 15 facts into system prompt
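The debounce step in this workflow can be modelled without threads by passing timestamps in explicitly. The class below is a simplified sketch: `DebouncedQueue`, `pop_due`, and the rule that a newer submission for the same thread replaces the older one and restarts its timer are assumptions for illustration, not the real memory queue's API.

```python
from dataclasses import dataclass, field


@dataclass
class DebouncedQueue:
    """Hold the latest messages per thread until a quiet period has passed."""

    delay_seconds: float = 30.0
    _pending: dict = field(default_factory=dict)

    def add(self, thread_id: str, messages: list, now: float) -> None:
        # Assumed behavior: a newer submission replaces the pending one
        # and pushes the deadline back by the full delay.
        self._pending[thread_id] = (now + self.delay_seconds, messages)

    def pop_due(self, now: float) -> dict:
        """Remove and return every thread whose deadline has passed."""
        due = {
            thread_id: messages
            for thread_id, (deadline, messages) in self._pending.items()
            if deadline <= now
        }
        for thread_id in due:
            del self._pending[thread_id]
        return due


queue = DebouncedQueue()
queue.add("t1", ["hello"], now=0.0)
queue.add("t1", ["hello", "follow-up"], now=20.0)  # restarts the 30s window
too_early = queue.pop_due(now=30.0)
ready = queue.pop_due(now=50.0)
drained = queue.pop_due(now=60.0)
```

Two quick turns in the same thread therefore cost a single extraction call instead of two, at the price of the memory update lagging the conversation by the debounce delay.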
9. ViewImageMiddleware (Optional)
Purpose: Injects base64 image data into conversation when view_image tool completes.
Lifecycle: before_model
Activation: Only added if model_config.supports_vision = true
Implementation (backend/src/agents/middlewares/view_image_middleware.py:19-222):
class ViewImageMiddleware(AgentMiddleware[ViewImageMiddlewareState]):
    def before_model(self, state, runtime):
        return self._inject_image_message(state)

    def _should_inject_image_message(self, state):
        messages = state.get("messages", [])
        last_assistant_msg = self._get_last_assistant_message(messages)
        if not last_assistant_msg:
            return False
        # Check if it has view_image tool calls
        if not self._has_view_image_tool(last_assistant_msg):
            return False
        # Check if all tools completed
        if not self._all_tools_completed(messages, last_assistant_msg):
            return False
        # Check if we already injected the message
        # (prevents duplicate injections)
        return not self._already_injected(messages, last_assistant_msg)

    def _create_image_details_message(self, state):
        viewed_images = state.get("viewed_images", {})
        content_blocks = [
            {"type": "text", "text": "Here are the images you've viewed:"}
        ]
        for image_path, image_data in viewed_images.items():
            content_blocks.append({
                "type": "text",
                "text": f"\n- **{image_path}** ({image_data['mime_type']})",
            })
            content_blocks.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:{image_data['mime_type']};base64,{image_data['base64']}"
                },
            })
        return content_blocks
State Management: Uses viewed_images dict in ThreadState with custom reducer:
def merge_viewed_images(existing, new):
    if new == {}:  # Empty dict clears all viewed images
        return {}
    return {**existing, **new}  # Merge dictionaries
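A short walk-through shows the two behaviors of this reducer: merging and clearing. The version below restates the reducer so the example runs on its own and adds `None` handling as an assumption (state fields can start unset); the image payloads are dummy values.

```python
def merge_viewed_images(existing, new):
    """Merge image dicts; an empty dict is the signal to clear everything."""
    if existing is None:  # assumed guard for a field that has never been set
        existing = {}
    if new is None:       # assumed guard: no update means keep what we have
        return existing
    if new == {}:
        return {}
    return {**existing, **new}


chart = {"/mnt/user-data/uploads/chart.png": {"mime_type": "image/png", "base64": "AAAA"}}
photo = {"/mnt/user-data/uploads/photo.jpg": {"mime_type": "image/jpeg", "base64": "BBBB"}}

state = merge_viewed_images(None, chart)   # first image viewed
state = merge_viewed_images(state, photo)  # second image is merged in
after_merge = sorted(state)
state = merge_viewed_images(state, {})     # explicit clear
after_clear = state
```

Using the empty dict as a clear signal means an update can never be a harmless no-op `{}`; callers that want to leave the field alone should omit it from the update instead.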
10. SubagentLimitMiddleware (Optional)
Purpose: Enforces maximum concurrent subagent calls by truncating excess task tool calls.
Lifecycle: after_model
Activation: Only added if config.configurable.subagent_enabled = True
Implementation (backend/src/agents/middlewares/subagent_limit_middleware.py:24-76):
class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
    def __init__(self, max_concurrent: int = MAX_CONCURRENT_SUBAGENTS):
        # max_concurrent clamped to [2, 4]
        self.max_concurrent = _clamp_subagent_limit(max_concurrent)

    def after_model(self, state, runtime):
        return self._truncate_task_calls(state)

    def _truncate_task_calls(self, state):
        messages = state.get("messages", [])
        if not messages:
            return None
        last_msg = messages[-1]
        if getattr(last_msg, "type", None) != "ai":
            return None
        tool_calls = getattr(last_msg, "tool_calls", None)
        if not tool_calls:
            return None  # Nothing to truncate
        task_indices = [i for i, tc in enumerate(tool_calls)
                        if tc.get("name") == "task"]
        if len(task_indices) <= self.max_concurrent:
            return None
        # Keep only first max_concurrent task calls
        indices_to_drop = set(task_indices[self.max_concurrent:])
        truncated = [tc for i, tc in enumerate(tool_calls)
                     if i not in indices_to_drop]
        logger.warning(f"Truncated {len(indices_to_drop)} excess task calls")
        updated_msg = last_msg.model_copy(update={"tool_calls": truncated})
        return {"messages": [updated_msg]}
Why This Works: Truncation in middleware is more reliable than a prompt-based limit. The model may emit any number of task calls, and the middleware deterministically drops everything beyond the limit before the calls are executed.
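The truncation rule can be tried out on plain dictionaries. This is a simplified sketch under stated assumptions: `clamp_limit` and `truncate_task_calls` are hypothetical names, tool calls are modelled as dicts with `id` and `name`, and only the [2, 4] clamp range and the "keep the first N task calls" rule come from the text above.

```python
def clamp_limit(value: int, low: int = 2, high: int = 4) -> int:
    """Keep the configured limit inside the [2, 4] range."""
    return max(low, min(high, value))


def truncate_task_calls(tool_calls: list[dict], max_concurrent: int) -> list[dict]:
    """Drop task calls beyond the limit; leave every other tool call alone."""
    limit = clamp_limit(max_concurrent)
    kept = []
    task_calls_seen = 0
    for call in tool_calls:
        if call["name"] == "task":
            task_calls_seen += 1
            if task_calls_seen > limit:
                continue  # excess subagent call, silently dropped
        kept.append(call)
    return kept


calls = [
    {"id": "a", "name": "task"},
    {"id": "s", "name": "web_search"},
    {"id": "b", "name": "task"},
    {"id": "c", "name": "task"},
    {"id": "d", "name": "task"},
    {"id": "e", "name": "task"},
]
kept_ids = [call["id"] for call in truncate_task_calls(calls, max_concurrent=3)]
```

Non-task calls such as the `web_search` entry pass through untouched and keep their position, so only the surplus subagent launches are affected.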
11. ClarificationMiddleware
Purpose: Intercepts ask_clarification tool calls and interrupts execution to present questions to user.
Lifecycle: wrap_tool_call
Position: MUST BE LAST in middleware chain to intercept after all other processing.
Implementation (backend/src/agents/middlewares/clarification_middleware.py:20-174):
class ClarificationMiddleware(AgentMiddleware[ClarificationMiddlewareState]):
    def wrap_tool_call(self, request, handler):
        if request.tool_call.get("name") != "ask_clarification":
            return handler(request)  # Pass through
        return self._handle_clarification(request)

    def _handle_clarification(self, request):
        args = request.tool_call.get("args", {})
        formatted_message = self._format_clarification_message(args)
        tool_message = ToolMessage(
            content=formatted_message,
            tool_call_id=request.tool_call.get("id"),
            name="ask_clarification",
        )
        # Return Command that interrupts execution
        return Command(
            update={"messages": [tool_message]},
            goto=END,  # Stop execution, wait for user response
        )

    def _format_clarification_message(self, args):
        question = args.get("question", "")
        clarification_type = args.get("clarification_type", "missing_info")
        context = args.get("context")
        options = args.get("options", [])
        type_icons = {
            "missing_info": "❓",
            "ambiguous_requirement": "🤔",
            "approach_choice": "🔀",
            "risk_confirmation": "⚠️",
            "suggestion": "💡",
        }
        icon = type_icons.get(clarification_type, "❓")
        message_parts = []
        if context:
            message_parts.append(f"{icon} {context}")
            message_parts.append(f"\n{question}")
        else:
            message_parts.append(f"{icon} {question}")
        if options:
            message_parts.append("")
            for i, option in enumerate(options, 1):
                message_parts.append(f"  {i}. {option}")
        return "\n".join(message_parts)
Key Behavior: Uses Command(goto=END) to interrupt graph execution, forcing wait for user input.
Middleware Ordering Rationale
The strict order ensures correct dependency resolution:
- ThreadDataMiddleware → Creates thread directories first (required by UploadsMiddleware, SandboxMiddleware)
- UploadsMiddleware → Injects file info before sandbox/model sees it
- SandboxMiddleware → Acquires environment before tool execution
- DanglingToolCallMiddleware → Patches message history before model sees it
- SummarizationMiddleware → Reduces context early (before other processing)
- TodoListMiddleware → Enables task tracking (before clarification)
- TitleMiddleware → Generates title after first exchange
- MemoryMiddleware → Queues after title generation (complete turn)
- ViewImageMiddleware → Injects images before model call (if vision supported)
- SubagentLimitMiddleware → Truncates after model generates tool calls
- ClarificationMiddleware → MUST BE LAST to intercept all tool calls
Runtime Configuration
Middlewares can be conditionally enabled via config.configurable:
config = {
    "configurable": {
        "thinking_enabled": True,
        "model_name": "gpt-4o",
        "is_plan_mode": False,      # Enables TodoListMiddleware
        "subagent_enabled": True,   # Enables SubagentLimitMiddleware
        "max_concurrent_subagents": 3,
    }
}
agent = make_lead_agent(config)
State Schema Compatibility
All middlewares use state schemas compatible with ThreadState (backend/src/agents/thread_state.py:48-56):
class ThreadState(AgentState):
    sandbox: NotRequired[SandboxState | None]
    thread_data: NotRequired[ThreadDataState | None]
    title: NotRequired[str | None]
    artifacts: Annotated[list[str], merge_artifacts]
    todos: NotRequired[list | None]
    uploaded_files: NotRequired[list[dict] | None]
    viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images]
Custom Reducers:
merge_artifacts - Deduplicates artifact paths while preserving order
merge_viewed_images - Merges image dicts, empty dict {} clears all
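For the artifact reducer, an order-preserving dedupe can be written with `dict.fromkeys`. The body below is a sketch of the described behavior, not the project's implementation; the `None` handling is an added assumption.

```python
def merge_artifacts(existing, new):
    """Concatenate two artifact lists, dropping repeats but keeping first-seen order."""
    if existing is None:  # assumed guard for an unset field
        return list(new or [])
    if new is None:       # assumed guard for an empty update
        return list(existing)
    # dict keys are unique and remember insertion order
    return list(dict.fromkeys(existing + new))


merged = merge_artifacts(
    ["report.md", "chart.png"],
    ["chart.png", "data.csv", "report.md"],
)
```

Paths already present keep their original position, and new ones are appended in the order they arrive.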
Debugging Middlewares
Several middlewares log their key actions, for example:
logger.warning(f"Injecting {count} placeholder ToolMessage(s) for dangling tool calls")
logger.warning(f"Truncated {count} excess task call(s) from model response (limit: {limit})")
print(f"[ViewImageMiddleware] Injecting image details message with images before LLM call")
print(f"[ClarificationMiddleware] Intercepted clarification request")
View logs via:
cd backend
make dev # Watch logs in terminal