Graph Workflows
Graph workflows allow you to define complex agent behaviors as state machines with nodes and edges. This is more powerful than the simple singleRunStrategy() for multi-step processes.

Why Graph Workflows?

Graph-based strategies give you:
  • Explicit control flow — Define exact paths through your workflow
  • Conditional branching — Route based on results, tool calls, or custom logic
  • Subgraphs — Reuse and compose workflows
  • Debugging — Visualize and test graph structure

Basic Graph Strategy

Creating a Strategy

Use the strategy DSL to define your workflow:
import ai.koog.agents.core.dsl.builder.strategy
import ai.koog.agents.core.dsl.builder.forwardTo
import ai.koog.agents.core.dsl.extension.*

fun toneStrategy(name: String) = strategy<String, String>(name) {
    // Define nodes; `by` delegation names each node after its `val` identifier,
    // which is what appears when debugging or visualizing the graph
    val nodeSendInput by nodeLLMRequest()
    val nodeExecuteTool by nodeExecuteTool()
    val nodeSendToolResult by nodeLLMSendToolResult()
    val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()
    
    // Define edges (workflow): entry point feeds the agent input to the LLM
    edge(nodeStart forwardTo nodeSendInput)
    
    // If LLM responds with text, finish
    edge(
        nodeSendInput forwardTo nodeFinish
            onAssistantMessage { true }
    )
    
    // If LLM calls a tool, execute it
    edge(
        nodeSendInput forwardTo nodeExecuteTool
            onToolCall { true }
    )
    
    // Compress history when the prompt grows past 100 messages; the
    // complementary <= 100 condition below keeps the two paths mutually exclusive
    edge(
        nodeExecuteTool forwardTo nodeCompressHistory
            onCondition { llm.readSession { prompt.messages.size > 100 } }
    )
    
    edge(nodeCompressHistory forwardTo nodeSendToolResult)
    
    // Send tool result back to LLM directly when no compression is needed
    edge(
        nodeExecuteTool forwardTo nodeSendToolResult
            onCondition { llm.readSession { prompt.messages.size <= 100 } }
    )
    
    // Continue the loop if LLM calls another tool
    edge(
        nodeSendToolResult forwardTo nodeExecuteTool
            onToolCall { true }
    )
    
    // Finish once the LLM responds with plain text
    edge(
        nodeSendToolResult forwardTo nodeFinish
            onAssistantMessage { true }
    )
}

Using the Strategy

// Wire the graph strategy into an agent; the strategy replaces the default
// single-run behavior
val agent = AIAgent(
    promptExecutor = executor,
    strategy = toneStrategy("tone_analysis"),
    agentConfig = AIAgentConfig(
        prompt = prompt("tone-agent") {
            system("You are a tone analysis agent...")
        },
        model = OpenAIModels.Chat.GPT4oMini
    ),
    toolRegistry = toolRegistry
)

Built-in Nodes

Koog provides pre-built nodes for common operations:

LLM Nodes

// Send a request to the LLM and wait for the full response
val nodeLLMRequest by nodeLLMRequest()

// Send request and stream the response as it arrives
val nodeStreaming by nodeLLMRequestStreamingAndSendResults()

// Request structured output parsed into the given type parameter
val classifyRequest by nodeLLMRequestStructured<UserRequestClassification>()

// Send tool result back to LLM
val nodeSendToolResult by nodeLLMSendToolResult()

// Compress conversation history (e.g. to keep long runs within limits)
val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()

Tool Nodes

// Execute a single tool call
val nodeExecuteTool by nodeExecuteTool()

// Execute multiple tool calls; parallelTools = true runs them concurrently
val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)

Custom Nodes

Create custom processing nodes:
// Custom node: wrap the raw String input in a single user message request
val mapStringToRequests by node<String, List<Message.Request>> { input ->
    listOf(Message.User(content = input, metaInfo = RequestMetaInfo.Empty))
}

// Custom node: generic shape — transform InputType into OutputType
val processData by node<InputType, OutputType> { input ->
    // Your custom logic
    val result = processInput(input)
    result
}

Conditional Edges

Basic Conditions

// Condition on arbitrary state: route to history compression once the
// prompt exceeds 100 messages
edge(
    nodeExecuteTool forwardTo nodeCompressHistory
        onCondition { llm.readSession { prompt.messages.size > 100 } }
)

// Condition on message type: a plain assistant text message ends the run...
edge(
    nodeSendInput forwardTo nodeFinish
        onAssistantMessage { true }
)

// ...while a tool call is dispatched for execution
edge(
    nodeSendInput forwardTo nodeExecuteTool
        onToolCall { true }
)

Type-based Routing

// Route based on sealed interface type: unwrap the structured result first
// (transformed), then dispatch on the concrete classification subtype
edge(
    classifyRequest forwardTo askMoreInfo
        transformed { it.getOrThrow().data }
        onIsInstance JokeRequestClassification.NeedsClarification::class
)

