A2A Protocol Overview
The A2A (Agent-to-Agent) protocol standardizes communication between agents with:
- Messages — Simple text-based exchanges
- Tasks — Structured workflows with state management
- Artifacts — Deliverables produced by agents
- Streaming — Real-time event updates
Key Concepts
Context: A conversation or session between agents
Task: A unit of work with states (Submitted → Working → Completed)
Artifact: Results produced by agents (text, files, structured data)
Event: State updates emitted during task execution
Simple Message-Based Communication
The simplest A2A interaction uses direct message exchange:
Server Side
Create an agent that responds to messages:
Server.kt
import ai.koog.a2a.model.*
import ai.koog.a2a.server.agent.AgentExecutor
import ai.koog.a2a.server.session.RequestContext
import ai.koog.a2a.server.session.SessionEventProcessor
import ai.koog.agents.a2a.server.feature.A2AAgentServer
import ai.koog.agents.a2a.server.feature.withA2AAgentServer
import ai.koog.agents.core.agent.AIAgent
import kotlin.uuid.Uuid
/**
 * Minimal A2A executor that answers every incoming message with a joke.
 *
 * For each request it builds a fresh [AIAgent], runs it on the text of the
 * incoming message and sends the model's answer back as an agent message.
 */
class SimpleJokeAgentExecutor : AgentExecutor {
    /**
     * Handles one incoming request.
     *
     * @param context request data; `context.params.message` is the user's message.
     * @param eventProcessor channel used to send the reply back to the client.
     */
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        // NOTE(review): `executor` is not defined in this snippet — supply a configured prompt executor.
        val agent = AIAgent(
            promptExecutor = executor,
            llmModel = GoogleModels.Gemini2_5Flash,
            systemPrompt = "You are a funny assistant that tells jokes.",
            toolRegistry = ToolRegistry.EMPTY
        ) {
            install(A2AAgentServer) {
                this.context = context
                this.eventProcessor = eventProcessor
            }
        }

        // An agent built from a system prompt works on plain text, so hand it the
        // text parts of the A2A message rather than the message object itself.
        val question = context.params.message.parts
            .filterIsInstance<TextPart>()
            .joinToString("\n") { it.text }
        val joke = agent.run(question)

        // Running the agent does not reply on its own; the answer must be sent explicitly.
        eventProcessor.sendMessage(
            Message(
                messageId = Uuid.random().toString(),
                role = Role.Agent,
                parts = listOf(TextPart(joke)),
                contextId = context.contextId
            )
        )
    }
}
// Start server
/**
 * Entry point of the server example: registers [SimpleJokeAgentExecutor] under
 * `/joke-agent`, starts the HTTP server on port 9998 and then waits on `join()`.
 */
suspend fun main() {
    val executorsByPath = mapOf("/joke-agent" to SimpleJokeAgentExecutor())
    val server = A2AHttpServer(port = 9998, agents = executorsByPath)

    server.start()
    println("Server running on port 9998")
    server.join()
}
Client Side
Connect and send messages:
Client.kt
import ai.koog.a2a.client.A2AClient
import ai.koog.a2a.client.UrlAgentCardResolver
import ai.koog.a2a.transport.client.jsonrpc.http.HttpJSONRPCClientTransport
import ai.koog.a2a.model.*
import kotlin.uuid.Uuid
/**
 * Entry point of the simple client example: connects to the joke agent, sends a
 * single user message and prints the response.
 */
suspend fun main() {
    val transport = HttpJSONRPCClientTransport(
        url = "http://localhost:9998/joke-agent"
    )

    // Everything after the transport is created runs inside try/finally so the
    // transport is closed even when connecting or sending fails.
    try {
        val resolver = UrlAgentCardResolver(
            baseUrl = "http://localhost:9998",
            path = "/joke-agent/card"
        )
        val client = A2AClient(transport, resolver)
        client.connect()
        println("Connected to: ${client.cachedAgentCard().name}")

        val message = Message(
            messageId = Uuid.random().toString(),
            role = Role.User,
            parts = listOf(TextPart("Tell me a joke about Kotlin")),
            contextId = "session-123"
        )
        val response = client.sendMessage(
            Request(MessageSendParams(message = message))
        )
        println("Response: ${response.data}")
    } finally {
        transport.close()
    }
}
Task-Based Workflows
For complex interactions, use task-based workflows with state management:
Server Implementation
JokeWriterAgentExecutor.kt
import ai.koog.a2a.model.*
import ai.koog.agents.a2a.core.A2AMessage
import ai.koog.agents.a2a.server.feature.A2AAgentServer
import ai.koog.agents.a2a.server.feature.withA2AAgentServer
/**
 * A2A executor for the task-based joke writer.
 *
 * Every request gets its own agent from [jokeWriterAgent], which is then run
 * with the message carried by the request.
 */
class JokeWriterAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext<MessageSendParams>,
        eventProcessor: SessionEventProcessor
    ) {
        jokeWriterAgent(promptExecutor, context, eventProcessor)
            .run(context.params.message)
    }
}
/**
 * Builds the graph agent that runs [jokeWriterStrategy] for a single request.
 *
 * @param promptExecutor executor the agent uses for its LLM calls.
 * @param context request context handed to the A2A server feature.
 * @param eventProcessor event sink handed to the A2A server feature.
 * @return an agent that takes an [A2AMessage] and produces no direct result.
 */
private fun jokeWriterAgent(
    promptExecutor: PromptExecutor,
    context: RequestContext<MessageSendParams>,
    eventProcessor: SessionEventProcessor
): GraphAIAgent<A2AMessage, Unit> {
    val jokePrompt = prompt("joke-generation") {
        system("You are a funny assistant that generates jokes.")
    }
    val config = AIAgentConfig(
        prompt = jokePrompt,
        model = GoogleModels.Gemini2_5Flash,
        maxAgentIterations = 20
    )

    return GraphAIAgent(
        inputType = typeOf<A2AMessage>(),
        outputType = typeOf<Unit>(),
        promptExecutor = promptExecutor,
        strategy = jokeWriterStrategy(),
        agentConfig = config,
        toolRegistry = ToolRegistry.EMPTY,
    ) {
        // Expose the request context and event sink to the strategy's nodes.
        install(A2AAgentServer) {
            this.context = context
            this.eventProcessor = eventProcessor
        }
    }
}
Strategy with Task States
/**
 * Graph strategy for the task-based joke writer.
 *
 * Flow:
 * 1. `setupMessageContext` loads the stored conversation into the prompt.
 * 2. `setupTaskContext` looks up the task the request refers to, if any.
 *    - No task yet: `createTask` announces a new task in state `Submitted`.
 *    - Existing task: a `Working` status update is emitted and the task's
 *      history is added to the prompt.
 * 3. `classifyRequest` asks the LLM to classify the request text.
 * 4. `generateJoke` writes the joke, and `respondWithJoke` publishes it as an
 *    artifact and marks the task `Completed`.
 *
 * NOTE(review): only `JokeRequestClassification.Ready` results are routed
 * further. Other classifications have no outgoing edge in this simplified
 * flow; see "Interactive Clarification" for a node that handles them.
 */
private fun jokeWriterStrategy() = strategy<A2AMessage, Unit>("joke-writer") {
    // Load conversation history
    val setupMessageContext by node<A2AMessage, A2AMessage> { userInput ->
        val contextMessages = withA2AAgentServer {
            context.messageStorage.getAll()
        }
        llm.writeSession {
            appendPrompt {
                messages(contextMessages.map { it.toKoogMessage() })
            }
        }
        userInput
    }

    // Check for existing task
    val setupTaskContext by node<A2AMessage, Task?> { userInput ->
        val currentTask = withA2AAgentServer {
            context.task?.id?.let { id ->
                context.taskStorage.get(id, historyLength = null)
            }
        }

        currentTask?.let { task ->
            val taskMessages = (
                task.history.orEmpty() +
                    listOfNotNull(task.status.message) +
                    userInput
                ).map { it.toKoogMessage() }
            llm.writeSession {
                appendPrompt {
                    user("Ongoing task conversation:")
                    messages(taskMessages)
                }
            }
        }

        withA2AAgentServer {
            if (currentTask != null) {
                eventProcessor.sendTaskEvent(
                    TaskStatusUpdateEvent(
                        taskId = currentTask.id,
                        contextId = currentTask.contextId,
                        status = TaskStatus(
                            state = TaskState.Working,
                            message = userInput
                        ),
                        final = false
                    )
                )
            } else {
                context.messageStorage.save(userInput)
            }
        }

        currentTask
    }

    // Create task. The input is the (absent) task coming from setupTaskContext
    // and is ignored; the original user message is read from the agent input.
    val createTask by node<Task?, Unit> {
        val userInput = agentInput<A2AMessage>()
        withA2AAgentServer {
            eventProcessor.sendTaskEvent(
                Task(
                    id = context.taskId,
                    contextId = context.contextId,
                    status = TaskStatus(
                        state = TaskState.Submitted,
                        message = userInput
                    )
                )
            )
        }
    }

    // Classify request
    val classifyRequest by nodeLLMRequestStructured<JokeRequestClassification>()

    // Generate joke
    val generateJoke by node<JokeRequestClassification.Ready, Message.Assistant> { request ->
        llm.writeSession {
            appendPrompt {
                user("Generate joke about: ${request.subject}")
            }
            requestLLMWithoutTools() as Message.Assistant
        }
    }

    // Send artifact. The input type matches generateJoke's output so the two
    // nodes can be connected without a transformation.
    val respondWithJoke by node<Message.Assistant, Unit> { jokeMessage ->
        withA2AAgentServer {
            eventProcessor.sendTaskEvent(
                TaskArtifactUpdateEvent(
                    taskId = context.taskId,
                    contextId = context.contextId,
                    artifact = Artifact(
                        artifactId = "joke",
                        parts = listOf(TextPart(jokeMessage.content))
                    )
                )
            )
            eventProcessor.sendTaskEvent(
                TaskStatusUpdateEvent(
                    taskId = context.taskId,
                    contextId = context.contextId,
                    status = TaskStatus(state = TaskState.Completed),
                    final = true
                )
            )
        }
    }

    // Define flow
    nodeStart then setupMessageContext then setupTaskContext

    // New conversation: announce the task first.
    edge(setupTaskContext forwardTo createTask onCondition { task -> task == null })

    // Follow-up on an existing task: skip task creation and classify directly.
    // Without this edge the graph has nowhere to go when a task already exists.
    edge(
        setupTaskContext forwardTo classifyRequest
            onCondition { task -> task != null }
            transformed { agentInput<A2AMessage>().textContent() }
    )

    // The classifier takes the request text, not createTask's Unit output.
    edge(
        createTask forwardTo classifyRequest
            transformed { agentInput<A2AMessage>().textContent() }
    )

    // Unwrap the structured result and continue only for `Ready` classifications.
    // NOTE(review): `structure` is the accessor for the parsed value in the Koog
    // release this was written against — confirm the property name for your version.
    edge(
        classifyRequest forwardTo generateJoke
            transformed { it.getOrThrow().structure }
            onIsInstance JokeRequestClassification.Ready::class
    )

    edge(generateJoke forwardTo respondWithJoke)
    edge(respondWithJoke forwardTo nodeFinish)
}

