singleRunStrategy() for multi-step processes.
Why Graph Workflows?
Graph-based strategies give you:
- Explicit control flow — Define exact paths through your workflow
- Conditional branching — Route based on results, tool calls, or custom logic
- Subgraphs — Reuse and compose workflows
- Debugging — Visualize and test graph structure
Basic Graph Strategy
Creating a Strategy
Use the strategy DSL to define your workflow:
import ai.koog.agents.core.dsl.builder.strategy
import ai.koog.agents.core.dsl.builder.forwardTo
import ai.koog.agents.core.dsl.extension.*
/**
 * Builds a graph strategy that sends the input to the LLM, executes requested tools in a loop,
 * and finishes once the LLM answers with a plain assistant message.
 *
 * @param name name passed to the `strategy` builder.
 */
fun toneStrategy(name: String) = strategy<String, String>(name) {
    // --- Nodes ---
    val nodeSendInput by nodeLLMRequest()
    val nodeExecuteTool by nodeExecuteTool()
    val nodeSendToolResult by nodeLLMSendToolResult()
    val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()

    // --- Edges ---
    // Entry point: hand the strategy input to the first LLM request.
    edge(nodeStart forwardTo nodeSendInput)

    // First LLM reply: plain text ends the run, a tool call goes to the executor.
    edge(nodeSendInput forwardTo nodeFinish onAssistantMessage { true })
    edge(nodeSendInput forwardTo nodeExecuteTool onToolCall { true })

    // After a tool ran, detour through history compression once the prompt exceeds 100 messages.
    edge(
        nodeExecuteTool forwardTo nodeCompressHistory
            onCondition { llm.readSession { prompt.messages.size > 100 } }
    )
    edge(nodeCompressHistory forwardTo nodeSendToolResult)

    // With 100 messages or fewer, the tool result goes straight back to the LLM.
    edge(
        nodeExecuteTool forwardTo nodeSendToolResult
            onCondition { llm.readSession { prompt.messages.size <= 100 } }
    )

    // Keep looping while the LLM calls tools; stop on a plain assistant message.
    edge(nodeSendToolResult forwardTo nodeExecuteTool onToolCall { true })
    edge(nodeSendToolResult forwardTo nodeFinish onAssistantMessage { true })
}
Using the Strategy
// Creates an agent that runs the tone strategy; `executor` and `toolRegistry` are defined elsewhere.
val agent = AIAgent(
    promptExecutor = executor,
    strategy = toneStrategy("tone_analysis"),
    agentConfig = AIAgentConfig(
        prompt = prompt("tone-agent") {
            system("You are a tone analysis agent...")
        },
        model = OpenAIModels.Chat.GPT4oMini,
        // Upper bound on agent iterations per run. AIAgentConfig appears to have no default
        // for this parameter (confirm against your Koog version); 50 is an example value.
        maxAgentIterations = 50,
    ),
    toolRegistry = toolRegistry,
)
Built-in Nodes
Koog provides pre-built nodes for common operations:
LLM Nodes
// Built-in LLM nodes, each declared with `by` in the same way as in toneStrategy.
// Send a request to the LLM
val nodeLLMRequest by nodeLLMRequest()
// Send a request and stream the response
val nodeStreaming by nodeLLMRequestStreamingAndSendResults()
// Request structured output, typed here as UserRequestClassification
val classifyRequest by nodeLLMRequestStructured<UserRequestClassification>()
// Send a tool result back to the LLM
val nodeSendToolResult by nodeLLMSendToolResult()
// Compress the conversation history
val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()
Tool Nodes
// Execute a single tool call
val nodeExecuteTool by nodeExecuteTool()
// Execute several tool calls; parallelTools = true requests parallel execution
val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)
Custom Nodes
Create custom processing nodes:
// Wraps the incoming string in a single user message.
val mapStringToRequests by node<String, List<Message.Request>> { text ->
    listOf(Message.User(content = text, metaInfo = RequestMetaInfo.Empty))
}
// Template for a custom node: replace InputType, OutputType and processInput with your own.
val processData by node<InputType, OutputType> { input ->
    // Your custom logic; the last expression is the node's output
    processInput(input)
}
Conditional Edges
Basic Conditions
// Arbitrary predicate: here, the number of messages currently in the prompt
edge(
    nodeExecuteTool forwardTo nodeCompressHistory
        onCondition { llm.readSession { prompt.messages.size > 100 } }
)

// Condition on message type: assistant text finishes, a tool call gets executed
edge(nodeSendInput forwardTo nodeFinish onAssistantMessage { true })
edge(nodeSendInput forwardTo nodeExecuteTool onToolCall { true })
Type-based Routing
// Unwrap the classification payload, then route on its runtime type.
// NeedsClarification -> ask the user for more information
edge(
    classifyRequest forwardTo askMoreInfo
        transformed { classification -> classification.getOrThrow().data }
        onIsInstance JokeRequestClassification.NeedsClarification::class
)
// Ready -> generate the joke
edge(
    classifyRequest forwardTo generateJoke
        transformed { classification -> classification.getOrThrow().data }
        onIsInstance JokeRequestClassification.Ready::class
)
Multiple Tool Calls
// Forward the streamed response to the multi-tool executor when it carries tool calls.
edge(nodeStreaming forwardTo executeMultipleTools onMultipleToolCalls { true })
Streaming with Graphs
Create agents that stream responses while executing tools:
/**
 * Strategy that alternates between a streaming LLM request and tool execution, finishing
 * when the streamed response contains no tool calls.
 */