edge(
    classifyRequest forwardTo generateJoke
        transformed { it.getOrThrow().data }
        onIsInstance JokeRequestClassification.Ready::class
)

Multiple Tool Calls

// Route to the parallel executor when the response contains multiple tool calls
edge(
    nodeStreaming forwardTo executeMultipleTools
        onMultipleToolCalls { true }
)

Streaming with Graphs

Create agents that stream responses while executing tools:
fun streamingWithToolsStrategy() = strategy("streaming_loop") {
    // Nodes: parallel tool execution + streaming LLM request
    val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)
    val nodeStreaming by nodeLLMRequestStreamingAndSendResults()
    
    // Wrap the raw String input in a single user message request
    val mapStringToRequests by node<String, List<Message.Request>> { input ->
        listOf(Message.User(content = input, metaInfo = RequestMetaInfo.Empty))
    }
    
    // Append the pending requests (user messages and tool results) to the
    // LLM session prompt, then pass the list through unchanged
    val applyRequestToSession by node<List<Message.Request>, List<Message.Request>> { input ->
        llm.writeSession {
            appendPrompt {
                input.filterIsInstance<Message.User>()
                    .forEach { user(it.content) }
                
                tool {
                    input.filterIsInstance<Message.Tool.Result>()
                        .forEach { result(it) }
                }
            }
            input
        }
    }
    
    // Convert executed tool results back into request messages for the next turn
    val mapToolCallsToRequests by node<List<ReceivedToolResult>, List<Message.Request>> { input ->
        input.map { it.toMessage() }
    }
    
    // Define flow: input -> session -> stream, then loop through tool
    // execution back into the session for each batch of tool calls
    edge(nodeStart forwardTo mapStringToRequests)
    edge(mapStringToRequests forwardTo applyRequestToSession)
    edge(applyRequestToSession forwardTo nodeStreaming)
    edge(nodeStreaming forwardTo executeMultipleTools onMultipleToolCalls { true })
    edge(executeMultipleTools forwardTo mapToolCallsToRequests)
    edge(mapToolCallsToRequests forwardTo applyRequestToSession)
    // Finish once the streamed output carries no tool calls at all
    edge(
        nodeStreaming forwardTo nodeFinish onCondition {
            it.filterIsInstance<Message.Tool.Call>().isEmpty()
        }
    )
}

Subgraphs

Reuse workflows as subgraphs within larger strategies:
import ai.koog.agents.ext.agent.subgraphWithTask
import ai.koog.agents.ext.agent.subgraphWithVerification

fun wizardStrategy(
    generateTools: List<Tool<*, *>>,
    verifyTools: List<Tool<*, *>>,
    fixTools: List<Tool<*, *>>
) = strategy<String, String>("wizard-with-checkstyle") {
    
    // Subgraph: Generate project structure (own tool set and model)
    val generate by subgraphWithTask<Unit, String>(
        tools = generateTools,
        llmModel = OpenAIModels.Chat.GPT4o,
    ) { input ->
        """
            You are an AI agent that creates files and folders.
            Create all necessary files and folders for the project.
        """.trimIndent()
    }
    
    // Subgraph: Fix issues reported by the verification step; receives the
    // verification feedback as its input
    val fix by subgraphWithTask<CriticResult<String>, String>(
        tools = fixTools,
        llmModel = AnthropicModels.Opus_4_6,
    ) { verificationResult ->
        """
            Fix the following problems:
            ${verificationResult.feedback}
        """.trimIndent()
    }
    
    // Subgraph: Verify project using the (read-only) verification tools
    val verify by subgraphWithVerification(verifyTools) { input: String ->
        """
            Verify the created project by running build commands.
            You can only read files and run shell commands.
        """.trimIndent()
    }
    
    // Connect subgraphs:
    // strategy input (String) is dropped — `generate` takes Unit
    edge(nodeStart forwardTo generate transformed { })
    edge(generate forwardTo verify transformed { "Project generated" })
    // verify -> fix -> verify loop runs until verification succeeds
    edge(verify forwardTo fix onCondition { !it.successful })
    edge(verify forwardTo nodeFinish onCondition { it.successful } transformed { "Project correct" })
    edge(fix forwardTo verify)
}

Accessing Graph Context

Within nodes, access the agent’s state:
val processInput by node<String, ProcessedData> { input ->
    // Access original agent input (the value the whole agent was started with),
    // independent of this node's own `input` parameter
    val originalInput = agentInput<A2AMessage>()
    
    // Write to the LLM session: append this node's input as a user message
    llm.writeSession {
        appendPrompt {
            user("Process: $input")
        }
    }
    
    // Read a snapshot of the conversation history accumulated so far
    val history = llm.readSession { prompt.messages }
    
    // Call custom logic
    processData(input, history)
}

Edge Transformations

Transform data as it flows between nodes:
// Transform output before passing to next node: only fires when no task is
// set yet, then substitutes the original agent input for the node output
edge(
    setupTaskContext forwardTo classifyRequest
        onCondition { task -> task == null }
        transformed { agentInput<A2AMessage>().content() }
)