/** Joins the text parts of an A2A message into a single string, one part per line. */
private fun A2AMessage.textContent(): String =
    parts.filterIsInstance<TextPart>().joinToString("\n") { it.text }
Client with Streaming
Client.kt
/**
 * Entry point of the interactive streaming client.
 *
 * Reads requests from standard input until `/q` (or end of input), sends each
 * one as a streaming A2A message and prints the events that come back. The id
 * of the task currently in progress is attached to follow-up messages and is
 * forgotten once the task reaches a final terminal state.
 */
suspend fun main() {
    val transport = HttpJSONRPCClientTransport(
        url = "http://localhost:9999/joke-agent"
    )

    // try/finally guarantees the transport is closed even if connecting,
    // sending or event handling throws.
    try {
        // NOTE(review): `agentCardResolver` is not defined in this snippet —
        // presumably built like the resolver in the simple client; confirm.
        val client = A2AClient(transport, agentCardResolver)
        client.connect()

        var currentTaskId: String? = null
        val artifacts = mutableMapOf<String, Artifact>()

        while (true) {
            println("Request (/q to quit):")
            // readlnOrNull() returns null on end of input instead of throwing.
            val request = readlnOrNull() ?: break
            if (request == "/q") break

            val message = Message(
                messageId = Uuid.random().toString(),
                role = Role.User,
                parts = listOf(TextPart(request)),
                contextId = "session-123",
                taskId = currentTaskId
            )

            client.sendMessageStreaming(
                Request(MessageSendParams(message = message))
            ).collect { response ->
                when (val event = response.data) {
                    is Task -> {
                        currentTaskId = event.id
                        println("Task created: ${event.id}")
                    }

                    is TaskStatusUpdateEvent -> {
                        println("Task state: ${event.status.state}")
                        when (event.status.state) {
                            TaskState.InputRequired -> {
                                val question = event.status.message?.parts
                                    ?.filterIsInstance<TextPart>()
                                    ?.joinToString("\n") { it.text }
                                println("Question: $question")
                            }

                            // Any finished task — successful or not — must release the
                            // task id; otherwise the next request would be addressed to
                            // a task that can no longer accept input.
                            TaskState.Completed,
                            TaskState.Failed,
                            TaskState.Canceled,
                            TaskState.Rejected -> {
                                if (event.final) {
                                    currentTaskId = null
                                    artifacts.clear()
                                }
                            }

                            else -> {}
                        }
                    }

                    is TaskArtifactUpdateEvent -> {
                        artifacts[event.artifact.artifactId] = event.artifact
                        val content = event.artifact.parts
                            .filterIsInstance<TextPart>()
                            .joinToString("\n") { it.text }
                        println("Artifact [${event.artifact.artifactId}]: $content")
                    }

                    is Message -> {
                        val text = event.parts
                            .filterIsInstance<TextPart>()
                            .joinToString("\n") { it.text }
                        println("Message: $text")
                    }
                }
            }
        }
    } finally {
        transport.close()
    }
}
Task State Flow
Tasks progress through well-defined states:

