Overview
AIAgentPipeline is the central mechanism for extending and customizing agent behavior. It provides interception points for various agent lifecycle events and manages feature installation, configuration, and execution.
The pipeline allows features to hook into different stages of agent execution, from agent creation and strategy execution to LLM calls and tool invocations.
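Conceptually, the pipeline keeps a list of registered handlers per event and invokes them in registration order when the event fires. The following is a minimal, self-contained sketch of that pattern; the types and names here are illustrative stand-ins, not the actual Koog API:

```kotlin
// Illustrative stand-in for the interception pattern; not the real Koog types.
class MiniPipeline {
    private val onStarting = mutableListOf<(String) -> Unit>()

    // Features register handlers during installation.
    fun interceptAgentStarting(handle: (agentId: String) -> Unit) {
        onStarting += handle
    }

    // The agent runtime fires the event; handlers run in registration order.
    fun fireAgentStarting(agentId: String) {
        onStarting.forEach { it(agentId) }
    }
}

val calls = mutableListOf<String>()
val pipeline = MiniPipeline()
pipeline.interceptAgentStarting { calls += "logger:$it" }
pipeline.interceptAgentStarting { calls += "metrics:$it" }
pipeline.fireAgentStarting("agent-1")
```

This is why registration order matters when multiple features are installed: each handler sees the same event, one after another.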
Class Definition
public expect abstract class AIAgentPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipelineAPI
Constructor Parameters
agentConfig
AIAgentConfig
required
Configuration for the AI agent
clock
Clock
required
Clock instance for time-related operations
Properties
clock
Clock instance providing the current time
agentConfig
The agent configuration settings
Feature Management
feature
Retrieves an installed feature implementation.
public fun <TFeature : Any> feature(
    featureClass: KClass<TFeature>,
    feature: AIAgentFeature<*, TFeature>
): TFeature?
featureClass
KClass<TFeature>
required
The class of the feature implementation
feature
AIAgentFeature<*, TFeature>
required
The feature whose installed implementation to retrieve
Returns: the feature instance, or null if the feature is not installed
install
Installs a feature into the pipeline.
public fun <TConfig : FeatureConfig, TFeatureImpl : Any> install(
    featureKey: AIAgentStorageKey<TFeatureImpl>,
    featureConfig: TConfig,
    featureImpl: TFeatureImpl
)
featureKey
AIAgentStorageKey<TFeatureImpl>
required
Unique key identifying the feature
featureConfig
TConfig
required
Configuration for the feature
featureImpl
TFeatureImpl
required
The feature implementation instance
uninstall
Uninstalls a feature from the pipeline.
public suspend fun uninstall(
    featureKey: AIAgentStorageKey<*>
)
featureKey
AIAgentStorageKey<*>
required
Key identifying the feature to uninstall
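The three feature-management calls above follow a typed-key registry pattern: a storage key carries the implementation type, so retrieval is type-safe. A minimal sketch of the idea, using simplified stand-in types rather than Koog's actual implementation:

```kotlin
// Simplified stand-ins for AIAgentStorageKey and the feature registry.
class StorageKey<T : Any>(val name: String)

class FeatureRegistry {
    private val features = mutableMapOf<StorageKey<*>, Any>()

    fun <T : Any> install(key: StorageKey<T>, impl: T) {
        features[key] = impl
    }

    // Returns null when the feature was never installed (or was uninstalled).
    @Suppress("UNCHECKED_CAST")
    fun <T : Any> feature(key: StorageKey<T>): T? = features[key] as T?

    fun uninstall(key: StorageKey<*>) {
        features.remove(key)
    }
}

val statsKey = StorageKey<MutableList<Int>>("statistics")
val registry = FeatureRegistry()
registry.install(statsKey, mutableListOf(1, 2))
```

Because the key is generic in the implementation type, the cast inside `feature` is safe as long as installs always go through the typed `install` method.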
Agent Lifecycle Interceptors
interceptAgentStarting
Intercepts before an agent starts execution.
public fun interceptAgentStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentStartingContext) -> Unit
)
feature
AIAgentFeature<*, *>
required
The feature registering the interceptor
handle
suspend (AgentStartingContext) -> Unit
required
Handler function called when agent starts
interceptAgentCompleted
Intercepts after an agent completes execution.
public fun interceptAgentCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentCompletedContext) -> Unit
)
interceptAgentExecutionFailed
Intercepts when agent execution fails.
public fun interceptAgentExecutionFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentExecutionFailedContext) -> Unit
)
interceptAgentClosing
Intercepts before an agent closes.
public fun interceptAgentClosing(
    feature: AIAgentFeature<*, *>,
    handle: suspend (AgentClosingContext) -> Unit
)
interceptEnvironmentCreated
Intercepts agent environment creation and allows transforming or wrapping the created environment.
public fun interceptEnvironmentCreated(
    feature: AIAgentFeature<*, *>,
    handle: suspend (
        AgentEnvironmentTransformingContext,
        AIAgentEnvironment
    ) -> AIAgentEnvironment
)
handle
suspend (AgentEnvironmentTransformingContext, AIAgentEnvironment) -> AIAgentEnvironment
required
Transformer function that can modify or wrap the environment
Strategy Lifecycle Interceptors
interceptStrategyStarting
Intercepts when a strategy begins execution.
public fun interceptStrategyStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (StrategyStartingContext) -> Unit
)
interceptStrategyCompleted
Intercepts when a strategy completes.
public fun interceptStrategyCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (StrategyCompletedContext) -> Unit
)
LLM Call Interceptors
interceptLLMCallStarting
Intercepts before an LLM call is made.
public fun interceptLLMCallStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMCallStartingContext) -> Unit
)
You can modify the prompt before execution by updating context.context.llm.prompt in the handler.
interceptLLMCallCompleted
Intercepts after an LLM call completes.
public fun interceptLLMCallCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMCallCompletedContext) -> Unit
)
Streaming Interceptors
interceptLLMStreamingStarting
Intercepts before streaming begins.
public fun interceptLLMStreamingStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingStartingContext) -> Unit
)
interceptLLMStreamingFrameReceived
Intercepts each stream frame as it’s received.
public fun interceptLLMStreamingFrameReceived(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingFrameReceivedContext) -> Unit
)
interceptLLMStreamingFailed
Intercepts if streaming fails.
public fun interceptLLMStreamingFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingFailedContext) -> Unit
)
interceptLLMStreamingCompleted
Intercepts after streaming completes.
public fun interceptLLMStreamingCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (LLMStreamingCompletedContext) -> Unit
)
Tool Call Interceptors
interceptToolCallStarting
Intercepts before a tool is invoked.
public fun interceptToolCallStarting(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallStartingContext) -> Unit
)
interceptToolCallCompleted
Intercepts after a tool call completes.
public fun interceptToolCallCompleted(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallCompletedContext) -> Unit
)
interceptToolCallFailed
Intercepts when a tool call fails.
public fun interceptToolCallFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolCallFailedContext) -> Unit
)
interceptToolValidationFailed
Intercepts when tool argument validation fails.
public fun interceptToolValidationFailed(
    feature: AIAgentFeature<*, *>,
    handle: suspend (ToolValidationFailedContext) -> Unit
)
Usage Examples
Creating a Feature with Pipeline Interceptors
object MyFeature : AIAgentGraphFeature<MyFeatureConfig, MyFeatureImpl> {
    override val key = AIAgentStorageKey<MyFeatureImpl>("my-feature")

    override fun createInitialConfig() = MyFeatureConfig()

    override fun install(
        config: MyFeatureConfig,
        pipeline: AIAgentGraphPipeline
    ): MyFeatureImpl {
        val impl = MyFeatureImpl(config)

        // Intercept agent lifecycle
        pipeline.interceptAgentStarting(this) { context ->
            println("Agent ${context.agent.id} starting")
        }
        pipeline.interceptAgentCompleted(this) { context ->
            println("Agent completed with: ${context.result}")
        }

        // Intercept LLM calls
        pipeline.interceptLLMCallStarting(this) { context ->
            println("Calling LLM with model: ${context.model.id}")
        }
        pipeline.interceptLLMCallCompleted(this) { context ->
            println("LLM responded: ${context.responses.first().content}")
        }

        // Intercept tool calls
        pipeline.interceptToolCallStarting(this) { context ->
            println("Calling tool: ${context.toolName}")
        }

        return impl
    }
}
Modifying Prompts Before LLM Calls
pipeline.interceptLLMCallStarting(this) { context ->
    // Add system prompt enhancement
    context.context.llm.prompt = Prompt {
        system("You are a helpful assistant with enhanced capabilities.")
        messages(context.prompt.messages)
    }
}
Wrapping the Agent Environment
pipeline.interceptEnvironmentCreated(this) { context, environment ->
    // Wrap environment with logging; the returned object replaces the original
    object : AIAgentEnvironment by environment {
        override suspend fun executeTool(
            toolCall: Message.Tool.Call
        ): ReceivedToolResult {
            println("Executing tool: ${toolCall.name}")
            val result = environment.executeTool(toolCall)
            println("Tool result: ${result.content}")
            return result
        }
    }
}
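The wrapping above relies on Kotlin's implementation by delegation (`object : X by y`): only the overridden method changes, and every other member is forwarded to the wrapped instance. A self-contained illustration of that language feature, using hypothetical stand-in types rather than Koog's:

```kotlin
// Stand-in interface; not the real AIAgentEnvironment.
interface Environment {
    fun executeTool(name: String): String
    fun close(): String
}

class RealEnvironment : Environment {
    override fun executeTool(name: String) = "result:$name"
    override fun close() = "closed"
}

val log = mutableListOf<String>()

// Wrap the real environment: override executeTool, delegate everything else.
fun withLogging(env: Environment): Environment = object : Environment by env {
    override fun executeTool(name: String): String {
        log += "calling $name"
        return env.executeTool(name)
    }
}

val env = withLogging(RealEnvironment())
```

Delegation keeps wrappers stable as the interface grows: new members are forwarded automatically instead of requiring boilerplate overrides.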
Collecting Execution Statistics
class StatisticsFeatureImpl {
    // Note: plain counters are not synchronized; use atomic types if
    // handlers may run concurrently (see Best Practices below).
    var llmCallCount = 0
    var toolCallCount = 0
    var totalTokens = 0
}

object StatisticsFeature : AIAgentGraphFeature<FeatureConfig, StatisticsFeatureImpl> {
    override val key = AIAgentStorageKey<StatisticsFeatureImpl>("statistics")

    override fun createInitialConfig() = object : FeatureConfig {}

    override fun install(
        config: FeatureConfig,
        pipeline: AIAgentGraphPipeline
    ): StatisticsFeatureImpl {
        val impl = StatisticsFeatureImpl()

        pipeline.interceptLLMCallCompleted(this) { context ->
            impl.llmCallCount++
            context.responses.firstOrNull()?.usage?.let { usage ->
                impl.totalTokens += usage.inputTokens + usage.outputTokens
            }
        }
        pipeline.interceptToolCallCompleted(this) { context ->
            impl.toolCallCount++
        }

        return impl
    }
}
Error Handling Feature
object ErrorHandlingFeature : AIAgentGraphFeature<ErrorHandlingConfig, ErrorHandlingImpl> {
    override val key = AIAgentStorageKey<ErrorHandlingImpl>("error-handling")

    override fun createInitialConfig() = ErrorHandlingConfig()

    override fun install(
        config: ErrorHandlingConfig,
        pipeline: AIAgentGraphPipeline
    ): ErrorHandlingImpl {
        val impl = ErrorHandlingImpl(config)

        pipeline.interceptAgentExecutionFailed(this) { context ->
            impl.logError(context.throwable)
            if (config.sendNotifications) {
                impl.notifyError(context.agentId, context.throwable)
            }
        }
        pipeline.interceptToolCallFailed(this) { context ->
            impl.logToolError(
                toolName = context.toolName,
                error = context.error,
                message = context.message
            )
        }

        return impl
    }
}
Streaming Progress Tracker
object StreamingProgressFeature : AIAgentGraphFeature<ProgressConfig, ProgressImpl> {
    override val key = AIAgentStorageKey<ProgressImpl>("streaming-progress")

    override fun createInitialConfig() = ProgressConfig()

    override fun install(
        config: ProgressConfig,
        pipeline: AIAgentGraphPipeline
    ): ProgressImpl {
        val impl = ProgressImpl()

        pipeline.interceptLLMStreamingStarting(this) { context ->
            impl.startProgress(context.runId)
        }
        pipeline.interceptLLMStreamingFrameReceived(this) { context ->
            when (val frame = context.streamFrame) {
                is StreamFrame.Content -> {
                    impl.updateProgress(context.runId, frame.text)
                }
                else -> { /* handle other frame types */ }
            }
        }
        pipeline.interceptLLMStreamingCompleted(this) { context ->
            impl.completeProgress(context.runId)
        }
        pipeline.interceptLLMStreamingFailed(this) { context ->
            impl.failProgress(context.runId, context.throwable)
        }

        return impl
    }
}
Specialized Pipelines
AIAgentGraphPipeline
For graph-based agents:
public abstract class AIAgentGraphPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)
AIAgentFunctionalPipeline
For functional agents:
public abstract class AIAgentFunctionalPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)
AIAgentPlannerPipeline
For planner agents:
public abstract class AIAgentPlannerPipeline(
    agentConfig: AIAgentConfig,
    clock: Clock
) : AIAgentPipeline(agentConfig, clock)
Event Context Types
Each interceptor provides a context object with relevant information:
AgentStartingContext - Agent and execution info
AgentCompletedContext - Agent ID, run ID, result
AgentExecutionFailedContext - Agent ID, run ID, exception
AgentClosingContext - Agent ID
StrategyStartingContext - Strategy instance, context
StrategyCompletedContext - Strategy, result, result type
LLMCallStartingContext - Prompt, model, tools, context
LLMCallCompletedContext - Prompt, model, responses, moderation
ToolCallStartingContext - Tool name, description, args
ToolCallCompletedContext - Tool name, args, result
ToolCallFailedContext - Tool name, args, error, message
ToolValidationFailedContext - Tool name, args, validation error
LLMStreamingStartingContext - Prompt, model, tools
LLMStreamingFrameReceivedContext - Prompt, model, frame
LLMStreamingCompletedContext - Prompt, model, tools
LLMStreamingFailedContext - Prompt, model, throwable
Best Practices
Pipeline Usage
- Register interceptors during feature installation
- Keep interceptor handlers lightweight and fast
- Use interceptors for cross-cutting concerns
- Don’t block on I/O in interceptors unless necessary
- Consider interceptor execution order when multiple features are installed
Performance Considerations
- Interceptors run synchronously in the execution path
- Heavy processing should be offloaded to background coroutines
- Avoid modifying shared state without synchronization
- Be careful with recursive interceptor patterns
- Test feature interactions thoroughly
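Because interceptors run synchronously in the execution path, the offloading guidance above is usually implemented as a queue-and-worker pattern: the interceptor only enqueues an event and returns, and a background worker does the heavy processing. A simplified, JVM-only illustration (names and the plain-thread worker are illustrative; a real feature would typically use coroutines):

```kotlin
import java.util.concurrent.LinkedBlockingQueue

// Interceptor side: enqueue and return immediately; never do heavy work inline.
val events = LinkedBlockingQueue<String>()

fun onLLMCallCompleted(response: String) {
    events.offer(response) // O(1); the execution path is not blocked
}

// Background worker drains the queue and does the expensive processing.
val processed = mutableListOf<String>()
val worker = Thread {
    while (true) {
        val event = events.take()
        if (event == "<stop>") break
        processed += event.uppercase() // stand-in for heavy processing
    }
}
worker.start()

onLLMCallCompleted("hello")
onLLMCallCompleted("world")
events.offer("<stop>")
worker.join()
```

The agent sees only a constant-time enqueue per event, while the worker absorbs the variable-cost processing off the hot path.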
Source Reference
Defined in: agents-core/src/commonMain/kotlin/ai/koog/agents/core/feature/pipeline/AIAgentPipeline.kt