fun streamingWithToolsStrategy() = strategy("streaming_loop") {
    val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)
    val nodeStreaming by nodeLLMRequestStreamingAndSendResults()

    // Wrap the raw input string in a single user message.
    val mapStringToRequests by node<String, List<Message.Request>> { text ->
        listOf(Message.User(content = text, metaInfo = RequestMetaInfo.Empty))
    }

    // Append the pending requests to the LLM session and pass them on unchanged.
    val applyRequestToSession by node<List<Message.Request>, List<Message.Request>> { requests ->
        llm.writeSession {
            appendPrompt {
                for (userMessage in requests.filterIsInstance<Message.User>()) {
                    user(userMessage.content)
                }
                tool {
                    for (toolResult in requests.filterIsInstance<Message.Tool.Result>()) {
                        result(toolResult)
                    }
                }
            }
            requests
        }
    }

    // Convert executed tool results back into request messages for the next round.
    val mapToolCallsToRequests by node<List<ReceivedToolResult>, List<Message.Request>> { toolResults ->
        toolResults.map { it.toMessage() }
    }

    // Flow: input -> session -> streaming request.
    edge(nodeStart forwardTo mapStringToRequests)
    edge(mapStringToRequests forwardTo applyRequestToSession)
    edge(applyRequestToSession forwardTo nodeStreaming)

    // Tool calls are executed and their results fed back into the session.
    edge(nodeStreaming forwardTo executeMultipleTools onMultipleToolCalls { true })
    edge(executeMultipleTools forwardTo mapToolCallsToRequests)
    edge(mapToolCallsToRequests forwardTo applyRequestToSession)

    // No tool calls in the response: the run is done.
    edge(
        nodeStreaming forwardTo nodeFinish onCondition { responses ->
            responses.filterIsInstance<Message.Tool.Call>().isEmpty()
        }
    )
}
Subgraphs
Reuse workflows as subgraphs within larger strategies:
import ai.koog.agents.ext.agent.subgraphWithTask
import ai.koog.agents.ext.agent.subgraphWithVerification
/**
 * Strategy composed of three subgraphs (generate, verify, fix). A failed verification is
 * routed to the fix subgraph and then verified again; a successful one finishes the run.
 *
 * @param generateTools tools given to the generation subgraph.
 * @param verifyTools tools given to the verification subgraph.
 * @param fixTools tools given to the fix subgraph.
 */
fun wizardStrategy(
    generateTools: List<Tool<*, *>>,
    verifyTools: List<Tool<*, *>>,
    fixTools: List<Tool<*, *>>
) = strategy<String, String>("wizard-with-checkstyle") {
    // The raw prompt strings below are kept flush-left on purpose so their text is unchanged.

    // Subgraph: generate the project structure (its Unit input is ignored).
    val generate by subgraphWithTask<Unit, String>(
        tools = generateTools,
        llmModel = OpenAIModels.Chat.GPT4o,
    ) { _ ->
        """
You are an AI agent that creates files and folders.
Create all necessary files and folders for the project.
""".trimIndent()
    }

    // Subgraph: fix the problems reported by verification.
    val fix by subgraphWithTask<CriticResult<String>, String>(
        tools = fixTools,
        llmModel = AnthropicModels.Opus_4_6,
    ) { verificationResult ->
        """
Fix the following problems:
${verificationResult.feedback}
""".trimIndent()
    }

    // Subgraph: verify the generated project.
    val verify by subgraphWithVerification(verifyTools) { _: String ->
        """
Verify the created project by running build commands.
You can only read files and run shell commands.
""".trimIndent()
    }

    // Wiring. The strategy input is dropped (mapped to Unit) before generation starts.
    edge(nodeStart forwardTo generate transformed { })
    edge(generate forwardTo verify transformed { "Project generated" })
    // Failed verification goes to fix, which loops back into verify.
    edge(verify forwardTo fix onCondition { result -> !result.successful })
    edge(verify forwardTo nodeFinish onCondition { result -> result.successful } transformed { "Project correct" })
    edge(fix forwardTo verify)
}
Accessing Graph Context
Within nodes, access the agent’s state:
val processInput by node<String, ProcessedData> { input ->
    // The agent's original input, requested as an A2AMessage
    val originalInput = agentInput<A2AMessage>()

    // Write to the LLM session: append a user message built from this node's input
    llm.writeSession {
        appendPrompt { user("Process: $input") }
    }

    // Read from the LLM session: the prompt's current messages
    val sessionMessages = llm.readSession { prompt.messages }

    // Call custom logic; its result is the node's output
    processData(input, sessionMessages)
}
Edge Transformations
Transform data as it flows between nodes:
// Replace the node output with the agent's original message content before the next node
edge(
    setupTaskContext forwardTo classifyRequest
        onCondition { task -> task == null }
        transformed { agentInput<A2AMessage>().content() }
)