// Extract nested data from the structured-output wrapper, then route by type
edge(
    classifyRequest forwardTo generateJoke
        transformed { it.getOrThrow().data }
        onIsInstance JokeRequestClassification.Ready::class
)

Shorthand Syntax

Simplify sequential edges:
// Instead of two explicit unconditional edges:
edge(nodeStart forwardTo nodeA)
edge(nodeA forwardTo nodeB)

// Use the infix `then` to chain them in one expression:
nodeStart then nodeA then nodeB

Testing Graph Structure

Validate your graph’s structure in tests:
import ai.koog.agents.testing.feature.withTesting

val agent = AIAgent(
    promptExecutor = mockExecutor,
    strategy = myStrategy(),
    agentConfig = config,
    toolRegistry = toolRegistry
) {
    // Enable the testing feature so the graph assertions below are active
    withTesting()
    
    testGraph("verify_structure") {
        // Resolve subgraphs by name; fails if a name or type is wrong
        val firstSubgraph = assertSubgraphByName<String, String>("first")
        val secondSubgraph = assertSubgraphByName<String, String>("second")
        
        // Assert the top-level wiring between subgraphs
        assertEdges {
            startNode() alwaysGoesTo firstSubgraph
            firstSubgraph alwaysGoesTo secondSubgraph
        }
        
        // Drill into one subgraph and pin expected node input/output behavior
        verifySubgraph(firstSubgraph) {
            val askLLM = assertNodeByName<String, Message.Response>("callLLM")
            assertNodes {
                askLLM withInput "Hello" outputs Message.Assistant("Hello!")
            }
        }
    }
}

Best Practices

Clear Node Names

Use descriptive names for debugging:
// Each name states the node's single job and appears in graph debug output
val loadUserContext by node<String, UserContext> { ... }
val validatePermissions by node<UserContext, ValidationResult> { ... }
val executeAction by node<ValidationResult, ActionResult> { ... }

Error Handling

Add explicit error paths:
// Dedicated error node: turn a Throwable into a user-facing message
val handleError by node<Throwable, String> { error ->
    "Error occurred: ${error.message}"
}

// Route failures to the error node instead of letting them escape the graph
edge(
    riskyNode forwardTo handleError
        onCondition { it is Failure }
)

Separate Concerns

Keep nodes focused on single responsibilities:
// Good: Each node has one job, so the graph shows the workflow at a glance
val fetchData by node<Request, Data> { ... }
val validateData by node<Data, ValidationResult> { ... }
val processData by node<Data, Result> { ... }

// Avoid: One node doing everything — harder to branch, reuse, or test
val doEverything by node<Request, Result> { ... }

Next Steps

Complete Example

A full graph-based agent with conditional routing:
import ai.koog.agents.core.agent.AIAgent
import ai.koog.agents.core.dsl.builder.*
import ai.koog.agents.core.dsl.extension.*

val strategy = strategy<String, String>("tone_analyzer") {
    // Nodes: LLM request, tool execution, tool-result relay, history compression
    val nodeSendInput by nodeLLMRequest()
    val nodeExecuteTool by nodeExecuteTool()
    val nodeSendToolResult by nodeLLMSendToolResult()
    val nodeCompressHistory by nodeLLMCompressHistory<ReceivedToolResult>()
    
    // Entry point: forward the agent input to the LLM
    edge(nodeStart forwardTo nodeSendInput)
    
    // Plain assistant text ends the run immediately
    edge(
        nodeSendInput forwardTo nodeFinish
            onAssistantMessage { true }
    )
    
    // A tool call is dispatched for execution
    edge(
        nodeSendInput forwardTo nodeExecuteTool
            onToolCall { true }
    )
    
    // Compress history once the prompt exceeds 100 messages; the
    // complementary <= 100 path below keeps the branches mutually exclusive
    edge(
        nodeExecuteTool forwardTo nodeCompressHistory
            onCondition { llm.readSession { prompt.messages.size > 100 } }
    )
    
    edge(nodeCompressHistory forwardTo nodeSendToolResult)
    
    // Relay the tool result directly when no compression is needed
    edge(
        nodeExecuteTool forwardTo nodeSendToolResult
            onCondition { llm.readSession { prompt.messages.size <= 100 } }
    )
    
    // Loop: another tool call keeps the cycle going
    edge(
        nodeSendToolResult forwardTo nodeExecuteTool
            onToolCall { true }
    )
    
    // Finish once the LLM answers with text
    edge(
        nodeSendToolResult forwardTo nodeFinish
            onAssistantMessage { true }
    )
}

// Simplified constructor: model and system prompt are passed directly
// instead of through an explicit AIAgentConfig
val agent = AIAgent(
    promptExecutor = executor,
    strategy = strategy,
    llmModel = OpenAIModels.Chat.GPT4oMini,
    systemPrompt = "Analyze text tone using available tools.",
    toolRegistry = toolRegistry
)
