Interrupts enable human-in-the-loop workflows by pausing graph execution at specific points for review, approval, or input.
Why Use Interrupts
Interrupts are essential for:
- Human approval: Review agent actions before execution
- Input collection: Gather additional information from users
- Quality control: Verify outputs before continuing
- Safety: Prevent unwanted actions in production
- Debugging: Inspect state at specific points
Basic Interrupts
Interrupt Before Nodes
Pause before executing specific nodes:
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["tools"],  # Pause before tool execution
)
# Graph pauses before 'tools' node
config = {"configurable": {"thread_id": "thread-1"}}
result = app.invoke({"messages": [...]}, config)
# Check state
state = app.get_state(config)
print(state.next) # ['tools']
# Resume execution
app.invoke(None, config)
Interrupt After Nodes
Pause after executing specific nodes:
app = graph.compile(
    checkpointer=memory,
    interrupt_after=["agent"],  # Pause after agent runs
)
# Agent runs, then pauses
result = app.invoke({"messages": [...]}, config)
# Review agent output
state = app.get_state(config)
print(state.values["messages"])
# Continue
app.invoke(None, config)
Interrupt on All Nodes
Pause at every node for debugging:
app = graph.compile(
    checkpointer=memory,
    interrupt_before="*",  # Pause before every node
)
Reviewing State
Inspect state when interrupted:
# View the full snapshot
state = app.get_state(config)
print(f"Next nodes: {state.next}")
print(f"Current values: {state.values}")
print(f"Metadata: {state.metadata}")
# Check whether the graph is paused
state = app.get_state(config)
if state.next:
    print(f"Paused before: {state.next}")
else:
    print("Execution complete")
# Inspect pending tasks and any interrupts attached to them
state = app.get_state(config)
for task in state.tasks:
    print(f"Task: {task.name}")
    print(f"Interrupts: {task.interrupts}")
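The checks above can be folded into one small helper. The sketch below is an assumption for illustration, not part of LangGraph: `summarize_snapshot` is a hypothetical name, and it relies only on the `next` and `values` attributes shown above, so a stand-in object is enough to try it.

```python
from types import SimpleNamespace
from typing import Any


def summarize_snapshot(snapshot: Any) -> dict:
    """Summarize an object exposing `next` and `values` (hypothetical helper)."""
    pending = list(snapshot.next)
    return {
        "status": "paused" if pending else "complete",
        "pending_nodes": pending,
        "state_keys": sorted(snapshot.values),
    }


# Stand-in for the object returned by app.get_state(config)
paused = SimpleNamespace(next=("tools",), values={"messages": []})
summary = summarize_snapshot(paused)
```

With a real graph, the call would presumably be `summarize_snapshot(app.get_state(config))`.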
Modifying State
Update state before resuming:
# Get current state
state = app.get_state(config)
# Modify the state
app.update_state(
    config,
    {"approved": True, "feedback": "Looks good!"},
    as_node="human",  # Update as if from 'human' node
)
# Resume with updated state
app.invoke(None, config)
Manual Interrupt
Trigger interrupts programmatically from nodes:
from langgraph.types import interrupt
def review_node(state: State):
    """Request human review if confidence is low."""
    confidence = state["confidence"]
    if confidence < 0.8:
        # Interrupt for human review
        feedback = interrupt(
            {
                "question": "Confidence is low. Should I proceed?",
                "current_result": state["result"],
            }
        )
        # Use feedback when resumed
        if feedback.get("approved"):
            return {"status": "approved"}
        else:
            return {"status": "rejected"}
    return {"status": "auto_approved"}
Resume with input:
from langgraph.types import Command
# Graph interrupts at review_node
result = app.invoke({...}, config)
# Resume - interrupt() returns the value passed to Command(resume=...)
app.invoke(Command(resume={"approved": True}), config)
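Because `review_node` calls `feedback.get("approved")`, the feedback that `interrupt()` returns should be a dictionary with an `approved` key. A minimal sketch of normalizing raw reviewer input into that shape follows. `normalize_feedback` and the accepted answer strings are assumptions for illustration, not LangGraph API.

```python
def normalize_feedback(raw) -> dict:
    """Coerce raw reviewer input into {"approved": bool} (hypothetical helper)."""
    if isinstance(raw, dict) and isinstance(raw.get("approved"), bool):
        return {"approved": raw["approved"]}
    if isinstance(raw, bool):
        return {"approved": raw}
    if isinstance(raw, str):
        answer = raw.strip().lower()
        if answer in {"y", "yes", "approve", "approved"}:
            return {"approved": True}
        if answer in {"n", "no", "reject", "rejected"}:
            return {"approved": False}
    # Refuse to guess: ambiguous input should go back to the reviewer
    raise ValueError(f"Unrecognized feedback: {raw!r}")
```

Raising on ambiguous input, rather than defaulting to approval or rejection, keeps the decision with the human.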
Common Patterns
Review tool calls before execution:
from langgraph.prebuilt import ToolNode
# Create graph with tool approval
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", ToolNode(tools))
# Interrupt before running tools
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["tools"],
)
# Agent decides to use a tool
app.invoke({"messages": [...]}, config)
# Review tool calls
state = app.get_state(config)
last_message = state.values["messages"][-1]
print(f"Tool calls: {last_message.tool_calls}")
# Approve or modify
if approve_tool_calls(last_message.tool_calls):
    app.invoke(None, config)  # Execute tools
else:
    # Cancel tool execution
    app.update_state(
        config,
        {"messages": [{"role": "assistant", "content": "Action cancelled"}]},
    )
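`approve_tool_calls` is not defined in the example above. One possible shape, assuming each tool call is a dictionary with a `name` key (as LangChain tool calls are) and that an allowlist is an acceptable policy, is sketched below. The `SAFE_TOOLS` set and its tool names are hypothetical.

```python
# Hypothetical policy: tool names that may run without manual review
SAFE_TOOLS = {"search", "calculator"}


def approve_tool_calls(tool_calls: list) -> bool:
    """Approve only if every requested tool is on the allowlist."""
    return all(call.get("name") in SAFE_TOOLS for call in tool_calls)
```

A real deployment would more likely show the calls to a person; an allowlist like this can serve as a first filter so that only unfamiliar tools reach a reviewer.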
Multi-Step Approval
Require approval at multiple stages:
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["research", "write", "publish"],
)
# Start the run; it pauses before the first stage
app.invoke({...}, config)
# Step through each stage
for step in ["research", "write", "publish"]:
    state = app.get_state(config)
    print(f"Paused before: {state.next}")
    # Human reviews and approves
    if get_human_approval(state):
        app.invoke(None, config)
    else:
        break
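The loop above depends on `get_human_approval`, which is left undefined. A minimal console-based sketch follows. The function body, the prompt text, and the injectable `ask` parameter are assumptions for illustration; the only thing taken from the text is that the argument exposes a `next` attribute.

```python
from types import SimpleNamespace
from typing import Any, Callable


def get_human_approval(state: Any, ask: Callable[[str], str] = input) -> bool:
    """Ask a reviewer whether to continue; anything but y/yes means no."""
    pending = ", ".join(state.next) or "nothing"
    answer = ask(f"About to run: {pending}. Continue? [y/N] ")
    return answer.strip().lower() in {"y", "yes"}


# Stand-in snapshot and a scripted reviewer, so the example runs unattended
snapshot = SimpleNamespace(next=("publish",))
approved = get_human_approval(snapshot, ask=lambda prompt: "y")
```

Passing `ask` as a parameter keeps the helper testable; leaving it at the default reads from the terminal.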
Edit and Resume
Edit agent output before continuing:
# Interrupt after agent generates output
app = graph.compile(
    checkpointer=memory,
    interrupt_after=["agent"],
)
app.invoke({"messages": [...]}, config)
# Get agent output
state = app.get_state(config)
agent_message = state.values["messages"][-1]
# Edit the message
edited_content = edit_message(agent_message.content)
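# Replace with edited version. Reusing the original message ID makes the
# update overwrite that message instead of appending a new one (this
# assumes the "messages" key uses the add_messages reducer).
app.update_state(
    config,
    {"messages": [{"role": "assistant", "content": edited_content, "id": agent_message.id}]},
)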
# Continue with edited message
app.invoke(None, config)
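`edit_message` is left undefined above; in practice it would usually hand the text to a human editor. As a stand-in, and assuming an automated clean-up pass is acceptable for the use case, the sketch below redacts email addresses. The regular expression and the replacement text are illustrative choices, not a complete PII filter.

```python
import re

# Simple illustrative pattern; it does not cover every valid address
EMAIL_PATTERN = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def edit_message(content: str) -> str:
    """Redact email addresses and trim surrounding whitespace."""
    return EMAIL_PATTERN.sub("[redacted email]", content).strip()
```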
Conditional Interrupts
Only interrupt when certain conditions are met:
def conditional_interrupt_node(state: State):
    """Interrupt only for sensitive operations."""
    action = state["planned_action"]
    # Check if action needs approval
    if is_sensitive_action(action):
        approval = interrupt({
            "action": action,
            "message": "This action requires approval",
        })
        if not approval.get("approved"):
            return {"status": "cancelled"}
    # Execute action
    result = execute_action(action)
    return {"result": result}
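What counts as sensitive is application-specific, and `is_sensitive_action` is not defined above. The sketch below assumes `planned_action` is a dictionary with a `type` key and, for payments, an `amount` key. Those keys, the action types, and the limit are all hypothetical.

```python
# Hypothetical policy values; adjust to the application's own rules
SENSITIVE_ACTION_TYPES = {"delete", "send_email"}
PAYMENT_LIMIT = 100.0


def is_sensitive_action(action: dict) -> bool:
    """Flag destructive actions and payments above the limit for approval."""
    kind = str(action.get("type", "")).lower()
    if kind in SENSITIVE_ACTION_TYPES:
        return True
    return kind == "payment" and float(action.get("amount", 0)) > PAYMENT_LIMIT
```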
Dynamic Interrupts with Command
Combine interrupt() with Command to update state and choose the next node after a review:
from langgraph.types import Command, interrupt
def smart_node(state: State):
    """Dynamically decide whether to interrupt."""
    result = process(state)
    if needs_review(result):
        # Interrupt and goto specific node after review
        feedback = interrupt({"result": result})
        return Command(
            update={"result": feedback["revised_result"]},
            goto="validation",  # Skip to validation
        )
    return {"result": result}
Streaming with Interrupts
Handle interrupts during streaming:
config = {"configurable": {"thread_id": "thread-1"}}
for chunk in app.stream({"messages": [...]}, config, stream_mode="values"):
    print(chunk)
# Check if interrupted
state = app.get_state(config)
if state.next:
    print(f"Interrupted at: {state.next}")
    # Resume streaming
    for chunk in app.stream(None, config, stream_mode="values"):
        print(chunk)
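The stream-then-check pattern above can be wrapped in a single function. This is a sketch under the assumption that the app object exposes `stream` and `get_state` as used above; `stream_until_pause` is a hypothetical name, and `FakeApp` is a stand-in that exists only so the example runs without a real graph.

```python
from types import SimpleNamespace
from typing import Any, Optional


def stream_until_pause(app: Any, inputs: Optional[dict], config: dict):
    """Drain one streaming run, then report which nodes are still pending."""
    chunks = list(app.stream(inputs, config, stream_mode="values"))
    pending = tuple(app.get_state(config).next)
    return chunks, pending


class FakeApp:
    """Stand-in for a compiled graph, used only to exercise the helper."""

    def stream(self, inputs, config, stream_mode="values"):
        yield {"step": 1}
        yield {"step": 2}

    def get_state(self, config):
        return SimpleNamespace(next=("tools",))


config = {"configurable": {"thread_id": "thread-1"}}
chunks, pending = stream_until_pause(FakeApp(), {"messages": []}, config)
```

A non-empty `pending` means the run paused; calling the helper again with `inputs=None` would resume it.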
Timeout Handling
Implement timeouts for human input:
import time
def wait_for_approval(config, timeout_seconds=300):
    """Wait for human approval with timeout."""
    start = time.time()
    while time.time() - start < timeout_seconds:
        state = app.get_state(config)
        # Check if approved
        if state.values.get("approved"):
            return True
        time.sleep(1)
    # Timeout - auto-reject
    app.update_state(config, {"approved": False, "reason": "timeout"})
    return False
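A polling loop like the one above is hard to test because it depends on the wall clock. One variation, sketched here as an assumption rather than a recommended LangGraph pattern, separates the polling from the state lookup and makes the clock and sleep function injectable. `wait_until` and its parameters are hypothetical names.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout_seconds: float = 300.0,
    poll_seconds: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `check` until it returns a truthy value or the timeout elapses."""
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        if check():
            return True
        sleep(poll_seconds)
    return False
```

With a real graph, the check might be `lambda: app.get_state(config).values.get("approved")`. `time.monotonic` is used as the default clock because it is unaffected by system clock adjustments.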
Best Practices
- Use checkpointers: Interrupts require a checkpointer to save state
- Clear communication: Provide context about why the interrupt occurred
- Validate input: Check user input before resuming
- Handle rejection: Have fallback logic when approval is denied
- Test interrupt paths: Verify behavior when interrupted and resumed
- Document interrupt points: Make it clear where interrupts can occur
- Implement timeouts: Don’t wait indefinitely for human input
- Log interrupts: Track when and why interrupts happen
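As one way to act on the logging advice, the sketch below records each pause with Python's standard `logging` module. The logger name, the message format, and the `log_interrupt` helper are assumptions for illustration.

```python
import logging

logger = logging.getLogger("interrupts")  # hypothetical logger name


def log_interrupt(thread_id: str, pending_nodes, reason: str) -> str:
    """Record which thread paused, before which nodes, and why."""
    message = f"thread={thread_id} paused_before={list(pending_nodes)} reason={reason}"
    logger.info(message)
    return message


entry = log_interrupt("thread-1", ("tools",), "tool approval")
```

Returning the formatted message makes the helper easy to assert on in tests; whether it is emitted depends on how the application configures logging.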
Debugging Interrupts
Trace interrupt behavior:
# Enable debug mode
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["tools"],
    debug=True,
)
# Stream debug events
for event in app.stream({...}, config, stream_mode="debug"):
    print(event)
Next Steps
- Learn about Deployment for production interrupt handling
- Explore Debugging to trace interrupt behavior
- Review Persistence for managing interrupted states