Streaming
Streaming allows agents to return partial responses as they’re generated, providing a better user experience for long-running operations.

Why Streaming?

Streaming gives you:
  • Immediate feedback — Users see responses as they’re generated
  • Better UX — No waiting for complete responses
  • Parallel operations — Execute tools while streaming text
  • Real-time updates — Monitor agent progress

Basic Streaming

Enable Streaming in Strategy

Use streaming nodes in your graph strategy:
import ai.koog.agents.core.dsl.builder.strategy
import ai.koog.agents.core.dsl.extension.nodeLLMRequestStreamingAndSendResults

// Minimal streaming strategy: one streaming LLM node wired between
// the graph's start and finish nodes.
fun streamingStrategy() = strategy("streaming_agent") {
    // Streams the LLM response and forwards the emitted frames downstream.
    val streamingNode by nodeLLMRequestStreamingAndSendResults()

    edge(nodeStart forwardTo streamingNode)
    edge(streamingNode forwardTo nodeFinish)
}

Handle Streaming Events

Listen to streaming frames as they arrive:
import ai.koog.agents.features.eventHandler.feature.handleEvents
import ai.koog.prompt.streaming.StreamFrame

// Streaming-capable agent: the strategy above emits stream frames and the
// handlers below print them as they arrive.
// NOTE(review): `executor` and `toolRegistry` are assumed to be defined
// elsewhere in the surrounding example — confirm against the full listing.
val agent = AIAgent(
    promptExecutor = executor,
    llmModel = OpenAIModels.Chat.GPT4oMini,
    strategy = streamingStrategy(),
    toolRegistry = toolRegistry,
    systemPrompt = "You are a helpful assistant."
) {
    handleEvents {
        // Fired for each partial frame produced while the LLM streams.
        onLLMStreamingFrameReceived { context ->
            when (val frame = context.streamFrame) {
                // Incremental text chunk — print without a trailing newline.
                is StreamFrame.TextDelta -> print(frame.text)
                is StreamFrame.ToolCallDelta -> {
                    // Handle partial tool call data
                }
                // Other frame kinds (completion markers, etc.) are ignored here.
                else -> {}
            }
        }
        
        onLLMStreamingCompleted {
            println() // New line after streaming completes
        }
        
        // Surfaces stream-level failures (network drop, provider error).
        onLLMStreamingFailed { context ->
            println("Error: ${context.error}")
        }
    }
}

Run the Streaming Agent

// Interactive chat loop: forwards each user line to the streaming agent
// until the user types /quit, then releases the executor.
suspend fun main() {
    println("Streaming agent started")

    generateSequence {
        print("You: ")
        readln()
    }
        .takeWhile { it != "/quit" }
        .forEach { userInput ->
            print("Agent: ")
            agent.run(userInput)
            println()
        }

    executor.close()
}

Streaming with Tools

Combine streaming text with tool execution:

Strategy with Parallel Tool Calls

import ai.koog.agents.core.dsl.extension.*
import ai.koog.agents.core.environment.ReceivedToolResult

