Documentation Index
Fetch the complete documentation index at: https://mintlify.com/JetBrains/koog/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Optimizing AI agent performance involves balancing multiple factors: response latency, token usage, memory consumption, and cost. This guide covers techniques for building high-performance agents that scale efficiently.
- LLM Latency (typically 1-10 seconds)
  - Network round-trip time
  - Model inference time
  - Token generation speed
- Token Usage (impacts cost and speed)
  - Large prompts increase latency
  - Context window limitations
  - Redundant information
- Tool Execution (varies by tool)
  - Database queries
  - API calls
  - File I/O operations
- Memory Usage
  - Large conversation histories
  - Caching overhead
  - Accumulated state
Response Caching
Prompt-Level Caching
Cache LLM responses to avoid redundant calls:
import ai.koog.prompt.executor.cached.CachedPromptExecutor
import ai.koog.prompt.cache.memory.InMemoryPromptCache
// In-memory cache; usable on all platforms.
val cache = InMemoryPromptCache()

// Wrap the original executor with the cache.
val cachedExecutor = CachedPromptExecutor(cache = cache, nested = originalExecutor)

val agent = AIAgent("cached-agent") { withPromptExecutor(cachedExecutor) }
File-Based Cache
Persist cached responses across restarts (JVM/Android/iOS):
import ai.koog.prompt.cache.files.FilePromptCache
// Cache entries are written under ./cache/prompts.
val cache = FilePromptCache(cacheDir = File("./cache/prompts"))

val cachedExecutor = CachedPromptExecutor(cache = cache, nested = originalExecutor)
Redis Cache
Shared cache across multiple instances (JVM only):
import ai.koog.prompt.cache.redis.RedisPromptCache
import redis.clients.jedis.JedisPool
// Connection pool for the Redis server on localhost:6379.
val jedisPool = JedisPool("localhost", 6379)

val cache = RedisPromptCache(
    jedisPool = jedisPool,
    keyPrefix = "koog:prompts:",
    ttl = 3600, // 1 hour
)

val cachedExecutor = CachedPromptExecutor(cache = cache, nested = originalExecutor)
Cache Configuration
/**
 * Settings for the prompt cache.
 *
 * @property includeTools whether the cache key includes the tools in addition to the prompt.
 * @property ttl time-to-live for cache entries.
 * @property maxEntries maximum cache size.
 * @property evictionPolicy eviction policy.
 */
class PromptCacheConfig(
    val includeTools: Boolean = true,
    val ttl: Duration = 1.hours,
    val maxEntries: Int = 10000,
    val evictionPolicy: EvictionPolicy = EvictionPolicy.LRU,
)
Streaming with Cache
Cached responses work with streaming:
val cachedExecutor = CachedPromptExecutor(cache, executor)

// Cached responses are converted to stream frames before they reach the collector.
cachedExecutor
    .executeStreaming(prompt, model, tools)
    .collect { frame ->
        when (frame) {
            is StreamFrame.Delta -> print(frame.content)
            is StreamFrame.Complete -> println()
        }
    }
Token Optimization
Minimize Prompt Size
// ❌ Bad: verbose, repetitive prompt
val verbosePrompt = """
    You are a helpful assistant that helps users with their questions.
    You should always be polite and respectful.
    You should provide accurate information.
    You should cite your sources when possible.
    ...
""".trimIndent()

// ✅ Good: concise, specific prompt
val concisePrompt = """
    Answer questions accurately and cite sources.
""".trimIndent()
Use Structured Outputs
Structured outputs reduce token usage:
// Request a specific format; this is more efficient than parsing natural language.
val result = llm.complete<SearchResult>(
    prompt = "Search for Kotlin documentation",
    schema = SearchResult.schema,
)
Truncate Long Contexts
Limit conversation history:
/**
 * Chat history provider that exposes only the tail of a session's stored history.
 *
 * @param windowSize number of most recent messages returned by [load].
 */
