
Overview

AIAgentPipeline is the central mechanism for extending and customizing agent behavior. It provides interception points for various agent lifecycle events and manages feature installation, configuration, and execution.
The pipeline allows features to hook into different stages of agent execution, from agent creation and strategy execution to LLM calls and tool invocations.
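
To make the interception idea concrete, here is a minimal, self-contained sketch of a handler registry. It is not the library's implementation: `MiniFeature`, `MiniPipeline`, and `onAgentStarting` are hypothetical names, the handlers are plain functions rather than `suspend` functions so the sketch runs without a coroutine library, and the assumption that handlers run in registration order is not confirmed by this page.

```kotlin
// Hypothetical stand-ins; none of these names come from the library.
class MiniFeature(val name: String)

class MiniPipeline {
    // Handlers are grouped by the feature that registered them.
    private val startingHandlers = linkedMapOf<MiniFeature, MutableList<(String) -> Unit>>()

    // Mirrors the shape of interceptAgentStarting(feature, handle).
    fun interceptAgentStarting(feature: MiniFeature, handle: (String) -> Unit) {
        startingHandlers.getOrPut(feature) { mutableListOf() }.add(handle)
    }

    // Called by the (imaginary) agent runtime when an agent is about to start.
    // Assumption: handlers run in the order their features registered them.
    fun onAgentStarting(agentId: String) {
        startingHandlers.values.flatten().forEach { handle -> handle(agentId) }
    }
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val pipeline = MiniPipeline()
    pipeline.interceptAgentStarting(MiniFeature("logging")) { id -> log += "logging saw $id" }
    pipeline.interceptAgentStarting(MiniFeature("metrics")) { id -> log += "metrics saw $id" }
    pipeline.onAgentStarting("agent-1")
    return log
}

fun main() {
    runDemo().forEach(::println)
}
```

Grouping handlers by feature is one possible design; it makes it straightforward to drop all of a feature's handlers at once when that feature is removed.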

Class Definition

public expect abstract class AIAgentPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipelineAPI

Constructor Parameters

  • agentConfig (AIAgentConfig, required) - Configuration for the AI agent
  • clock (Clock, required) - Clock instance for time-related operations

Properties

  • clock (Clock) - Clock instance that supplies the current time
  • config (AIAgentConfig) - The agent configuration settings

Feature Management

feature

Retrieves an installed feature implementation.
public fun <TFeature : Any> feature(
    featureClass: KClass<TFeature>,
    feature: AIAgentFeature<*, TFeature>
): TFeature?
  • featureClass (KClass<TFeature>, required) - The class of the feature implementation
  • feature (AIAgentFeature<*, TFeature>, required) - The feature to retrieve
  • return (TFeature?) - The feature instance, or null if not installed

install

Installs a feature into the pipeline.
public fun <TConfig : FeatureConfig, TFeatureImpl : Any> install(
    featureKey: AIAgentStorageKey<TFeatureImpl>,
    featureConfig: TConfig,
    featureImpl: TFeatureImpl
)
  • featureKey (AIAgentStorageKey<TFeatureImpl>, required) - Unique key identifying the feature
  • featureConfig (TConfig, required) - Configuration for the feature
  • featureImpl (TFeatureImpl, required) - The feature implementation instance

uninstall

Uninstalls a feature from the pipeline.
public suspend fun uninstall(
    featureKey: AIAgentStorageKey<*>
)
  • featureKey (AIAgentStorageKey<*>, required) - Key identifying the feature to uninstall
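
The install, lookup, and uninstall cycle described above can be sketched with a small typed registry. This is only an illustration under stated assumptions: `StorageKey`, `FeatureRegistry`, and `Tracer` are hypothetical stand-ins, the lookup takes a key rather than a class and feature object, and the real `uninstall` is a `suspend` function while this one is not.

```kotlin
// Hypothetical stand-in for a typed storage key; identity of the key object selects the entry.
class StorageKey<T : Any>(val name: String)

class FeatureRegistry {
    private val installed = mutableMapOf<StorageKey<*>, Any>()

    fun <T : Any> install(key: StorageKey<T>, impl: T) {
        installed[key] = impl
    }

    // Returns null when nothing is installed under the key.
    @Suppress("UNCHECKED_CAST")
    fun <T : Any> feature(key: StorageKey<T>): T? = installed[key] as T?

    fun uninstall(key: StorageKey<*>) {
        installed.remove(key)
    }
}

// Hypothetical feature implementation used only for the demo.
class Tracer

// Reports whether the feature is present before install, after install, and after uninstall.
fun demoRegistry(): List<Boolean> {
    val key = StorageKey<Tracer>("tracer")
    val registry = FeatureRegistry()
    val before = registry.feature(key) != null
    registry.install(key, Tracer())
    val afterInstall = registry.feature(key) != null
    registry.uninstall(key)
    val afterUninstall = registry.feature(key) != null
    return listOf(before, afterInstall, afterUninstall)
}

fun main() {
    println(demoRegistry())
}
```

The type parameter on the key is what lets the lookup return a typed instance without the caller casting.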

Agent Lifecycle Interceptors

interceptAgentStarting

Intercepts before an agent starts execution.
public fun interceptAgentStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentStartingContext) -> Unit
)
  • feature (AIAgentFeature<*, *>, required) - The feature registering the interceptor
  • handle (suspend (AgentStartingContext) -> Unit, required) - Handler function called before the agent starts

interceptAgentCompleted

Intercepts after an agent completes execution.
public fun interceptAgentCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentCompletedContext) -> Unit
)

interceptAgentExecutionFailed

Intercepts when agent execution fails.
public fun interceptAgentExecutionFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentExecutionFailedContext) -> Unit
)

interceptAgentClosing

Intercepts before an agent closes.
public fun interceptAgentClosing(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentClosingContext) -> Unit
)

interceptEnvironmentCreated

Intercepts agent environment creation and lets the handler transform the environment.
public fun interceptEnvironmentCreated(
    feature: AIAgentFeature<*, *>,
    handle: suspend (
        AgentEnvironmentTransformingContext,
        AIAgentEnvironment
    ) -> AIAgentEnvironment
)
  • handle (suspend (AgentEnvironmentTransformingContext, AIAgentEnvironment) -> AIAgentEnvironment, required) - Transformer function that can modify or wrap the environment
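
Because the handler returns an environment, several transformers can be composed. The sketch below shows one plausible way to chain them with a fold, where each transformer receives the output of the previous one. `Env`, `BaseEnv`, and `TransformingPipeline` are hypothetical and much simpler than the real `AIAgentEnvironment`; the context argument is omitted, the functions are not `suspend`, and the chaining order is an assumption.

```kotlin
// Hypothetical, simplified environment; the real AIAgentEnvironment has a different shape.
interface Env {
    fun executeTool(name: String): String
}

class BaseEnv : Env {
    override fun executeTool(name: String) = "result of $name"
}

class TransformingPipeline {
    private val transformers = mutableListOf<(Env) -> Env>()

    fun interceptEnvironmentCreated(transform: (Env) -> Env) {
        transformers += transform
    }

    // Each transformer wraps the environment produced by the one registered before it.
    fun onEnvironmentCreated(initial: Env): Env =
        transformers.fold(initial) { env, transform -> transform(env) }
}

fun demoTransform(): String {
    val pipeline = TransformingPipeline()
    // Registered first, so it ends up as the inner wrapper.
    pipeline.interceptEnvironmentCreated { inner ->
        object : Env {
            override fun executeTool(name: String) = "[logged] " + inner.executeTool(name)
        }
    }
    // Registered second, so it wraps the result of the first wrapper.
    pipeline.interceptEnvironmentCreated { inner ->
        object : Env {
            override fun executeTool(name: String) = inner.executeTool(name).uppercase()
        }
    }
    return pipeline.onEnvironmentCreated(BaseEnv()).executeTool("search")
}

fun main() {
    println(demoTransform())
}
```

With this ordering, the transformer registered last sees every call first, which matters when wrappers have side effects such as logging or caching.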

Strategy Lifecycle Interceptors

interceptStrategyStarting

Intercepts when a strategy begins execution.
public fun interceptStrategyStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (StrategyStartingContext) -> Unit
)

interceptStrategyCompleted

Intercepts when a strategy completes.
public fun interceptStrategyCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (StrategyCompletedContext) -> Unit
)

LLM Call Interceptors

interceptLLMCallStarting

Intercepts before an LLM call is made.
public fun interceptLLMCallStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMCallStartingContext) -> Unit
)
You can modify the prompt before execution by updating context.context.llm.prompt in the handler.

interceptLLMCallCompleted

Intercepts after an LLM call completes.
public fun interceptLLMCallCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMCallCompletedContext) -> Unit
)

Streaming Interceptors

interceptLLMStreamingStarting

Intercepts before streaming begins.
public fun interceptLLMStreamingStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingStartingContext) -> Unit
)

interceptLLMStreamingFrameReceived

Intercepts each stream frame as it’s received.
public fun interceptLLMStreamingFrameReceived(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingFrameReceivedContext) -> Unit
)

interceptLLMStreamingFailed

Intercepts if streaming fails.
public fun interceptLLMStreamingFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingFailedContext) -> Unit
)

interceptLLMStreamingCompleted

Intercepts after streaming completes.
public fun interceptLLMStreamingCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingCompletedContext) -> Unit
)
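
The four streaming hooks suggest an event sequence: one starting event, one event per frame, then either a completed or a failed event. The sketch below models that sequence with a hypothetical driver. `StreamHooks`, `consume`, and `trace` are invented names, frames are plain strings, and the assumption that a failed stream does not also report completion is not confirmed by this page.

```kotlin
// Hypothetical bundle of callbacks, one per streaming interceptor.
class StreamHooks(
    val onStarting: () -> Unit = {},
    val onFrame: (String) -> Unit = {},
    val onFailed: (Throwable) -> Unit = {},
    val onCompleted: () -> Unit = {},
)

// Drives a stream of frames and fires the hooks at the matching points.
fun consume(frames: Sequence<String>, hooks: StreamHooks) {
    hooks.onStarting()
    try {
        for (frame in frames) hooks.onFrame(frame)
    } catch (e: Exception) {
        // Assumption: a failed stream reports failure and skips the completed hook.
        hooks.onFailed(e)
        return
    }
    hooks.onCompleted()
}

// Records the order in which hooks fire for a given stream.
fun trace(frames: Sequence<String>): List<String> {
    val events = mutableListOf<String>()
    consume(
        frames,
        StreamHooks(
            onStarting = { events += "starting" },
            onFrame = { events += "frame:$it" },
            onFailed = { events += "failed:${it.message}" },
            onCompleted = { events += "completed" },
        ),
    )
    return events
}

fun main() {
    println(trace(sequenceOf("Hel", "lo")))
    // A stream that yields one frame and then throws.
    println(trace(sequence<String> { yield("a"); throw IllegalStateException("boom") }))
}
```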

Tool Call Interceptors

interceptToolCallStarting

Intercepts when a tool is called.
public fun interceptToolCallStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallStartingContext) -> Unit
)

interceptToolCallCompleted

Intercepts after a tool call completes.
public fun interceptToolCallCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallCompletedContext) -> Unit
)

interceptToolCallFailed

Intercepts when a tool call fails.
public fun interceptToolCallFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallFailedContext) -> Unit
)

interceptToolValidationFailed

Intercepts when tool validation fails.
public fun interceptToolValidationFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolValidationFailedContext) -> Unit
)
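
As a rough mental model of how the four tool interceptors relate, the sketch below wraps a tool invocation and records which event would fire on each path. Everything here is hypothetical: `ToolEvents` and `runTool` are invented, validation is reduced to a required-argument check, and the relative order of the starting and validation events is an assumption rather than documented behavior.

```kotlin
// Hypothetical event log standing in for registered interceptors.
class ToolEvents {
    val log = mutableListOf<String>()
}

// Runs a tool body and records the event that each outcome would trigger.
fun runTool(
    name: String,
    args: Map<String, String>,
    requiredArgs: Set<String>,
    events: ToolEvents,
    body: (Map<String, String>) -> String,
): String? {
    events.log += "starting:$name"
    // Simplified validation: every required argument must be present.
    val missing = requiredArgs - args.keys
    if (missing.isNotEmpty()) {
        events.log += "validationFailed:$name missing=${missing.joinToString()}"
        return null
    }
    return try {
        body(args).also { events.log += "completed:$name" }
    } catch (e: Exception) {
        events.log += "failed:$name ${e.message}"
        null
    }
}

fun demoTools(): List<String> {
    val events = ToolEvents()
    val required = setOf("query")
    runTool("search", mapOf("query" to "koog"), required, events) { "3 hits for ${it["query"]}" }
    runTool("search", emptyMap(), required, events) { "unreachable" }
    runTool("search", mapOf("query" to ""), required, events) { error("backend down") }
    return events.log
}

fun main() {
    demoTools().forEach(::println)
}
```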

Usage Examples

Creating a Feature with Pipeline Interceptors

object MyFeature : AIAgentGraphFeature<MyFeatureConfig, MyFeatureImpl> {
    override val key = AIAgentStorageKey<MyFeatureImpl>("my-feature")
    
    override fun createInitialConfig() = MyFeatureConfig()
    
    override fun install(
        config: MyFeatureConfig,
        pipeline: AIAgentGraphPipeline
    ): MyFeatureImpl {
        val impl = MyFeatureImpl(config)
        
        // Intercept agent lifecycle
        pipeline.interceptAgentStarting(this) { context ->
            println("Agent ${context.agent.id} starting")
        }
        
        pipeline.interceptAgentCompleted(this) { context ->
            println("Agent completed with: ${context.result}")
        }
        
        // Intercept LLM calls
        pipeline.interceptLLMCallStarting(this) { context ->
            println("Calling LLM with model: ${context.model.id}")
        }
        
        pipeline.interceptLLMCallCompleted(this) { context ->
            println("LLM responded: ${context.responses.first().content}")
        }
        
        // Intercept tool calls
        pipeline.interceptToolCallStarting(this) { context ->
            println("Calling tool: ${context.toolName}")
        }
        
        return impl
    }
}

Modifying Prompts Before LLM Calls

pipeline.interceptLLMCallStarting(this) { context ->
    // Add system prompt enhancement
    context.context.llm.prompt = Prompt {
        system("You are a helpful assistant with enhanced capabilities.")
        messages(context.prompt.messages)
    }
}

Environment Transformation

pipeline.interceptEnvironmentCreated(this) { context, environment ->
    // Wrap environment with logging
    object : AIAgentEnvironment by environment {
        override suspend fun executeTool(
            toolCall: Message.Tool.Call
        ): ReceivedToolResult {
            println("Executing tool: ${toolCall.name}")
            val result = environment.executeTool(toolCall)
            println("Tool result: ${result.content}")
            return result
        }
    }
}

Collecting Execution Statistics

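// Counters are plain vars for brevity. If handlers can run concurrently,
// guard them with synchronization or use atomic counters instead.
class StatisticsFeatureImpl {
    var llmCallCount = 0
    var toolCallCount = 0
    var totalTokens = 0
}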

object StatisticsFeature : AIAgentGraphFeature<FeatureConfig, StatisticsFeatureImpl> {
    override val key = AIAgentStorageKey<StatisticsFeatureImpl>("statistics")
    
    override fun createInitialConfig() = object : FeatureConfig {}
    
    override fun install(
        config: FeatureConfig,
        pipeline: AIAgentGraphPipeline
    ): StatisticsFeatureImpl {
        val impl = StatisticsFeatureImpl()
        
        pipeline.interceptLLMCallCompleted(this) { context ->
            impl.llmCallCount++
            context.responses.firstOrNull()?.usage?.let { usage ->
                impl.totalTokens += usage.inputTokens + usage.outputTokens
            }
        }
        
        // The context is not needed here, so the parameter is left unnamed
        pipeline.interceptToolCallCompleted(this) { _ ->
            impl.toolCallCount++
        }
        
        return impl
    }
}

Error Handling Feature

object ErrorHandlingFeature : AIAgentGraphFeature<ErrorHandlingConfig, ErrorHandlingImpl> {
    override val key = AIAgentStorageKey<ErrorHandlingImpl>("error-handling")
    
    override fun createInitialConfig() = ErrorHandlingConfig()
    
    override fun install(
        config: ErrorHandlingConfig,
        pipeline: AIAgentGraphPipeline
    ): ErrorHandlingImpl {
        val impl = ErrorHandlingImpl(config)
        
        pipeline.interceptAgentExecutionFailed(this) { context ->
            impl.logError(context.throwable)
            
            if (config.sendNotifications) {
                impl.notifyError(context.agentId, context.throwable)
            }
        }
        
        pipeline.interceptToolCallFailed(this) { context ->
            impl.logToolError(
                toolName = context.toolName,
                error = context.error,
                message = context.message
            )
        }
        
        return impl
    }
}

Streaming Progress Tracker

object StreamingProgressFeature : AIAgentGraphFeature<ProgressConfig, ProgressImpl> {
    override val key = AIAgentStorageKey<ProgressImpl>("streaming-progress")
    
    override fun createInitialConfig() = ProgressConfig()
    
    override fun install(
        config: ProgressConfig,
        pipeline: AIAgentGraphPipeline
    ): ProgressImpl {
        val impl = ProgressImpl()
        
        pipeline.interceptLLMStreamingStarting(this) { context ->
            impl.startProgress(context.runId)
        }
        
        pipeline.interceptLLMStreamingFrameReceived(this) { context ->
            when (val frame = context.streamFrame) {
                is StreamFrame.Content -> {
                    impl.updateProgress(context.runId, frame.text)
                }
                else -> { /* handle other frame types */ }
            }
        }
        
        pipeline.interceptLLMStreamingCompleted(this) { context ->
            impl.completeProgress(context.runId)
        }
        
        pipeline.interceptLLMStreamingFailed(this) { context ->
            impl.failProgress(context.runId, context.throwable)
        }
        
        return impl
    }
}

Specialized Pipelines

AIAgentGraphPipeline

For graph-based agents:
public abstract class AIAgentGraphPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)

AIAgentFunctionalPipeline

For functional agents:
public abstract class AIAgentFunctionalPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)

AIAgentPlannerPipeline

For planner agents:
public abstract class AIAgentPlannerPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)

Event Context Types

Each interceptor provides a context object with relevant information:
  • AgentStartingContext - Agent and execution info
  • AgentCompletedContext - Agent ID, run ID, result
  • AgentExecutionFailedContext - Agent ID, run ID, exception
  • AgentClosingContext - Agent ID
  • StrategyStartingContext - Strategy instance, context
  • StrategyCompletedContext - Strategy, result, result type
  • LLMCallStartingContext - Prompt, model, tools, context
  • LLMCallCompletedContext - Prompt, model, responses, moderation
  • ToolCallStartingContext - Tool name, description, args
  • ToolCallCompletedContext - Tool name, args, result
  • ToolCallFailedContext - Tool name, args, error, message
  • LLMStreamingStartingContext - Prompt, model, tools
  • LLMStreamingFrameReceivedContext - Prompt, model, frame
  • LLMStreamingCompletedContext - Prompt, model, tools
  • LLMStreamingFailedContext - Prompt, model, throwable

Best Practices

Pipeline Usage
  • Register interceptors during feature installation
  • Keep interceptor handlers lightweight and fast
  • Use interceptors for cross-cutting concerns
  • Don’t block on I/O in interceptors unless necessary
  • Consider interceptor execution order when multiple features are installed
Performance Considerations
  • Interceptors run inline in the agent's execution path, so the agent waits for each handler to finish
  • Heavy processing should be offloaded to background coroutines
  • Avoid modifying shared state without synchronization
  • Be careful with recursive interceptor patterns
  • Test feature interactions thoroughly
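
Two of the points above, offloading heavy work and synchronizing shared state, can be combined in one handler. The sketch below is JVM-only and uses a single-thread executor as a stand-in for a background coroutine so that it runs with the standard library alone. `SafeStats`, `OffloadingHandler`, and `persist` are hypothetical names, and the `Thread.sleep` call merely simulates slow I/O.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical statistics holder; atomic counters stay correct under concurrent updates.
class SafeStats {
    val llmCalls = AtomicInteger()
    val persisted = AtomicInteger()
}

class OffloadingHandler(private val stats: SafeStats) {
    // Background worker that takes slow work off the agent's execution path.
    private val worker = Executors.newSingleThreadExecutor()

    // What an interceptor body might do: cheap bookkeeping inline, slow work queued.
    fun onLlmCallCompleted(response: String) {
        stats.llmCalls.incrementAndGet()
        worker.execute { persist(response) }
    }

    private fun persist(response: String) {
        Thread.sleep(10) // stands in for slow I/O such as writing the response to a database
        stats.persisted.incrementAndGet()
    }

    // Stops accepting work and waits for queued tasks to finish.
    fun close() {
        worker.shutdown()
        worker.awaitTermination(5, TimeUnit.SECONDS)
    }
}

fun demoOffload(): Pair<Int, Int> {
    val stats = SafeStats()
    val handler = OffloadingHandler(stats)
    repeat(3) { handler.onLlmCallCompleted("response $it") }
    handler.close()
    return stats.llmCalls.get() to stats.persisted.get()
}

fun main() {
    println(demoOffload())
}
```

In multiplatform code the executor would typically be replaced by launching a coroutine in a scope owned by the feature, with the same split between inline bookkeeping and deferred work.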

Source Reference

Defined in: agents-core/src/commonMain/kotlin/ai/koog/agents/core/feature/pipeline/AIAgentPipeline.kt
