Why Streaming?
Streaming gives you:
- Immediate feedback — Users see responses as they’re generated
- Better UX — No waiting for complete responses
- Parallel operations — Execute tools while streaming text
- Real-time updates — Monitor agent progress
Basic Streaming
Enable Streaming in Strategy
Use streaming nodes in your graph strategy:
import ai.koog.agents.core.dsl.builder.strategy
import ai.koog.agents.core.dsl.extension.nodeLLMRequestStreamingAndSendResults
// Minimal streaming strategy: a single node streams the LLM response and
// forwards the streamed results straight to the finish node.
fun streamingStrategy() = strategy("streaming_agent") {
val nodeStreaming by nodeLLMRequestStreamingAndSendResults()
edge(nodeStart forwardTo nodeStreaming)
edge(nodeStreaming forwardTo nodeFinish)
}
Handle Streaming Events
Listen to streaming frames as they arrive:
import ai.koog.agents.features.eventHandler.feature.handleEvents
import ai.koog.prompt.streaming.StreamFrame
// Agent wired with streaming event handlers: text deltas are printed as they
// arrive; tool-call deltas are available for incremental handling.
// NOTE(review): `executor` and `toolRegistry` are defined elsewhere in the
// surrounding example.
val agent = AIAgent(
promptExecutor = executor,
llmModel = OpenAIModels.Chat.GPT4oMini,
strategy = streamingStrategy(),
toolRegistry = toolRegistry,
systemPrompt = "You are a helpful assistant."
) {
handleEvents {
// Fired for every frame of the streamed response.
onLLMStreamingFrameReceived { context ->
when (val frame = context.streamFrame) {
is StreamFrame.TextDelta -> print(frame.text)
is StreamFrame.ToolCallDelta -> {
// Handle partial tool call data
}
// Ignore any other frame kinds.
else -> {}
}
}
onLLMStreamingCompleted {
println() // New line after streaming completes
}
onLLMStreamingFailed { context ->
println("Error: ${context.error}")
}
}
}
Run the Streaming Agent
/**
 * Simple REPL driving the streaming agent: reads user input, runs the agent
 * (frames are printed by the event handlers), and exits on `/quit` or EOF.
 */
suspend fun main() {
    println("Streaming agent started")
    while (true) {
        print("You: ")
        // readlnOrNull() returns null at EOF (e.g. piped stdin exhausted),
        // so the loop exits cleanly instead of readln() throwing.
        val input = readlnOrNull() ?: break
        if (input == "/quit") break
        print("Agent: ")
        agent.run(input)
        println()
    }
    executor.close()
}
Streaming with Tools
Combine streaming text with tool execution:
Strategy with Parallel Tool Calls
import ai.koog.agents.core.dsl.extension.*
import ai.koog.agents.core.environment.ReceivedToolResult
// Streaming loop strategy: the LLM response streams to the user while any tool
// calls it emits are executed (in parallel) and fed back into the prompt, until
// the model produces a turn with no tool calls.
fun streamingWithToolsStrategy() = strategy("streaming_loop") {
// Executes every tool call from one LLM turn; parallelTools runs them concurrently.
val executeMultipleTools by nodeExecuteMultipleTools(parallelTools = true)
val nodeStreaming by nodeLLMRequestStreamingAndSendResults()
// Wrap the raw user input string into a single user request message.
val mapStringToRequests by node<String, List<Message.Request>> { input ->
listOf(Message.User(content = input, metaInfo = RequestMetaInfo.Empty))
}
// Append incoming user messages and tool results to the LLM session prompt,
// then pass the requests through unchanged.
// NOTE(review): input is typed List<Message.Request>; the Message.Tool.Result
// filter only matches if tool results are Request messages in this API — confirm.
val applyRequestToSession by node<List<Message.Request>, List<Message.Request>> { input ->
llm.writeSession {
appendPrompt {
input.filterIsInstance<Message.User>()
.forEach { user(it.content) }
tool {
input.filterIsInstance<Message.Tool.Result>()
.forEach { result(it) }
}
}
input
}
}
// Convert executed tool results back into request messages for the next turn.
val mapToolCallsToRequests by node<List<ReceivedToolResult>, List<Message.Request>> { input ->
input.map { it.toMessage() }
}
// Define the streaming loop
edge(nodeStart forwardTo mapStringToRequests)
edge(mapStringToRequests forwardTo applyRequestToSession)
edge(applyRequestToSession forwardTo nodeStreaming)
// Execute tools when LLM requests them
edge(
nodeStreaming forwardTo executeMultipleTools
onMultipleToolCalls { true }
)
// Map tool results back to messages
edge(executeMultipleTools forwardTo mapToolCallsToRequests)
// Continue the loop with tool results
edge(mapToolCallsToRequests forwardTo applyRequestToSession)
// Finish when no tool calls are made
edge(
nodeStreaming forwardTo nodeFinish
onCondition {
it.filterIsInstance<Message.Tool.Call>().isEmpty()
}
)
}
Complete Example
StreamingAgentWithTools.kt
import ai.koog.agents.core.agent.AIAgent
import ai.koog.agents.core.tools.ToolRegistry
import ai.koog.agents.core.tools.reflect.asTools
import ai.koog.agents.features.eventHandler.feature.handleEvents
import ai.koog.prompt.executor.clients.openai.OpenAIModels
import ai.koog.prompt.executor.llms.all.simpleOpenAIExecutor
import ai.koog.prompt.streaming.StreamFrame
/**
 * Minimal mutable toggle used by the tool examples.
 * The current position is exposed through [isOn]; the two mutators flip it.
 */