class SlidingWindowMemory(
    private val windowSize: Int = 10,
) : ChatHistoryProvider {
    /** Loads the history stored for [sessionId] and keeps only its last [windowSize] messages. */
    override suspend fun load(sessionId: String): List<Message> =
        storage.loadHistory(sessionId).takeLast(windowSize)
}
install(ChatMemory) {
    // Provide the chat history through a 10-message sliding window.
    val slidingWindow = SlidingWindowMemory(windowSize = 10)
    chatHistoryProvider = slidingWindow
}
Summarize Old Messages
/**
 * Chat history provider that replaces the older part of a long history with a summary.
 *
 * A history of at most [summarizeThreshold] messages is returned unchanged. A longer history
 * is returned as one system message containing a summary of the older messages, followed by
 * the [recentMessagesToKeep] most recent messages verbatim.
 *
 * @param summarizeThreshold history size above which older messages are summarized.
 * @param recentMessagesToKeep number of trailing messages that are never summarized.
 * @throws IllegalArgumentException if [recentMessagesToKeep] is negative or larger than
 *   [summarizeThreshold].
 */
class SummarizingMemory(
    private val summarizeThreshold: Int = 20,
    private val recentMessagesToKeep: Int = 10,
) : ChatHistoryProvider {
    init {
        require(recentMessagesToKeep in 0..summarizeThreshold) {
            "recentMessagesToKeep ($recentMessagesToKeep) must be between 0 and " +
                "summarizeThreshold ($summarizeThreshold)"
        }
    }

    /** Loads the history for [sessionId], summarizing its older part when it is too long. */
    override suspend fun load(sessionId: String): List<Message> {
        val fullHistory = storage.loadHistory(sessionId)
        if (fullHistory.size <= summarizeThreshold) return fullHistory

        // Split at a single index so the summarized and the preserved parts
        // can neither overlap nor leave a gap.
        val splitIndex = fullHistory.size - recentMessagesToKeep
        val oldMessages = fullHistory.take(splitIndex)
        val recentMessages = fullHistory.drop(splitIndex)

        return listOf(Message.System(summarize(oldMessages))) + recentMessages
    }

    /** Asks the LLM for a 2-3 sentence summary of [messages], rendered as "role: content" lines. */
    private suspend fun summarize(messages: List<Message>): String =
        llm.complete(
            "Summarize this conversation in 2-3 sentences:\n" +
                messages.joinToString("\n") { "${it.role}: ${it.content}" }
        )
}
Token Counting
Monitor token usage:
import ai.koog.agents.features.tokenizer.Tokenizer
val agent = AIAgent("monitored") {
    install(Tokenizer) {
        // Log prompt, completion and total token counts for each reported usage.
        onTokenUsage { usage ->
            logger.info {
                "Tokens - Prompt: ${usage.promptTokens}, " +
                    "Completion: ${usage.completionTokens}, " +
                    "Total: ${usage.totalTokens}"
            }
        }
    }
}
Parallel Execution
Parallel Nodes
Execute multiple operations concurrently:
// Three nodes that each take the same Query as input.
val searchNode by node<Query, SearchResult>("search") { query -> searchService.search(query) }
val summarizeNode by node<Query, Summary>("summarize") { query -> llm.complete("Summarize: $query") }
val classifyNode by node<Query, Classification>("classify") { query -> classifier.classify(query) }

// Execute all nodes in parallel and merge their outputs into a single Result.
val parallelResults by parallel<Query, Result>("parallel-processing") {
    node(searchNode)
    node(summarizeNode)
    node(classifyNode)

    merge { context ->
        // Outputs are read by position: 0 = search, 1 = summary, 2 = classification.
        val outputs = context.results.map { it.nodeResult.output }
        Result(
            outputs[0] as SearchResult,
            outputs[1] as Summary,
            outputs[2] as Classification,
        )
    }
}