// Extract nested data, then route only when it is a Ready classification
edge(
    classifyRequest forwardTo generateJoke
        transformed { classification -> classification.getOrThrow().data }
        onIsInstance JokeRequestClassification.Ready::class
)
Shorthand Syntax
Simplify sequential edges:
// Long form: one edge(...) call per hop
edge(nodeStart forwardTo nodeA)
edge(nodeA forwardTo nodeB)

// Shorthand: chain the same hops with `then`
nodeStart then nodeA then nodeB
Testing Graph Structure
Validate your graph’s structure in tests:
import ai.koog.agents.testing.feature.withTesting
// Agent with the testing feature enabled; `mockExecutor`, `myStrategy`, `config` and
// `toolRegistry` are defined elsewhere.
val agent = AIAgent(
    promptExecutor = mockExecutor,
    strategy = myStrategy(),
    agentConfig = config,
    toolRegistry = toolRegistry
) {
    withTesting()

    testGraph("verify_structure") {
        // Both subgraphs are looked up by name.
        val first = assertSubgraphByName<String, String>("first")
        val second = assertSubgraphByName<String, String>("second")

        // Top-level wiring: start -> first -> second.
        assertEdges {
            startNode() alwaysGoesTo first
            first alwaysGoesTo second
        }

        // Inside "first": the "callLLM" node maps "Hello" to the assistant message "Hello!".
        verifySubgraph(first) {
            val callLlm = assertNodeByName<String, Message.Response>("callLLM")
            assertNodes {
                callLlm withInput "Hello" outputs Message.Assistant("Hello!")
            }
        }
    }
}
Best Practices
Clear Node Names
Use descriptive names for debugging:
// Each name states what the node does; `{ ... }` stands in for the node body.
val loadUserContext by node<String, UserContext> { ... }
val validatePermissions by node<UserContext, ValidationResult> { ... }
val executeAction by node<ValidationResult, ActionResult> { ... }
Error Handling
Add explicit error paths:
// Dedicated node that turns an error into a message.
val handleError by node<Throwable, String> { error ->
    "Error occurred: ${error.message}"
}

// Route riskyNode's output to the handler only when it is a Failure.
edge(riskyNode forwardTo handleError onCondition { it is Failure })
Separate Concerns
Keep nodes focused on single responsibilities:
// Good: each node has one job (`{ ... }` stands in for the node body)
val fetchData by node<Request, Data> { ... }
val validateData by node<Data, ValidationResult> { ... }
val processData by node<Data, Result> { ... }

// Avoid: a single node that goes from Request to Result on its own
val doEverything by node<Request, Result> { ... }
Next Steps
- Multi-Agent Systems — Coordinate multiple agents with graphs
- Streaming — Stream responses in graph workflows
- Testing Agents — Test graph structure and execution
Complete Example
A full graph-based agent with conditional routing:
import ai.koog.agents.core.agent.AIAgent
import ai.koog.agents.core.dsl.builder.*
import ai.koog.agents.core.dsl.extension.*
// Tone analyzer graph: LLM request -> tool loop (with history compression) -> finish.
val strategy = strategy<String, String>("tone_analyzer") {
    val nodeSendInput by nodeLLMRequest()
    val nodeExecuteTool by nodeExecuteTool()
    val nodeSendToolResult by nodeLLMSendToolResult()
    val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()

    // Start by sending the input to the LLM.
    edge(nodeStart forwardTo nodeSendInput)

    // Branch on the kind of reply: assistant text finishes, a tool call is executed.
    edge(nodeSendInput forwardTo nodeFinish onAssistantMessage { true })
    edge(nodeSendInput forwardTo nodeExecuteTool onToolCall { true })

    // More than 100 messages in the prompt: compress history before replying to the LLM.
    edge(
        nodeExecuteTool forwardTo nodeCompressHistory
            onCondition { llm.readSession { prompt.messages.size > 100 } }
    )
    edge(nodeCompressHistory forwardTo nodeSendToolResult)

    // 100 messages or fewer: send the tool result directly.
    edge(
        nodeExecuteTool forwardTo nodeSendToolResult
            onCondition { llm.readSession { prompt.messages.size <= 100 } }
    )

    // Repeat for further tool calls; finish on assistant text.
    edge(nodeSendToolResult forwardTo nodeExecuteTool onToolCall { true })
    edge(nodeSendToolResult forwardTo nodeFinish onAssistantMessage { true })
}
// Agent that runs the `strategy` value; `executor` and `toolRegistry` are defined elsewhere.
val agent = AIAgent(
    promptExecutor = executor,
    llmModel = OpenAIModels.Chat.GPT4oMini,
    systemPrompt = "Analyze text tone using available tools.",
    strategy = strategy,
    toolRegistry = toolRegistry,
)