// Streaming strategy with a tool loop:
//   start -> map input -> append to session -> stream
//   stream --(tool calls)--> execute tools -> map results -> append -> stream ...
//   stream --(no tool calls)--> finish
// NOTE(review): this snippet also relies on Message and RequestMetaInfo being
// imported — confirm against the complete example's import list.
fun streamingWithToolsStrategy() = strategy("streaming_loop") {
    // Executes every requested tool; parallelTools = true runs them concurrently.
    val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)
    // Streams the LLM response and forwards frames/messages downstream.
    val nodeStreaming by nodeLLMRequestStreamingAndSendResults()
    
    // Wraps the raw user input string into a single user request message.
    val mapStringToRequests by node<String, List<Message.Request>> { input ->
        listOf(Message.User(content = input, metaInfo = RequestMetaInfo.Empty))
    }
    
    // Appends user messages and tool results to the LLM session prompt,
    // then passes the input through unchanged for the next node.
    val applyRequestToSession by node<List<Message.Request>, List<Message.Request>> { input ->
        llm.writeSession {
            appendPrompt {
                input.filterIsInstance<Message.User>()
                    .forEach { user(it.content) }
                
                tool {
                    input.filterIsInstance<Message.Tool.Result>()
                        .forEach { result(it) }
                }
            }
            input
        }
    }
    
    // Converts executed tool results back into request messages for the loop.
    val mapToolCallsToRequests by node<List<ReceivedToolResult>, List<Message.Request>> { input ->
        input.map { it.toMessage() }
    }
    
    // Define the streaming loop
    edge(nodeStart forwardTo mapStringToRequests)
    edge(mapStringToRequests forwardTo applyRequestToSession)
    edge(applyRequestToSession forwardTo nodeStreaming)
    
    // Execute tools when LLM requests them
    edge(
        nodeStreaming forwardTo executeMultipleTools
            onMultipleToolCalls { true }
    )
    
    // Map tool results back to messages
    edge(executeMultipleTools forwardTo mapToolCallsToRequests)
    
    // Continue the loop with tool results
    edge(mapToolCallsToRequests forwardTo applyRequestToSession)
    
    // Finish when no tool calls are made
    edge(
        nodeStreaming forwardTo nodeFinish
            onCondition {
                it.filterIsInstance<Message.Tool.Call>().isEmpty()
            }
    )
}

Complete Example

StreamingAgentWithTools.kt
import ai.koog.agents.core.agent.AIAgent
import ai.koog.agents.core.tools.ToolRegistry
import ai.koog.agents.core.tools.annotations.LLMDescription
import ai.koog.agents.core.tools.annotations.Tool
import ai.koog.agents.core.tools.reflect.ToolSet
import ai.koog.agents.core.tools.reflect.asTools
import ai.koog.agents.features.eventHandler.feature.handleEvents
import ai.koog.prompt.executor.clients.openai.OpenAIModels
import ai.koog.prompt.executor.llms.all.simpleOpenAIExecutor
import ai.koog.prompt.streaming.StreamFrame

/** A toggleable two-state device the agent's tools operate on. */
class Switch {
    /** Current state; `true` when the switch is on. Starts off. */
    var isOn: Boolean = false

    /** Moves the switch to the on position. */
    fun turnOn() {
        isOn = true
    }

    /** Moves the switch to the off position. */
    fun turnOff() {
        isOn = false
    }
}

// Exposes Switch operations to the LLM as callable tools via reflection.
// NOTE(review): ToolSet, @Tool and @LLMDescription must be imported from the
// koog tools packages — confirm they appear in the example's import list.
class SwitchTools(private val switch: Switch) : ToolSet {
    // Tool: flips the switch on and reports the result to the model.
    @Tool
    @LLMDescription("Turn the switch on")
    suspend fun turnOn(): String {
        switch.turnOn()
        return "Switch turned on"
    }
    
    // Tool: flips the switch off and reports the result to the model.
    @Tool
    @LLMDescription("Turn the switch off")
    suspend fun turnOff(): String {
        switch.turnOff()
        return "Switch turned off"
    }
    
    // Tool: read-only query of the current switch state.
    @Tool
    @LLMDescription("Get current switch state")
    suspend fun getState(): String {
        return "Switch is ${if (switch.isOn) "on" else "off"}"
    }
}