nodeStart then parallelResults then nodeFinish
// With parallel tool execution enabled, the LLM can request multiple tools
// simultaneously and all of them execute in parallel.
val agent = AIAgent("parallel-tools") {
    agentConfig { enableParallelToolExecution = true }

    tool(SearchTool)
    tool(WeatherTool)
    tool(NewsTool)
}
// Sequential: 3 operations × 2 seconds = 6 seconds
val result1 = operation1()
val result2 = operation2()
val result3 = operation3()

// Parallel: max(2, 2, 2) = 2 seconds
val (result1, result2, result3) = coroutineScope {
    listOf(
        async { operation1() },
        async { operation2() },
        async { operation3() },
    ).awaitAll()
}
Model Selection
Choose Appropriate Models
// Simple tasks: fast, cheap model (~500ms, $0.0005/1K tokens)
val simpleAgent = AIAgent("simple") { withModel(GPT3_5_TURBO) }

// Complex reasoning: powerful model (~2s, $0.03/1K tokens)
val complexAgent = AIAgent("complex") { withModel(GPT4) }

// Real-time responses: ultra-fast model (~300ms, $0.00025/1K tokens)
val realtimeAgent = AIAgent("realtime") { withModel(CLAUDE_3_HAIKU) }
Dynamic Model Selection
val adaptiveNode by node<Task, Result>("adaptive") { task ->
    // Pick the model from the task's complexity score: above 8, above 5, or anything else.
    val complexity = task.complexity
    val model =
        if (complexity > 8) GPT4
        else if (complexity > 5) GPT3_5_TURBO
        else CLAUDE_3_HAIKU

    llm.complete(task.description, model = model)
}
Model Routing
/** Chooses a model from the length of the prompt. */
class ModelRouter {
    /**
     * Returns the model to use for [prompt]: the fast model below 100 characters,
     * the balanced model below 500 characters, and the powerful model otherwise.
     */
    suspend fun route(prompt: String): LLModel {
        val length = prompt.length
        if (length < 100) return CLAUDE_3_HAIKU
        if (length < 500) return GPT3_5_TURBO
        return GPT4
    }
}
Connection Pooling
HTTP Client Configuration
import io.ktor.client.*
import io.ktor.client.engine.cio.*
val httpClient = HttpClient(CIO) {
    engine {
        // Connection pooling: total connection limit for the engine.
        maxConnectionsCount = 100

        endpoint {
            // Per-route connection limit and keep-alive time.
            maxConnectionsPerRoute = 20
            keepAliveTime = 30_000

            // Pipelining limit. `pipelineMaxSize` is an endpoint-level setting of the
            // CIO engine, so it has to be configured inside `endpoint { }`.
            pipelineMaxSize = 20
        }
    }
}
Reuse LLM Clients
// ✅ Good: one client instance shared by both agents
val openAIClient = OpenAILLMClient(apiKey)

val agent1 = AIAgent("agent1") { withPromptExecutor(PromptExecutor(openAIClient)) }
val agent2 = AIAgent("agent2") { withPromptExecutor(PromptExecutor(openAIClient)) }

// ❌ Bad: a new client (and with it a new connection pool) for each agent
val agent3 = AIAgent("agent3") {
    withPromptExecutor(
        PromptExecutor(OpenAILLMClient(apiKey))
    )
}
Memory Management
Limit State Size
val agent = AIAgent("memory-efficient") {
    // Clear old state regularly: when the strategy completes, remove every
    // storage key whose name starts with "temp_".
    interceptStrategyCompleted(this) { ctx ->
        ctx.storage.removeAll { storageKey -> storageKey.name.startsWith("temp_") }
    }
}
Use Weak References for Cache
import java.lang.ref.WeakReference
/**
 * Cache that holds its values through [WeakReference]s, so a cached value can be
 * garbage-collected once nothing else references it.
 *
 * Map entries whose value has been collected are removed on the next [get] or [put],
 * so the backing map does not keep growing with dead keys.
 *
 * This class performs no synchronization.
 */