class Switch {
    // Starts in the "off" position.
    var isOn = false

    /** Move the switch to the on position. */
    fun turnOn() {
        isOn = true
    }

    /** Move the switch to the off position. */
    fun turnOff() {
        isOn = false
    }
}
// Tool set exposing the Switch to the LLM: each @Tool method becomes a callable
// tool, and the @LLMDescription text is what the model sees when choosing tools.
// NOTE(review): the imports for @Tool, @LLMDescription and ToolSet are not shown
// in this snippet — confirm they are present in the full example file.
class SwitchTools(private val switch: Switch) : ToolSet {
@Tool
@LLMDescription("Turn the switch on")
suspend fun turnOn(): String {
switch.turnOn()
return "Switch turned on"
}
@Tool
@LLMDescription("Turn the switch off")
suspend fun turnOff(): String {
switch.turnOff()
return "Switch turned off"
}
@Tool
@LLMDescription("Get current switch state")
suspend fun getState(): String {
return "Switch is ${if (switch.isOn) "on" else "off"}"
}
}
/**
 * Complete example: a chat loop over a switch-controlling agent.
 * Streams text deltas to stdout and announces tool invocations.
 */
suspend fun main() {
    val switch = Switch()
    val toolRegistry = ToolRegistry {
        tools(SwitchTools(switch).asTools())
    }
    // use { } closes the executor even if the chat loop throws.
    simpleOpenAIExecutor(System.getenv("OPENAI_API_KEY")).use { executor ->
        val agent = AIAgent(
            promptExecutor = executor,
            strategy = streamingWithToolsStrategy(),
            llmModel = OpenAIModels.Chat.GPT4oMini,
            systemPrompt = "You control a switch. Help users operate it.",
            temperature = 0.0,
            toolRegistry = toolRegistry
        ) {
            handleEvents {
                onToolCallStarting { context ->
                    println("\n🔧 Using ${context.toolName}...")
                }
                onLLMStreamingFrameReceived { context ->
                    (context.streamFrame as? StreamFrame.TextDelta)?.let { frame ->
                        print(frame.text)
                    }
                }
                onLLMStreamingFailed {
                    println("❌ Error: ${it.error}")
                }
                onLLMStreamingCompleted {
                    println()
                }
            }
        }
        println("Streaming chat agent started")
        println("Use /quit to quit\n")
        // readlnOrNull() returns null at EOF, so the loop also exits cleanly
        // when stdin is exhausted (readln() would throw). This also removes
        // the original's redundant sentinel variable and double /quit check.
        while (true) {
            print("You: ")
            val input = readlnOrNull() ?: break
            if (input == "/quit") break
            print("Agent: ")
            agent.run(input)
            println()
        }
    }
}
Stream Events
Different event types you can handle:
Text Deltas
onLLMStreamingFrameReceived { context ->
    when (val frame = context.streamFrame) {
        is StreamFrame.TextDelta -> {
            // Partial text received
            print(frame.text)
        }
        is StreamFrame.TextComplete -> {
            // Full text message completed
            println("Complete: ${frame.text}")
        }
        // StreamFrame is a sealed hierarchy with more subtypes (tool-call
        // frames, etc.); without an else this `when` is non-exhaustive and
        // fails to compile on recent Kotlin. Matches the earlier example.
        else -> {}
    }
}
Tool Call Deltas
onLLMStreamingFrameReceived { context ->
    when (val frame = context.streamFrame) {
        is StreamFrame.ToolCallDelta -> {
            // Partial tool call data
            println("Tool: ${frame.toolName}, Args: ${frame.arguments}")
        }
        is StreamFrame.ToolCallComplete -> {
            // Complete tool call ready
            println("Tool call complete: ${frame.toolName}")
        }
        // StreamFrame has more subtypes (text frames, etc.); the else branch
        // keeps the `when` exhaustive and ignores frames not handled here.
        else -> {}
    }
}
Stream Lifecycle
// Lifecycle hooks around a single streaming LLM call.
handleEvents {
onLLMStreamingStarted {
println("Streaming started...")
}
onLLMStreamingCompleted {
println("Streaming completed.")
}
onLLMStreamingFailed { context ->
println("Streaming failed: ${context.error}")
}
}
Parallel Tool Execution
Execute multiple tools simultaneously while streaming:
val executeMultipleTools by nodeExecuteMultipleTools(
parallelTools = true // Execute tools in parallel
)
edge(
nodeStreaming forwardTo executeMultipleTools
onMultipleToolCalls { true }
)
Tool Execution Events
// Hooks around individual tool executions: start, success, and failure.
handleEvents {
onToolCallStarting { context ->
println("Calling ${context.toolName}...")
}
onToolCallCompleted { context ->
println("${context.toolName} completed: ${context.result}")
}
onToolCallFailed { context ->
println("${context.toolName} failed: ${context.error}")
}
}
Streaming Over HTTP
Serve streaming responses via Ktor:
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.http.*
import kotlinx.coroutines.flow.flow
// Ktor route that runs the agent for a chat request and is intended to stream
// the reply back as server-sent events.
// NOTE(review): call.receive requires the ContentNegotiation plugin and an
// io.ktor.server.request.* import, neither shown here — confirm in the full app.
fun Application.configureRouting() {
routing {
post("/chat") {
val request = call.receive<ChatRequest>()
call.respondTextWriter(ContentType.Text.EventStream) {
// NOTE(review): nothing is written to this writer in the snippet; the
// streaming frames must be forwarded to it (e.g. from handleEvents)
// for the client to receive data — verify the wiring in the full example.
agent.run(request.message)
// Events are automatically sent via handleEvents
}
}
}
}
Structured Streaming
Stream structured data:
import ai.koog.agents.core.dsl.extension.nodeLLMRequestStructured
// Shape of the structured payload streamed back incrementally.
// NOTE(review): @Serializable requires the kotlinx.serialization plugin and
// import, not shown in this snippet.
@Serializable
data class StreamingResponse(
val status: String,
val progress: Int,
val result: String?
)
// Node that asks the LLM for output conforming to StreamingResponse.
val structuredStreaming by nodeLLMRequestStructured<StreamingResponse>()
handleEvents {
onLLMStreamingFrameReceived { context ->
// Receive partial structured data
// NOTE(review): StreamFrame.StructuredDelta is assumed to exist in this
// API version — confirm against the StreamFrame reference.
val partial = context.streamFrame as? StreamFrame.StructuredDelta
partial?.let {
println("Progress: ${it.data}")
}
}
}
Best Practices
Buffer Control
Flush output regularly for smooth streaming:
onLLMStreamingFrameReceived { context ->
(context.streamFrame as? StreamFrame.TextDelta)?.let { frame ->
print(frame.text)
System.out.flush() // Ensure immediate display
}
}
Error Recovery
Handle streaming failures gracefully:
onLLMStreamingFailed { context ->
println("\nStreaming interrupted: ${context.error.message}")
println("Retrying...")
// Optionally retry or fallback to non-streaming
}
Progress Indicators
Show activity during tool execution:
onToolCallStarting { context ->
print("\n⏳ ${context.toolName}...")
}
onToolCallCompleted { context ->
print(" ✓")
}
Cancellation
Support stream cancellation:
val job = scope.launch {
agent.run(input)
}
// Cancel if needed
job.cancel()
Testing Streaming Agents
Mock streaming behavior in tests:
import ai.koog.agents.testing.tools.getMockExecutor
// Collect frames emitted during the test run so we can assert on them.
// (The original snippet referenced receivedFrames without ever declaring it.)
val receivedFrames = mutableListOf<StreamFrame>()
val mockExecutor = getMockExecutor(toolRegistry) {
    mockLLMAnswer("Streaming response") onRequestContains "test"
    // NOTE(review): confirm the generated tool handle name for a ToolSet
    // registered via asTools() — SwitchTools.TurnOnTool may differ.
    mockTool(SwitchTools.TurnOnTool) alwaysReturns "Switch on"
}
val agent = AIAgent(
    promptExecutor = mockExecutor,
    strategy = streamingWithToolsStrategy(),
    toolRegistry = toolRegistry
) {
    handleEvents {
        onLLMStreamingFrameReceived { context ->
            receivedFrames.add(context.streamFrame)
        }
    }
}
runTest {
    agent.run("Turn on the switch")
    assertTrue(receivedFrames.isNotEmpty())
}
Next Steps
- Multi-Agent Systems — Stream responses in A2A communication
- Graph Workflows — Build complex streaming workflows
- Testing Agents — Test streaming behavior