// Entry point for the complete streaming-with-tools example.
suspend fun main() {
    val switch = Switch()
    val toolRegistry = ToolRegistry {
        tools(SwitchTools(switch).asTools())
    }
    
    // use { } guarantees the executor is closed even if the chat loop throws.
    simpleOpenAIExecutor(System.getenv("OPENAI_API_KEY")).use { executor ->
        val agent = AIAgent(
            promptExecutor = executor,
            strategy = streamingWithToolsStrategy(),
            llmModel = OpenAIModels.Chat.GPT4oMini,
            systemPrompt = "You control a switch. Help users operate it.",
            temperature = 0.0,
            toolRegistry = toolRegistry
        ) {
            handleEvents {
                // Announce each tool invocation on its own line.
                onToolCallStarting { context ->
                    println("\n🔧 Using ${context.toolName}...")
                }
                
                // Print text deltas as they stream in; ignore other frame kinds.
                onLLMStreamingFrameReceived { context ->
                    (context.streamFrame as? StreamFrame.TextDelta)?.let { frame ->
                        print(frame.text)
                    }
                }
                
                onLLMStreamingFailed {
                    println("❌ Error: ${it.error}")
                }
                
                onLLMStreamingCompleted {
                    println()
                }
            }
        }
        
        println("Streaming chat agent started")
        println("Use /quit to quit\n")
        
        // Chat loop: the exit condition is checked exactly once, right after
        // reading input (the original checked it redundantly in two places).
        while (true) {
            print("You: ")
            val input = readln()
            if (input == "/quit") break
            
            print("Agent: ")
            agent.run(input)
            println()
        }
    }
}

Stream Events

Different event types you can handle:

Text Deltas

onLLMStreamingFrameReceived { context ->
    when (val frame = context.streamFrame) {
        is StreamFrame.TextDelta -> {
            // Partial text received
            print(frame.text)
        }
        is StreamFrame.TextComplete -> {
            // Full text message completed
            println("Complete: ${frame.text}")
        }
        // Other frame kinds (tool calls, lifecycle markers) are ignored here;
        // the else branch keeps this when statement exhaustive over the
        // sealed StreamFrame hierarchy, matching the earlier examples.
        else -> {}
    }
}

Tool Call Deltas

onLLMStreamingFrameReceived { context ->
    when (val frame = context.streamFrame) {
        is StreamFrame.ToolCallDelta -> {
            // Partial tool call data
            println("Tool: ${frame.toolName}, Args: ${frame.arguments}")
        }
        is StreamFrame.ToolCallComplete -> {
            // Complete tool call ready
            println("Tool call complete: ${frame.toolName}")
        }
        // Text and lifecycle frames are not relevant here; the else branch
        // keeps this when statement exhaustive over sealed StreamFrame,
        // matching the earlier examples.
        else -> {}
    }
}

Stream Lifecycle

// Lifecycle hooks: started/completed/failed bracket every streaming call.
handleEvents {
    onLLMStreamingStarted {
        println("Streaming started...")
    }
    
    onLLMStreamingCompleted {
        println("Streaming completed.")
    }
    
    // Fired instead of onLLMStreamingCompleted when the stream errors out.
    onLLMStreamingFailed { context ->
        println("Streaming failed: ${context.error}")
    }
}

Parallel Tool Execution

Execute multiple tools simultaneously while streaming:
// Tool node configured to run all requested tools concurrently.
val executeMultipleTools by nodeExecuteMultipleTools(
    parallelTools = true  // Execute tools in parallel
)

// Route any batch of streamed tool-call requests to the execution node.
edge(
    nodeStreaming forwardTo executeMultipleTools
        onMultipleToolCalls { true }
)

Tool Execution Events

// Per-tool progress reporting: one starting hook and one terminal hook
// (completed or failed) fire around each tool invocation.
handleEvents {
    onToolCallStarting { context ->
        println("Calling ${context.toolName}...")
    }
    
    onToolCallCompleted { context ->
        println("${context.toolName} completed: ${context.result}")
    }
    
    onToolCallFailed { context ->
        println("${context.toolName} failed: ${context.error}")
    }
}

Streaming Over HTTP

Serve streaming responses via Ktor:
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.flow.flow

