The Persistence feature (AgentCheckpoint) enables saving and restoring the complete state of an agent at specific points during execution. This supports resuming execution, rolling back to previous states, and recovering from failures.
What is Agent Persistence?
Agent Persistence captures:
Message history: All LLM interactions and tool calls
Current node: The node being executed
Input data: Current node’s input
Timestamp: When the checkpoint was created
This enables:
Resume execution: Continue from where you left off
Rollback: Go back to a previous state
Recovery: Restore after crashes or errors
Debugging: Replay execution from specific points
Installation
import ai.koog.agents.features.snapshot.AgentCheckpoint
import ai.koog.agents.features.snapshot.InMemoryAgentCheckpointStorageProvider

val agent = AIAgent(
    executor = myExecutor,
    strategy = myStrategy
) {
    install(AgentCheckpoint) {
        // Configure storage provider
        snapshotProvider(InMemoryAgentCheckpointStorageProvider())
        // Optional: auto-checkpoint after each node
        continuouslyPersistent()
    }
}
The AgentCheckpoint feature requires that all nodes in your strategy have unique names.
Configuration Options
Storage Provider
Choose where checkpoints are stored:
install(AgentCheckpoint) {
    // Choose one of the following providers:
    // In-memory (lost on restart)
    snapshotProvider(InMemoryAgentCheckpointStorageProvider())
    // File-based (persists across restarts)
    snapshotProvider(FileAgentCheckpointStorageProvider(
        directory = Path("checkpoints")
    ))
    // No storage (default)
    snapshotProvider(NoAgentCheckpointStorageProvider())
}
Continuous Persistence
Automatically create checkpoints after each node:
install(AgentCheckpoint) {
    snapshotProvider(myProvider)
    // Enable auto-checkpointing
    continuouslyPersistent()
}
Continuous persistence creates a checkpoint after every node execution, which can be expensive. Use it during development or for critical workflows.
Storage Providers
InMemoryAgentCheckpointStorageProvider
Stores checkpoints in memory (lost on restart):
import ai.koog.agents.features.snapshot.InMemoryAgentCheckpointStorageProvider

val storage = InMemoryAgentCheckpointStorageProvider()
install(AgentCheckpoint) {
    snapshotProvider(storage)
}
Use cases:
Development and testing
Short-lived agents
When persistence across restarts isn’t needed
FileAgentCheckpointStorageProvider
Persists checkpoints to the file system:
import ai.koog.agents.features.snapshot.FileAgentCheckpointStorageProvider
import okio.Path.Companion.toPath

val storage = FileAgentCheckpointStorageProvider(
    directory = "checkpoints".toPath()
)
install(AgentCheckpoint) {
    snapshotProvider(storage)
}
Use cases:
Production environments
Long-running agents
Recovery from crashes
Debugging and replay
NoAgentCheckpointStorageProvider
Default no-op provider (doesn’t store anything):
import ai.koog.agents.features.snapshot.NoAgentCheckpointStorageProvider

val storage = NoAgentCheckpointStorageProvider()
install(AgentCheckpoint) {
    snapshotProvider(storage)
}
Custom Storage Provider
Implement your own storage backend:
import ai.koog.agents.features.snapshot.AgentCheckpointStorageProvider
import ai.koog.agents.features.snapshot.AgentCheckpointData

// `database` is a placeholder for your own data-access layer
class DatabaseCheckpointStorage : AgentCheckpointStorageProvider {
    // All checkpoints recorded for the given agent
    override suspend fun getCheckpoints(agentId: String): List<AgentCheckpointData> {
        return database.query("SELECT * FROM checkpoints WHERE agent_id = ?", agentId)
    }

    // Persist a single checkpoint
    override suspend fun saveCheckpoint(
        agentId: String,
        checkpoint: AgentCheckpointData
    ) {
        database.insert("checkpoints", checkpoint)
    }

    // Most recent checkpoint by timestamp, or null if none exist
    override suspend fun getLatestCheckpoint(agentId: String): AgentCheckpointData? {
        return database.query(
            "SELECT * FROM checkpoints WHERE agent_id = ? ORDER BY timestamp DESC LIMIT 1",
            agentId
        ).firstOrNull()
    }
}

install(AgentCheckpoint) {
    snapshotProvider(DatabaseCheckpointStorage())
}
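To make the contract of the three methods concrete, here is a minimal, self-contained sketch that does not depend on Koog. `CheckpointRecord`, `CheckpointStore`, and `MapCheckpointStore` are hypothetical, simplified stand-ins (non-suspending, fewer fields) for the types above, not the library's API. The sketch only illustrates the behavior a backend would presumably need: per-agent listing, appending, and "latest" meaning the highest timestamp.

```kotlin
// Hypothetical, simplified stand-ins for the Koog types; not the library API.
data class CheckpointRecord(
    val checkpointId: String,
    val agentId: String,
    val nodeId: String,
    val timestamp: Long
)

interface CheckpointStore {
    fun getCheckpoints(agentId: String): List<CheckpointRecord>
    fun saveCheckpoint(agentId: String, checkpoint: CheckpointRecord)
    fun getLatestCheckpoint(agentId: String): CheckpointRecord?
}

// Keeps checkpoints per agent in insertion order.
class MapCheckpointStore : CheckpointStore {
    private val byAgent = mutableMapOf<String, MutableList<CheckpointRecord>>()

    // Returns a copy so callers cannot mutate the store's internal list.
    override fun getCheckpoints(agentId: String): List<CheckpointRecord> =
        byAgent[agentId]?.toList() ?: emptyList()

    override fun saveCheckpoint(agentId: String, checkpoint: CheckpointRecord) {
        byAgent.getOrPut(agentId) { mutableListOf() }.add(checkpoint)
    }

    // "Latest" is the record with the highest timestamp,
    // mirroring ORDER BY timestamp DESC LIMIT 1.
    override fun getLatestCheckpoint(agentId: String): CheckpointRecord? =
        byAgent[agentId]?.maxByOrNull { it.timestamp }
}
```

A real provider would implement the suspending Koog interface instead and would likely need to handle concurrent access, which this sketch ignores.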
Creating Checkpoints
Manual Checkpoint Creation
Create checkpoints explicitly in your nodes:
import ai.koog.agents.features.snapshot.checkpoint
import ai.koog.agents.features.snapshot.withCheckpoints

val saveCheckpoint by node<String, String> { input ->
    // Create a checkpoint at this point
    checkpoint().createCheckpoint(
        agentId = context.id,
        agentContext = context,
        nodeId = "saveCheckpoint",
        lastInput = input
    )
    "Checkpoint created"
}

// Alternative: use withCheckpoints
val saveCheckpoint2 by node<String, String> { input ->
    withCheckpoints(context) { ctx ->
        createCheckpoint(
            agentId = ctx.id,
            agentContext = ctx,
            nodeId = "saveCheckpoint2",
            lastInput = input
        )
    }
    "Checkpoint created"
}
Automatic Checkpoints
Use continuous persistence:
install(AgentCheckpoint) {
    snapshotProvider(FileAgentCheckpointStorageProvider())
    // Checkpoint after EVERY node execution
    continuouslyPersistent()
}
// No manual checkpoint code needed!
Restoring from Checkpoints
Rollback to Specific Checkpoint
Restore to a known checkpoint ID:
import ai.koog.agents.features.snapshot.checkpoint

val restoreCheckpoint by node<String, Unit> { checkpointId ->
    // Roll back to specific checkpoint
    checkpoint().rollbackToCheckpoint(checkpointId, context)
}
Rollback to Latest Checkpoint
Restore to the most recent checkpoint:
val restoreLatest by node<Unit, Unit> {
    // Roll back to latest checkpoint
    checkpoint().rollbackToLatestCheckpoint(context)
}
Set Execution Point Manually
Directly set the agent’s execution state:
val setExecutionPoint by node<Unit, Unit> {
    checkpoint().setExecutionPoint(
        agentContext = context,
        nodeId = "targetNode",
        messageHistory = customHistory,
        input = customInput
    )
}
Checkpoint Data Structure
data class AgentCheckpointData(
    val checkpointId: String,          // Unique checkpoint identifier
    val agentId: String,               // Agent identifier
    val nodeId: String,                // Node being executed
    val messageHistory: List<Message>, // Complete message history
    val lastInput: Any?,               // Input data for current node
    val timestamp: Long                // Creation timestamp
)
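The `nodeId` and `lastInput` fields are what make resuming possible: together they say where to re-enter the workflow and with which input. The sketch below illustrates that idea on a plain linear pipeline. `Step`, `ResumePoint`, and `runPipeline` are hypothetical names invented for this illustration and are not part of Koog. It also assumes that a resume point is located by node name, which is one plausible reason the feature requires unique node names.

```kotlin
// Hypothetical stand-ins for illustration only; not Koog types.
data class Step(val name: String, val run: (String) -> String)
data class ResumePoint(val nodeId: String, val lastInput: String)

// Runs the steps in order. When resumeFrom is given, execution starts at the
// step named resumeFrom.nodeId and uses resumeFrom.lastInput as its input.
fun runPipeline(steps: List<Step>, input: String, resumeFrom: ResumePoint? = null): String {
    val names = steps.map { it.name }
    // Duplicate names would make the resume target ambiguous.
    require(names.toSet().size == names.size) { "Step names must be unique" }

    val startIndex = if (resumeFrom == null) 0 else names.indexOf(resumeFrom.nodeId)
    require(startIndex >= 0) { "Unknown node: ${resumeFrom?.nodeId}" }

    var value = resumeFrom?.lastInput ?: input
    for (step in steps.drop(startIndex)) {
        value = step.run(value)
    }
    return value
}
```

For example, with steps named `upper` and `exclaim`, resuming from `ResumePoint("exclaim", "saved")` skips `upper` entirely and applies only `exclaim` to the saved input.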
Use Cases
Recovery from Failures
import ai.koog.agents.features.snapshot.AgentCheckpoint
import ai.koog.agents.features.snapshot.FileAgentCheckpointStorageProvider

val agent = AIAgent(...) {
    install(AgentCheckpoint) {
        snapshotProvider(FileAgentCheckpointStorageProvider())
        continuouslyPersistent()
    }
}

try {
    agent.run("Process large dataset")
} catch (e: Exception) {
    // On crash, can resume from last checkpoint
    val checkpointFeature = agent.features[AgentCheckpoint]
    val lastCheckpoint = checkpointFeature?.storage?.getLatestCheckpoint(agent.id)
    if (lastCheckpoint != null) {
        println("Resuming from checkpoint ${lastCheckpoint.checkpointId}")
        // Resume execution...
    }
}
Interactive Debugging
// Keep a reference to the storage provider so checkpoints can be listed later
val storage = InMemoryAgentCheckpointStorageProvider()
val agent = AIAgent(...) {
    install(AgentCheckpoint) {
        snapshotProvider(storage)
        continuouslyPersistent()
    }
}
// Run agent
agent.run("Debug this workflow")
// List all checkpoints
val checkpoints = storage.getCheckpoints(agent.id)
checkpoints.forEach { checkpoint ->
    println("${checkpoint.checkpointId}: ${checkpoint.nodeId} at ${checkpoint.timestamp}")
}
// Replay from specific point
val targetCheckpoint = checkpoints[3]
agent.checkpoint().rollbackToCheckpoint(targetCheckpoint.checkpointId, context)
Multi-Step Workflows with Rollback
val workflow = graphStrategy {
    val step1 by node<String, String> { "Step 1 result" }
    val step2 by node<String, String> { "Step 2 result" }
    val step3 by node<String, String> { input ->
        // Something goes wrong, rollback to step1
        if (error) {
            checkpoint().rollbackToCheckpoint("step1-checkpoint", context)
        }
        "Step 3 result"
    }
    edges {
        start goesTo step1
        step1 goesTo step2
        step2 goesTo step3
        step3 goesTo finish
    }
}

val agent = AIAgent(
    strategy = workflow,
    executor = myExecutor
) {
    install(AgentCheckpoint) {
        snapshotProvider(FileAgentCheckpointStorageProvider())
        continuouslyPersistent()
    }
}
A/B Testing with Checkpoints
val checkpoint1 = checkpoint().createCheckpoint(
    agentId = context.id,
    agentContext = context,
    nodeId = "decision-point",
    lastInput = input
)
// Try approach A
val resultA = tryApproachA()
// Rollback to checkpoint
checkpoint().rollbackToCheckpoint(checkpoint1.checkpointId, context)
// Try approach B
val resultB = tryApproachB()
// Compare results and choose best
if (resultA.score > resultB.score) {
    checkpoint().rollbackToCheckpoint(checkpoint1.checkpointId, context)
    return resultA
} else {
    return resultB
}
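The snapshot, try, roll back, try again pattern can be seen in isolation in the following self-contained sketch. `ConversationState`, `Attempt`, and `compareApproaches` are hypothetical names that model only a message list; they are not Koog APIs. One design choice differs from the snippet above and is an assumption of this sketch: after comparing, the state is restored to the history the winning approach produced rather than to the original checkpoint.

```kotlin
// Hypothetical stand-in for agent state; not a Koog type.
class ConversationState {
    val messages = mutableListOf<String>()

    // A snapshot is an independent copy of the history at this moment.
    fun snapshot(): List<String> = messages.toList()

    // Rolling back replaces the current history with the snapshot's contents.
    fun rollbackTo(snapshot: List<String>) {
        messages.clear()
        messages.addAll(snapshot)
    }
}

data class Attempt(val label: String, val score: Int, val history: List<String>)

// Runs every approach from the same starting state and returns the best attempt,
// or null when no approaches were supplied.
fun compareApproaches(
    state: ConversationState,
    approaches: Map<String, (ConversationState) -> Int>
): Attempt? {
    val start = state.snapshot()
    val attempts = approaches.map { (label, run) ->
        state.rollbackTo(start) // each approach starts from the checkpoint
        val score = run(state)
        Attempt(label, score, state.snapshot())
    }
    val best = attempts.maxByOrNull { it.score }
    // Leave the state as the winning approach left it (or untouched if none ran).
    state.rollbackTo(best?.history ?: start)
    return best
}
```

Because each approach begins with `rollbackTo(start)`, side effects of one approach on the message history cannot leak into the next one.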
Complete Example
import ai.koog.agents.core.dsl.graphStrategy
import ai.koog.agents.features.snapshot.AgentCheckpoint
import ai.koog.agents.features.snapshot.FileAgentCheckpointStorageProvider
import ai.koog.agents.features.snapshot.checkpoint

val agent = AIAgent(
    executor = openAIExecutor,
    llmModel = OpenAIModels.Chat.GPT4o,
    strategy = graphStrategy {
        // Ensure unique names (required for checkpoints)
        metadata {
            uniqueNames = true
        }
        val analyzeRequirements by node<String, String> { requirements ->
            requestLLM("Analyze requirements: $requirements")
        }
        val designSystem by node<String, String> { analysis ->
            // Create manual checkpoint before critical step
            val cp = checkpoint().createCheckpoint(
                agentId = context.id,
                agentContext = context,
                nodeId = "designSystem",
                lastInput = analysis
            )
            println("Checkpoint created: ${cp.checkpointId}")
            requestLLM("Design system based on: $analysis")
        }
        val implementSystem by node<String, String> { design ->
            try {
                requestLLM("Implement: $design")
            } catch (e: Exception) {
                // On error, rollback to latest checkpoint
                println("Error occurred, rolling back...")
                checkpoint().rollbackToLatestCheckpoint(context)
                throw e
            }
        }
        edges {
            start goesTo analyzeRequirements
            analyzeRequirements goesTo designSystem
            designSystem goesTo implementSystem
            implementSystem goesTo finish
        }
    }
) {
    install(AgentCheckpoint) {
        // Use file-based storage
        snapshotProvider(
            FileAgentCheckpointStorageProvider(
                directory = Path("checkpoints")
            )
        )
        // Auto-checkpoint after each node
        continuouslyPersistent()
    }
}

val result = agent.run("Build a web scraper")
println(result)
Accessing Checkpoint Feature
import ai.koog.agents.features.snapshot.checkpoint
import ai.koog.agents.features.snapshot.withCheckpoints

// From context
val checkpointFeature = context.checkpoint()

// With DSL
context.withCheckpoints { ctx ->
    // 'this' is the checkpoint feature
    val cp = createCheckpoint(ctx.id, ctx, "node", input)
}
Best Practices
The checkpoint feature requires all nodes to have unique names. Set uniqueNames = true in strategy metadata.
Choose appropriate storage
Use file-based storage for production and in-memory for development/testing.
Be selective with continuous persistence
Continuous checkpointing has overhead. Use it for critical workflows or long-running agents.
Implement cleanup policies to remove old checkpoints and prevent storage bloat.
Regularly test that rollback and recovery work as expected for your use case.
Performance: Continuous persistence creates checkpoints after every node, which can impact performance for complex strategies. Use selectively.
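A cleanup policy can be as simple as keeping the N newest checkpoints per agent. The following self-contained sketch shows one way to select what to delete. `StoredCheckpoint` and `selectForDeletion` are hypothetical names for this illustration; the source does not describe a built-in retention API, so actually deleting the selected entries would depend on your storage backend.

```kotlin
// Hypothetical minimal checkpoint descriptor; not a Koog type.
data class StoredCheckpoint(
    val checkpointId: String,
    val agentId: String,
    val timestamp: Long
)

// Returns the checkpoints that should be deleted so that every agent
// keeps only its `keepLatest` most recent checkpoints.
fun selectForDeletion(all: List<StoredCheckpoint>, keepLatest: Int): List<StoredCheckpoint> {
    require(keepLatest >= 0) { "keepLatest must not be negative" }
    return all
        .groupBy { it.agentId }
        .values
        .flatMap { perAgent ->
            // Newest first, then everything past the first `keepLatest` is expendable.
            perAgent.sortedByDescending { it.timestamp }.drop(keepLatest)
        }
}
```

Running such a selection periodically, or after each save, bounds storage growth when continuous persistence is enabled.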
Limitations
Requires unique node names in the strategy
Checkpoint data can be large for long conversations
File-based storage is not distributed (single machine only)
In-memory storage is lost on application restart