class WeakCache<K, V> {
    /** Weak reference that remembers its key so its map entry can be found after collection. */
    private class Entry<K, V>(
        val key: K,
        value: V,
        queue: java.lang.ref.ReferenceQueue<V>,
    ) : WeakReference<V>(value, queue)

    private val cache = mutableMapOf<K, Entry<K, V>>()

    // References registered with this queue are enqueued by the GC once their value is collected.
    private val collected = java.lang.ref.ReferenceQueue<V>()

    /** Returns the value cached for [key], or `null` if there is none or it has been collected. */
    fun get(key: K): V? {
        purgeCollected()
        return cache[key]?.get()
    }

    /** Caches [value] under [key], replacing any previous value for that key. */
    fun put(key: K, value: V) {
        purgeCollected()
        cache[key] = Entry(key, value, collected)
    }

    /** Removes the map entries of all references the GC has enqueued so far. */
    private fun purgeCollected() {
        while (true) {
            val stale = collected.poll() ?: break
            @Suppress("UNCHECKED_CAST")
            val entry = stale as Entry<K, V>
            // The key may have been re-put since; only drop the mapping if it is still this reference.
            if (cache[entry.key] === entry) cache.remove(entry.key)
        }
    }
}
Stream Large Responses
// Loads the entire response into memory at once:
val response = llm.complete(longPrompt)

// Alternative: stream the response and process it incrementally.
// Each delta can be garbage collected after it has been processed.
llm.completeStreaming(longPrompt).collect { chunk -> process(chunk) }
Benchmarking
import kotlin.math.roundToInt
import kotlin.system.measureTimeMillis
/** Measures the latency of repeated agent executions and prints summary statistics. */
class AgentBenchmark {
    /**
     * Executes [agent] sequentially [iterations] times with a fixed input and prints the
     * mean, P50, P95 and P99 latency in milliseconds.
     *
     * @param agent agent under test.
     * @param iterations number of timed executions; must be positive.
     * @throws IllegalArgumentException if [iterations] is not positive.
     */
    suspend fun benchmark(agent: AIAgent, iterations: Int = 100) {
        // Without samples the mean is NaN and no percentile exists, so fail with a clear message.
        require(iterations > 0) { "iterations must be positive, was $iterations" }

        val latencies = List(iterations) {
            measureTimeMillis { agent.execute("test input") }
        }
        // Sort once and reuse the sorted list for every percentile.
        val sorted = latencies.sorted()

        println("Results:")
        println(" Mean: ${latencies.average().roundToInt()}ms")
        println(" P50: ${sorted.percentile(50)}ms")
        println(" P95: ${sorted.percentile(95)}ms")
        println(" P99: ${sorted.percentile(99)}ms")
    }