// Ktor route that streams an agent response over Server-Sent Events.
// NOTE(review): `ChatRequest` and the `agent` instance are assumed to be
// defined elsewhere in the application; `call.receive` additionally requires
// io.ktor.server.request.* to be imported — confirm against the full project.
fun Application.configureRouting() {
    routing {
        post("/chat") {
            val request = call.receive<ChatRequest>()
            
            // text/event-stream keeps the connection open while the agent runs.
            call.respondTextWriter(ContentType.Text.EventStream) {
                agent.run(request.message)
                
                // Events are automatically sent via handleEvents
            }
        }
    }
}

Structured Streaming

Stream structured data:
import ai.koog.agents.core.dsl.extension.nodeLLMRequestStructured

// Partial payload streamed back while the agent works.
// NOTE(review): requires kotlinx.serialization's @Serializable import,
// which is not shown in this snippet — confirm.
@Serializable
data class StreamingResponse(
    val status: String,   // status label — confirm expected values with the schema
    val progress: Int,    // progress indicator — confirm units (percent vs. steps)
    val result: String?   // null until a final result is available
)

// Structured-output node parameterized by the serializable payload type.
val structuredStreaming by nodeLLMRequestStructured<StreamingResponse>()

handleEvents {
    onLLMStreamingFrameReceived { context ->
        // Receive partial structured data
        // NOTE(review): confirm StreamFrame.StructuredDelta exists in the
        // installed koog version before relying on this cast.
        val partial = context.streamFrame as? StreamFrame.StructuredDelta
        partial?.let {
            println("Progress: ${it.data}")
        }
    }
}

Best Practices

Buffer Control

Flush output regularly for smooth streaming:
onLLMStreamingFrameReceived { context ->
    (context.streamFrame as? StreamFrame.TextDelta)?.let { frame ->
        print(frame.text)
        System.out.flush() // Ensure immediate display
    }
}

Error Recovery

Handle streaming failures gracefully:
onLLMStreamingFailed { context ->
    println("\nStreaming interrupted: ${context.error.message}")
    println("Retrying...")
    
    // Optionally retry or fallback to non-streaming
}

Progress Indicators

Show activity during tool execution:
onToolCallStarting { context ->
    print("\n${context.toolName}...")
}

onToolCallCompleted { context ->
    print(" ✓")
}

Cancellation

Support stream cancellation:
// Launch the agent run as a cancellable job.
// NOTE(review): `scope` must be a lifecycle-bound CoroutineScope defined by
// the caller — do not substitute GlobalScope here.
val job = scope.launch {
    agent.run(input)
}

// Cancel if needed
job.cancel()

Testing Streaming Agents

Mock streaming behavior in tests:
import ai.koog.agents.testing.tools.getMockExecutor

// Mock the executor so streaming behavior is deterministic under test.
val mockExecutor = getMockExecutor(toolRegistry) {
    mockLLMAnswer("Streaming response") onRequestContains "test"
    
    // NOTE(review): a reflection-based ToolSet exposes tools via asTools();
    // confirm SwitchTools.TurnOnTool is a valid reference in this setup.
    mockTool(SwitchTools.TurnOnTool) alwaysReturns "Switch on"
}

// Frames collected for the assertions below — the original snippet used
// receivedFrames without ever declaring it.
val receivedFrames = mutableListOf<StreamFrame>()

val agent = AIAgent(
    promptExecutor = mockExecutor,
    strategy = streamingWithToolsStrategy(),
    toolRegistry = toolRegistry
) {
    handleEvents {
        // Record every frame so the test can assert on what was streamed.
        onLLMStreamingFrameReceived { context ->
            receivedFrames.add(context.streamFrame)
        }
    }
}

// runTest comes from kotlinx-coroutines-test.
runTest {
    agent.run("Turn on the switch")
    assertTrue(receivedFrames.isNotEmpty())
}

Next Steps

Complete Streaming Example

See the StreamingAgentWithTools.kt example for a full implementation.

Build docs developers (and LLMs) love