Overview

Optimizing AI agent performance involves balancing multiple factors: response latency, token usage, memory consumption, and cost. This guide covers techniques for building high-performance agents that scale efficiently.

Performance Bottlenecks

Common Performance Issues

  1. LLM Latency (typically 1-10 seconds)
    • Network round-trip time
    • Model inference time
    • Token generation speed
  2. Token Usage (impacts cost and speed)
    • Large prompts increase latency
    • Context window limitations
    • Redundant information
  3. Tool Execution (varies by tool)
    • Database queries
    • API calls
    • File I/O operations
  4. Memory Usage
    • Large conversation histories
    • Caching overhead
    • Accumulated state
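
As a back-of-the-envelope illustration (hypothetical numbers, no framework APIs), the latency components above can be combined into a simple budget to see which one dominates a turn:

```kotlin
// Hypothetical latency budget for one agent turn, in milliseconds.
// The numbers are illustrative placeholders, not measurements.
data class LatencyBudget(
    val networkRoundTripMs: Long,
    val llmInferenceMs: Long,
    val toolExecutionMs: Long
) {
    // Total wall-clock time if every step runs sequentially.
    fun totalMs(): Long = networkRoundTripMs + llmInferenceMs + toolExecutionMs
}

// With 200ms network, 2000ms inference, and 500ms of tools, inference
// accounts for roughly three quarters of the 2700ms total — which is why
// caching and model selection (below) usually pay off first.
```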

Response Caching

Prompt-Level Caching

Cache LLM responses to avoid redundant calls:
import ai.koog.prompt.executor.cached.CachedPromptExecutor
import ai.koog.prompt.cache.memory.InMemoryPromptCache

// In-memory cache (all platforms)
val cache = InMemoryPromptCache()
val cachedExecutor = CachedPromptExecutor(
    cache = cache,
    nested = originalExecutor
)

val agent = AIAgent("cached-agent") {
    withPromptExecutor(cachedExecutor)
}

File-Based Cache

Persist cached responses across restarts (JVM/Android/iOS):
import ai.koog.prompt.cache.files.FilePromptCache

val cache = FilePromptCache(
    cacheDir = File("./cache/prompts")
)

val cachedExecutor = CachedPromptExecutor(
    cache = cache,
    nested = originalExecutor
)

Redis Cache

Shared cache across multiple instances (JVM only):
import ai.koog.prompt.cache.redis.RedisPromptCache
import redis.clients.jedis.JedisPool

val jedisPool = JedisPool("localhost", 6379)
val cache = RedisPromptCache(
    jedisPool = jedisPool,
    keyPrefix = "koog:prompts:",
    ttl = 3600 // 1 hour
)

val cachedExecutor = CachedPromptExecutor(
    cache = cache,
    nested = originalExecutor
)

Cache Configuration

class PromptCacheConfig(
    // Cache key includes prompt + tools
    val includeTools: Boolean = true,
    
    // Time-to-live for cache entries
    val ttl: Duration = 1.hours,
    
    // Maximum cache size
    val maxEntries: Int = 10000,
    
    // Eviction policy
    val evictionPolicy: EvictionPolicy = EvictionPolicy.LRU
)
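
The `EvictionPolicy.LRU` option in the configuration above can be sketched in a few lines with a `LinkedHashMap` in access order; this is a generic illustration of the policy, not the framework's actual cache implementation:

```kotlin
// Minimal LRU cache sketch: a LinkedHashMap in access order evicts the
// least-recently-used entry once maxEntries is exceeded.
class LruCache<K, V>(private val maxEntries: Int) {
    private val map = object : LinkedHashMap<K, V>(16, 0.75f, /* accessOrder = */ true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>): Boolean =
            size > maxEntries
    }

    fun get(key: K): V? = map[key]          // reads count as "use" in access order
    fun put(key: K, value: V) { map[key] = value }
    val size: Int get() = map.size
}
```

With `maxEntries = 2`, inserting a third key evicts whichever of the first two was touched least recently.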

Streaming with Cache

Cached responses work with streaming:
val cachedExecutor = CachedPromptExecutor(cache, executor)

cachedExecutor.executeStreaming(prompt, model, tools)
    .collect { frame ->
        // Cached responses are converted to stream frames
        when (frame) {
            is StreamFrame.Delta -> print(frame.content)
            is StreamFrame.Complete -> println()
        }
    }

Token Optimization

Minimize Prompt Size

// ❌ Bad: Verbose, repetitive prompt
val verbosePrompt = """
    You are a helpful assistant that helps users with their questions.
    You should always be polite and respectful.
    You should provide accurate information.
    You should cite your sources when possible.
    ...
""".trimIndent()

// ✅ Good: Concise, specific prompt
val concisePrompt = """
    Answer questions accurately and cite sources.
""".trimIndent()

Use Structured Outputs

Structured outputs reduce token usage:
// Request specific format
val result = llm.complete<SearchResult>(
    prompt = "Search for Kotlin documentation",
    schema = SearchResult.schema
)

// More efficient than parsing natural language

Truncate Long Contexts

Limit conversation history:
class SlidingWindowMemory(
    private val windowSize: Int = 10
) : ChatHistoryProvider {
    override suspend fun load(sessionId: String): List<Message> {
        val fullHistory = storage.loadHistory(sessionId)
        // Keep only last N messages
        return fullHistory.takeLast(windowSize)
    }
}

install(ChatMemory) {
    chatHistoryProvider = SlidingWindowMemory(windowSize = 10)
}
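
A variation on the sliding window is to truncate by an approximate token budget rather than a fixed message count. The `ChatMessage` type and the 4-characters-per-token heuristic below are illustrative assumptions, not framework types:

```kotlin
// Illustrative message shape; real frameworks carry richer structures.
data class ChatMessage(val role: String, val content: String)

// Very rough heuristic: ~4 characters per token for English text.
fun approxTokens(text: String): Int = (text.length + 3) / 4

// Keep the most recent messages whose combined estimated tokens fit the budget.
fun truncateToBudget(history: List<ChatMessage>, tokenBudget: Int): List<ChatMessage> {
    val kept = ArrayDeque<ChatMessage>()
    var used = 0
    for (msg in history.asReversed()) {        // walk newest-to-oldest
        val cost = approxTokens(msg.content)
        if (used + cost > tokenBudget) break   // stop once the budget is exhausted
        kept.addFirst(msg)                     // preserve chronological order
        used += cost
    }
    return kept.toList()
}
```

Unlike `takeLast(n)`, this adapts to message length: one long message may consume the budget that ten short ones would fit into.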

Summarize Old Messages

class SummarizingMemory : ChatHistoryProvider {
    override suspend fun load(sessionId: String): List<Message> {
        val fullHistory = storage.loadHistory(sessionId)
        
        if (fullHistory.size > 20) {
            // Summarize old messages
            val oldMessages = fullHistory.take(fullHistory.size - 10)
            val summary = summarize(oldMessages)
            val recentMessages = fullHistory.takeLast(10)
            
            return listOf(Message.System(summary)) + recentMessages
        }
        
        return fullHistory
    }
    
    private suspend fun summarize(messages: List<Message>): String {
        return llm.complete(
            "Summarize this conversation in 2-3 sentences:\n" +
            messages.joinToString("\n") { "${it.role}: ${it.content}" }
        )
    }
}

Token Counting

Monitor token usage:
import ai.koog.agents.features.tokenizer.Tokenizer

val agent = AIAgent("monitored") {
    install(Tokenizer) {
        // Track token usage
        onTokenUsage { usage ->
            logger.info {
                "Tokens - Prompt: ${usage.promptTokens}, " +
                "Completion: ${usage.completionTokens}, " +
                "Total: ${usage.totalTokens}"
            }
        }
    }
}
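
Outside any framework, the same usage numbers can be accumulated with a plain tracker; the `TokenUsage` shape below mirrors the callback fields in the snippet above but is an illustrative stand-in:

```kotlin
// Illustrative usage record, mirroring the promptTokens/completionTokens/
// totalTokens fields reported by the tokenizer callback above.
data class TokenUsage(val promptTokens: Int, val completionTokens: Int) {
    val totalTokens: Int get() = promptTokens + completionTokens
}

// Accumulates usage across calls so totals can be logged or alerted on.
class TokenUsageTracker {
    var promptTokens = 0
        private set
    var completionTokens = 0
        private set
    val totalTokens: Int get() = promptTokens + completionTokens

    fun record(usage: TokenUsage) {
        promptTokens += usage.promptTokens
        completionTokens += usage.completionTokens
    }
}
```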

Parallel Execution

Parallel Nodes

Execute multiple operations concurrently:
val searchNode by node<Query, SearchResult>("search") { query ->
    searchService.search(query)
}

val summarizeNode by node<Query, Summary>("summarize") { query ->
    llm.complete("Summarize: $query")
}

val classifyNode by node<Query, Classification>("classify") { query ->
    classifier.classify(query)
}

// Execute all nodes in parallel
val parallelResults by parallel<Query, Result>("parallel-processing") {
    node(searchNode)
    node(summarizeNode)
    node(classifyNode)
    
    // Merge results
    merge { context ->
        val search = context.results[0].nodeResult.output as SearchResult
        val summary = context.results[1].nodeResult.output as Summary
        val classification = context.results[2].nodeResult.output as Classification
        
        Result(search, summary, classification)
    }
}

nodeStart then parallelResults then nodeFinish

Parallel Tool Execution

val agent = AIAgent("parallel-tools") {
    // Enable parallel tool execution
    agentConfig {
        enableParallelToolExecution = true
    }
    
    tool(SearchTool)
    tool(WeatherTool)
    tool(NewsTool)
}

// LLM can request multiple tools simultaneously
// All tools execute in parallel

Performance Impact

// Sequential: 3 operations × 2 seconds = 6 seconds
val result1 = operation1()
val result2 = operation2()
val result3 = operation3()

// Parallel: max(2, 2, 2) = 2 seconds
val (result1, result2, result3) = coroutineScope {
    awaitAll(
        async { operation1() },
        async { operation2() },
        async { operation3() }
    )
}
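
The same speedup can be demonstrated with plain JDK primitives, independent of any agent framework; each simulated operation sleeps ~100 ms, so three of them complete in roughly one interval instead of three:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Simulate three independent operations that each take ~100ms,
// standing in for slow API or tool calls.
fun runParallel(): List<String> {
    val pool = Executors.newFixedThreadPool(3)
    try {
        val tasks = listOf("search", "summarize", "classify").map { name ->
            Callable {
                Thread.sleep(100)  // stand-in for network latency
                "$name-done"
            }
        }
        // invokeAll blocks until every task finishes;
        // wall time is roughly max(task times), not their sum.
        return pool.invokeAll(tasks).map { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

The coroutine version above is preferable inside an agent; this thread-pool sketch just makes the timing effect easy to measure in isolation.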

Model Selection

Choose Appropriate Models

// Fast, cheap model for simple tasks
val simpleAgent = AIAgent("simple") {
    withModel(GPT3_5_TURBO)  // ~500ms, $0.0005/1K tokens
}

// Powerful model for complex reasoning
val complexAgent = AIAgent("complex") {
    withModel(GPT4)  // ~2s, $0.03/1K tokens
}

// Ultra-fast model for real-time responses
val realtimeAgent = AIAgent("realtime") {
    withModel(CLAUDE_3_HAIKU)  // ~300ms, $0.00025/1K tokens
}
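
Because pricing is quoted per 1K tokens (the figures above are illustrative and change over time), estimating the cost of a call is simple arithmetic; this helper is a generic sketch:

```kotlin
// Estimate the dollar cost of a call from its token count and a
// per-1K-token price. Prices vary by provider and over time — always
// check current pricing rather than the illustrative figures above.
fun estimateCostUsd(totalTokens: Int, pricePer1kTokens: Double): Double =
    totalTokens / 1000.0 * pricePer1kTokens
```

For example, a 4,000-token exchange costs about $0.002 on a $0.0005/1K model versus about $0.12 on a $0.03/1K model — a 60x difference for the same traffic.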

Dynamic Model Selection

val adaptiveNode by node<Task, Result>("adaptive") { task ->
    val model = when {
        task.complexity > 8 -> GPT4
        task.complexity > 5 -> GPT3_5_TURBO
        else -> CLAUDE_3_HAIKU
    }
    
    llm.complete(task.description, model = model)
}

Model Routing

class ModelRouter {
    suspend fun route(prompt: String): LLModel {
        return when {
            // Use fast model for simple queries
            prompt.length < 100 -> CLAUDE_3_HAIKU
            
            // Use balanced model for medium queries
            prompt.length < 500 -> GPT3_5_TURBO
            
            // Use powerful model for complex queries
            else -> GPT4
        }
    }
}

Connection Pooling

HTTP Client Configuration

import io.ktor.client.*
import io.ktor.client.engine.cio.*

val httpClient = HttpClient(CIO) {
    engine {
        // Connection pooling
        maxConnectionsCount = 100
        endpoint {
            maxConnectionsPerRoute = 20
            keepAliveTime = 30_000
        }
        
        // Pipelining
        pipelineMaxSize = 20
    }
}

Reuse LLM Clients

// ✅ Good: Single client instance
val openAIClient = OpenAILLMClient(apiKey)

val agent1 = AIAgent("agent1") {
    withPromptExecutor(PromptExecutor(openAIClient))
}

val agent2 = AIAgent("agent2") {
    withPromptExecutor(PromptExecutor(openAIClient))
}

// ❌ Bad: New client for each agent
val agent3 = AIAgent("agent3") {
    withPromptExecutor(PromptExecutor(
        OpenAILLMClient(apiKey) // Creates new connection pool
    ))
}

Memory Management

Limit State Size

val agent = AIAgent("memory-efficient") {
    // Clear old state regularly
    interceptStrategyCompleted(this) { context ->
        // Remove temporary keys
        context.storage.removeAll { key ->
            key.name.startsWith("temp_")
        }
    }
}

Use Weak References for Cache

import java.lang.ref.WeakReference

class WeakCache<K, V> {
    private val cache = mutableMapOf<K, WeakReference<V>>()
    
    fun get(key: K): V? {
        return cache[key]?.get()
    }
    
    fun put(key: K, value: V) {
        cache[key] = WeakReference(value)
    }
}

Stream Large Responses

// ❌ Bad: load the entire response into memory at once
val response = llm.complete(longPrompt)

// ✅ Good: stream and process incrementally
llm.completeStreaming(longPrompt)
    .collect { delta ->
        process(delta)
        // Each delta becomes garbage-collectible after processing
    }

Benchmarking

Measure Agent Performance

import kotlin.system.measureTimeMillis
import kotlin.math.roundToInt

class AgentBenchmark {
    suspend fun benchmark(agent: AIAgent, iterations: Int = 100) {
        val latencies = mutableListOf<Long>()
        
        repeat(iterations) {
            val latency = measureTimeMillis {
                agent.execute("test input")
            }
            latencies.add(latency)
        }
        
        println("Results:")
        println("  Mean: ${latencies.average().roundToInt()}ms")
        println("  P50: ${latencies.percentile(50)}ms")
        println("  P95: ${latencies.percentile(95)}ms")
        println("  P99: ${latencies.percentile(99)}ms")
    }
    
    private fun List<Long>.percentile(p: Int): Long {
        val sorted = sorted()
        val index = (sorted.size * p / 100).coerceIn(0, sorted.size - 1)
        return sorted[index]
    }
}

Load Testing

import kotlinx.coroutines.*
import kotlinx.datetime.Clock
import java.util.concurrent.atomic.AtomicInteger

suspend fun loadTest(agent: AIAgent, concurrency: Int = 10) {
    val requests = 1000
    val successCount = AtomicInteger(0)
    val errorCount = AtomicInteger(0)
    
    val startTime = Clock.System.now()
    
    coroutineScope {
        // Launch concurrent requests
        val jobs = List(concurrency) {
            async {
                repeat(requests / concurrency) {
                    try {
                        agent.execute("test")
                        successCount.incrementAndGet()
                    } catch (e: Exception) {
                        errorCount.incrementAndGet()
                    }
                }
            }
        }
        jobs.awaitAll()
    }
    
    val duration = (Clock.System.now() - startTime).inWholeMilliseconds / 1000.0
    val rps = requests / duration
    
    println("Load Test Results:")
    println("  Requests: $requests")
    println("  Concurrency: $concurrency")
    println("  Duration: ${duration}s")
    println("  RPS: $rps")
    println("  Success: ${successCount.get()}")
    println("  Errors: ${errorCount.get()}")
}

Best Practices

1. Cache Aggressively

Cache at multiple levels:

  • LLM response cache (CachedPromptExecutor)
  • Tool result cache (per-tool caching)
  • Intermediate computation cache (storage)
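
Tool-result caching from the list above can be sketched as a generic memoizing wrapper; the `ConcurrentHashMap`-based version below is a simple illustration without TTL or size limits, so it is only suitable for pure tools with stable outputs:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Wrap any single-argument function so repeated calls with the same input
// reuse the first result. No TTL/eviction: fine for pure, stable tools only.
class MemoizedTool<A : Any, R : Any>(private val tool: (A) -> R) {
    private val cache = ConcurrentHashMap<A, R>()
    var underlyingCalls = 0
        private set

    operator fun invoke(arg: A): R = cache.computeIfAbsent(arg) {
        underlyingCalls++   // count only real executions, not cache hits
        tool(it)
    }
}
```

In production you would combine this with the TTL and eviction settings shown in the cache configuration earlier, since stale tool results are a correctness risk, not just a memory one.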

2. Minimize LLM Calls

// ❌ Bad: Multiple calls for simple task
val sentiment = llm.complete("What's the sentiment?")
val summary = llm.complete("Summarize this")
val keywords = llm.complete("Extract keywords")

// ✅ Good: Single call with structured output
val result = llm.complete<Analysis>(
    "Analyze this text: sentiment, summary, keywords"
)

3. Use Streaming for Long Responses

// Better UX and lower memory usage
llm.completeStreaming(prompt).collect { delta ->
    ui.appendText(delta.content)
}

4. Profile in Production

install(OpenTelemetry) {
    // Track performance metrics
    setVerbose(false)
    setSampler(Sampler.traceIdRatioBased(0.1))
}

5. Set Appropriate Timeouts

val httpClient = HttpClient(CIO) {
    install(HttpTimeout) {
        requestTimeoutMillis = 30_000  // 30s for normal requests
        connectTimeoutMillis = 5_000   // 5s for connection
    }
}

Performance Targets

Typical Latencies

| Operation             | Target  | Notes                               |
|-----------------------|---------|-------------------------------------|
| Simple LLM call       | < 1s    | With fast model (Haiku, GPT-3.5)    |
| Complex LLM call      | < 5s    | With powerful model (GPT-4, Claude) |
| Tool execution        | < 500ms | For local/cached operations         |
| Agent execution       | < 10s   | End-to-end for typical workflow     |
| Streaming first token | < 500ms | Time to first response              |

Resource Usage

| Metric            | Target  | Notes                            |
|-------------------|---------|----------------------------------|
| Memory per agent  | < 50MB  | Without large caches             |
| Cache memory      | < 500MB | With aggressive caching          |
| Concurrent agents | 100+    | Per server instance              |
| RPS               | 10-50   | Requests per second per instance |

Monitoring Performance

install(Metrics) {
    trackLatency = true
    trackTokenUsage = true
    trackCacheHits = true
    
    collector { metrics ->
        // Alert on slow requests
        if (metrics.p95Latency > 5.seconds) {
            alerting.warn("High P95 latency: ${metrics.p95Latency}")
        }
        
        // Alert on low cache hit rate
        if (metrics.cacheHitRate < 0.5) {
            alerting.warn("Low cache hit rate: ${metrics.cacheHitRate}")
        }
    }
}