    /**
     * Returns the [p]-th percentile of this list using the nearest-rank method.
     * The receiver must be non-empty and sorted in ascending order.
     */
    private fun List<Long>.percentile(p: Int): Long {
        // Nearest rank is ceil(size * p / 100), 1-based; e.g. P99 of 100 samples is the 99th value.
        val rank = (size * p + 99) / 100
        return this[(rank - 1).coerceIn(0, size - 1)]
    }
}
Load Testing
import kotlinx.coroutines.*
/**
 * Sends 1000 requests to [agent] from [concurrency] concurrent coroutines and prints
 * the duration, throughput and success/error counts.
 *
 * @param agent agent under test.
 * @param concurrency number of coroutines issuing requests; must be positive.
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
suspend fun loadTest(agent: AIAgent, concurrency: Int = 10) {
    require(concurrency > 0) { "concurrency must be positive, was $concurrency" }

    val requests = 1000
    val successCount = AtomicInteger(0)
    val errorCount = AtomicInteger(0)
    val startTime = Clock.System.now()

    coroutineScope {
        // Launch concurrent requests
        val jobs = List(concurrency) { worker ->
            // Give the remainder to the first workers so exactly `requests` calls are made
            // even when `requests` is not divisible by `concurrency`.
            val share = requests / concurrency + if (worker < requests % concurrency) 1 else 0
            async {
                repeat(share) {
                    try {
                        agent.execute("test")
                        successCount.incrementAndGet()
                    } catch (e: CancellationException) {
                        // Cancellation is not a request failure; let it propagate.
                        throw e
                    } catch (e: Exception) {
                        errorCount.incrementAndGet()
                    }
                }
            }
        }
        jobs.awaitAll()
    }

    // Use millisecond resolution: whole seconds truncate to 0 for runs shorter than
    // one second, which would make the RPS calculation divide by zero.
    val elapsedMillis = (Clock.System.now() - startTime).inWholeMilliseconds
    val durationSeconds = elapsedMillis / 1000.0
    val rps = if (elapsedMillis > 0) requests / durationSeconds else Double.POSITIVE_INFINITY

    println("Load Test Results:")
    println(" Requests: $requests")
    println(" Concurrency: $concurrency")
    println(" Duration: ${"%.2f".format(durationSeconds)}s")
    println(" RPS: ${"%.2f".format(rps)}")
    println(" Success: ${successCount.get()}")
    println(" Errors: ${errorCount.get()}")
}
Best Practices
1. Cache Aggressively
// Cache at multiple levels
- LLM response cache (CachedPromptExecutor)
- Tool result cache (per-tool caching)
- Intermediate computation cache (storage)
2. Minimize LLM Calls
// ❌ Bad: three separate calls for one simple task
val sentiment = llm.complete("What's the sentiment?")
val summary = llm.complete("Summarize this")
val keywords = llm.complete("Extract keywords")

// ✅ Good: a single call that returns structured output
val result = llm.complete<Analysis>("Analyze this text: sentiment, summary, keywords")
3. Use Streaming for Long Responses
// Better UX and lower memory usage: append each delta to the UI as it arrives.
llm.completeStreaming(prompt)
    .collect { chunk -> ui.appendText(chunk.content) }
4. Profile in Production
install(OpenTelemetry) {
    // Track performance metrics: verbose output off, trace-ID ratio sampler at 0.1.
    setVerbose(false)
    setSampler(Sampler.traceIdRatioBased(0.1))
}
5. Set Appropriate Timeouts
val httpClient = HttpClient(CIO) {
    install(HttpTimeout) {
        // 5s to establish a connection
        connectTimeoutMillis = 5_000
        // 30s for normal requests
        requestTimeoutMillis = 30_000
    }
}
Typical Latencies
| Operation | Target | Notes |
|---|---|---|
| Simple LLM call | < 1s | With fast model (Haiku, GPT-3.5) |
| Complex LLM call | < 5s | With powerful model (GPT-4, Claude) |
| Tool execution | < 500ms | For local/cached operations |
| Agent execution | < 10s | End-to-end for typical workflow |
| Streaming first token | < 500ms | Time to first response |
Resource Usage
| Metric | Target | Notes |
|---|---|---|
| Memory per agent | < 50MB | Without large caches |
| Cache memory | < 500MB | With aggressive caching |
| Concurrent agents | 100+ | Per server instance |
| RPS | 10-50 | Requests per second per instance |
install(Metrics) {
    trackLatency = true
    trackTokenUsage = true
    trackCacheHits = true

    collector { metrics ->
        // Warn when P95 latency exceeds 5 seconds.
        val slowRequests = metrics.p95Latency > 5.seconds
        if (slowRequests) {
            alerting.warn("High P95 latency: ${metrics.p95Latency}")
        }

        // Warn when the cache hit rate is below 0.5.
        val lowHitRate = metrics.cacheHitRate < 0.5
        if (lowHitRate) {
            alerting.warn("Low cache hit rate: ${metrics.cacheHitRate}")
        }
    }
}
Resources