Submitted → Working → [InputRequired] → Completed
                 ↘          ↗
          Failed/Canceled/Rejected

Instead of Completed, a task may end in Failed, Canceled, or Rejected.
State Transitions
Submitted: Task created and queued
Working: Agent actively processing
InputRequired: Waiting for user clarification
Completed: Task finished successfully
Failed: Task encountered an error
Canceled: User canceled the task
Rejected: Agent declined the task
Artifacts
Artifacts are results produced by agents:
// Send an artifact
val codeArtifact = Artifact(
    artifactId = "generated-code",
    parts = listOf(TextPart(generatedCode))
)
eventProcessor.sendTaskEvent(
    TaskArtifactUpdateEvent(
        taskId = context.taskId,
        contextId = context.contextId,
        artifact = codeArtifact
    )
)

// Append to existing artifact: `append = true` is set on the update event.
val logChunk = Artifact(
    artifactId = "log",
    parts = listOf(TextPart("New log entry\n"))
)
eventProcessor.sendTaskEvent(
    TaskArtifactUpdateEvent(
        taskId = context.taskId,
        contextId = context.contextId,
        artifact = logChunk,
        append = true
    )
)
Interactive Clarification
Agents can request additional information:
// Emits an InputRequired status update that carries the clarifying question
// as an agent message attached to the current task.
val askMoreInfo by node<NeedsClarification, Unit> { clarification ->
    withA2AAgentServer {
        val question = A2AMessage(
            role = Role.Agent,
            parts = listOf(TextPart(clarification.question)),
            messageId = Uuid.random().toString(),
            taskId = context.taskId,
            contextId = context.contextId
        )
        val awaitingInput = TaskStatus(
            state = TaskState.InputRequired,
            message = question
        )

        eventProcessor.sendTaskEvent(
            TaskStatusUpdateEvent(
                taskId = context.taskId,
                contextId = context.contextId,
                status = awaitingInput,
                final = true
            )
        )
    }
}
Multi-Agent Coordination
Coordinate multiple specialized agents:
// Orchestrator agent
// The orchestrator's tools each hand work to one specialized agent.
val coordinationStrategy = orchestratorStrategy()
val delegationTools = ToolRegistry {
    tool(DelegateToResearchAgent)
    tool(DelegateToWritingAgent)
    tool(DelegateToReviewAgent)
}
val orchestrator = AIAgent(
    promptExecutor = executor,
    strategy = coordinationStrategy,
    toolRegistry = delegationTools
)
// Delegate to specialized agent
/**
 * Tool that forwards a research topic to the research agent over A2A and
 * returns whatever `extractResult` derives from the response.
 *
 * @param topic the research topic, sent as the text of a user message.
 */
@Tool
@LLMDescription("Delegate research to specialized research agent")
suspend fun delegateToResearchAgent(
    @LLMDescription("Research topic")
    topic: String
): String {
    val client = createA2AClient("http://localhost:9000/research-agent")

    val researchRequest = Message(
        messageId = Uuid.random().toString(),
        role = Role.User,
        parts = listOf(TextPart(topic)),
        contextId = contextId
    )
    val response = client.sendMessage(
        Request(MessageSendParams(message = researchRequest))
    )

    return extractResult(response)
}
Best Practices
Handle All States
Always handle task state transitions:
when (event.status.state) {
    TaskState.Submitted -> handleSubmitted()
    TaskState.Working -> handleWorking()
    TaskState.InputRequired -> handleInputRequired()
    TaskState.Completed -> handleCompleted()
    TaskState.Failed -> handleFailed()
    TaskState.Canceled -> handleCanceled()
    TaskState.Rejected -> handleRejected()
    // The A2A specification also defines auth-required and unknown states.
    // NOTE(review): confirm the enum constant names against your Koog version.
    TaskState.AuthRequired -> handleAuthRequired()
    TaskState.Unknown -> handleUnknown()
}
Message Storage
Persist messages for context continuity:
// Read the messages stored for this context and add them to the LLM prompt.
val contextMessages = withA2AAgentServer { context.messageStorage.getAll() }
llm.writeSession {
    appendPrompt {
        val history = contextMessages.map { stored -> stored.toKoogMessage() }
        messages(history)
    }
}
Clear Artifacts
Reset artifacts when tasks finish (successfully or not):
// A failed, canceled or rejected task is just as finished as a completed one;
// keeping its id would address the next request to a task that has ended.
val finishedStates = setOf(
    TaskState.Completed,
    TaskState.Failed,
    TaskState.Canceled,
    TaskState.Rejected
)
if (event.final && event.status.state in finishedStates) {
    currentTaskId = null
    artifacts.clear()
}
Next Steps
- Graph Workflows — Build complex agent workflows
- Streaming — Handle real-time updates
- Testing Agents — Test multi-agent systems