Overview
The Aspire.Confluent.Kafka component registers IProducer<TKey, TValue> and IConsumer<TKey, TValue> instances in your dependency injection container for producing and consuming messages with Apache Kafka. It automatically enables health checks, logging, and metrics.
Installation
Install the component using the .NET CLI:
dotnet add package Aspire.Confluent.Kafka
Usage
Register a producer
In your service’s Program.cs file, call the AddKafkaProducer extension method with the key and value types:
builder.AddKafkaProducer<string, string>("messaging");
Inject and use the producer
Retrieve the IProducer<TKey, TValue> instance using dependency injection:
public class OrderService : BackgroundService
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<OrderService> _logger;

    public OrderService(
        IProducer<string, string> producer,
        ILogger<OrderService> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
        long orderNumber = 0;

        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Order #{orderNumber++}"
            };

            try
            {
                var result = await _producer.ProduceAsync("orders", message, stoppingToken);
                _logger.LogInformation(
                    "Produced message to {Topic} partition {Partition} at offset {Offset}",
                    result.Topic, result.Partition, result.Offset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "Failed to produce message");
            }
        }
    }
}
Register a consumer
Register a consumer with key and value types:
builder.AddKafkaConsumer<string, string>("messaging", consumerBuilder =>
{
    consumerBuilder.Config.GroupId = "order-processing-group";
    consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});
Consume messages
public class OrderProcessor : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(
        IConsumer<string, string> consumer,
        ILogger<OrderProcessor> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume blocks the calling thread, so run the loop on a background
        // thread to avoid blocking host startup.
        return Task.Run(() =>
        {
            _consumer.Subscribe("orders");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var consumeResult = _consumer.Consume(stoppingToken);
                    _logger.LogInformation(
                        "Consumed message '{Value}' from topic {Topic} partition {Partition} at offset {Offset}",
                        consumeResult.Message.Value,
                        consumeResult.Topic,
                        consumeResult.Partition,
                        consumeResult.Offset);

                    // Process the message
                    ProcessOrder(consumeResult.Message.Value);

                    // Commit the offset after successful processing
                    _consumer.Commit(consumeResult);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown
            }
            finally
            {
                _consumer.Close();
            }
        }, stoppingToken);
    }

    private void ProcessOrder(string order)
    {
        // Process the order
    }
}
Configuration
The component provides multiple configuration options based on your project requirements.
Connection string
Provide a connection string (bootstrap servers) in your appsettings.json:
{
  "ConnectionStrings": {
    "messaging": "localhost:9092"
  }
}
Then register the component using the connection name:
builder.AddKafkaProducer<string, string>("messaging");
The connection string sets the BootstrapServers property. For more information, see the BootstrapServers documentation.
Configuration providers
Configure producer settings using the Aspire:Confluent:Kafka:Producer configuration section:
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}
For consumers, use the Aspire:Confluent:Kafka:Consumer section:
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Consumer": {
          "DisableHealthChecks": false,
          "Config": {
            "GroupId": "my-consumer-group",
            "AutoOffsetReset": "Earliest"
          }
        }
      }
    }
  }
}
Consumers require the GroupId property to be set to track consumed message offsets.
Inline configuration
Configure settings directly in code using a delegate:
builder.AddKafkaProducer<string, string>("messaging", settings =>
{
    settings.DisableHealthChecks = true;
});
You can also configure the producer or consumer builder:
builder.AddKafkaProducer<string, MyMessage>("messaging",
    producerBuilder =>
    {
        producerBuilder.SetValueSerializer(new MyMessageSerializer());
    });

builder.AddKafkaConsumer<string, MyMessage>("messaging",
    consumerBuilder =>
    {
        consumerBuilder.SetValueDeserializer(new MyMessageDeserializer());
        consumerBuilder.Config.GroupId = "my-group";
    });
AppHost integration
In your AppHost project, install the Kafka hosting package:
dotnet add package Aspire.Hosting.Kafka
Register a Kafka broker in your AppHost’s Program.cs:
var messaging = builder.AddKafka("messaging");

var myService = builder.AddProject<Projects.MyService>()
    .WithReference(messaging);
The WithReference method automatically configures the connection string in your service. Consume it in your service’s Program.cs:
builder.AddKafkaProducer<string, string>("messaging");

// or
builder.AddKafkaConsumer<string, string>("messaging", consumerBuilder =>
{
    consumerBuilder.Config.GroupId = "my-group";
});
The connection name passed to WithReference must match the name used in the component registration.
Configuration options
The following settings are available:
Producer settings
| Setting | Description | Default |
|---|---|---|
| DisableHealthChecks | Disable automatic health check registration | false |
| Config | ProducerConfig settings | null |
Consumer settings
| Setting | Description | Default |
|---|---|---|
| DisableHealthChecks | Disable automatic health check registration | false |
| Config | ConsumerConfig settings | null |
Health checks
The component automatically registers health checks for both producers and consumers. The health check:
Verifies connectivity to the Kafka broker
Checks cluster metadata
Reports the status in the /health endpoint
You can disable health checks by setting DisableHealthChecks to true.
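On the service side, the registered checks surface through the standard ASP.NET Core health check middleware. A minimal sketch, assuming a web project that references Aspire.Confluent.Kafka; the /health path is a conventional choice, not something the component mandates:

```csharp
var builder = WebApplication.CreateBuilder(args);

// Registers the producer and its health check.
builder.AddKafkaProducer<string, string>("messaging");

var app = builder.Build();

// Exposes all registered health checks, including Kafka's.
app.MapHealthChecks("/health");

app.Run();
```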
Observability
Logging
The component integrates with .NET logging to provide:
Producer and consumer events
Message production and consumption details
Error information
Metrics
The following metrics are collected:
Messages produced and consumed
Produce and consume latency
Error counts
Consumer lag
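These metrics flow through the .NET OpenTelemetry pipeline. A sketch of exporting them, assuming the OpenTelemetry.Extensions.Hosting and OTLP exporter packages are installed; the meter name below is an illustrative assumption, not a documented identifier:

```csharp
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics =>
    {
        // Meter name is an assumption for illustration; consult the
        // component's telemetry documentation for the exact meter names.
        metrics.AddMeter("Aspire.Confluent.Kafka");
        metrics.AddOtlpExporter();
    });
```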
Common scenarios
Custom serialization
Implement custom serializers for complex types:
using System.Text.Json;
using Confluent.Kafka;

// Named KafkaJsonSerializer (rather than JsonSerializer) to avoid
// clashing with System.Text.Json.JsonSerializer.
public class KafkaJsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
    {
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }
}

public class KafkaJsonDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            return default!;
        }

        return JsonSerializer.Deserialize<T>(data)!;
    }
}

builder.AddKafkaProducer<string, Order>("messaging",
    producerBuilder =>
    {
        producerBuilder.SetValueSerializer(new KafkaJsonSerializer<Order>());
    });
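The serializer pair above simply delegates to System.Text.Json. A self-contained round trip using the same calls (no Kafka interfaces involved) shows the byte-level behavior per message:

```csharp
using System;
using System.Text.Json;

// Serialize an object to UTF-8 bytes and read it back, exactly as the
// Kafka serializer/deserializer pair does for each message value.
var order = new { Id = 42, Customer = "Contoso" };
byte[] bytes = JsonSerializer.SerializeToUtf8Bytes(order);

var roundTripped = JsonSerializer.Deserialize<JsonElement>(bytes);
Console.WriteLine(roundTripped.GetProperty("Id").GetInt32());      // 42
Console.WriteLine(roundTripped.GetProperty("Customer").GetString()); // Contoso
```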
Error handling
Implement error handlers:
builder.AddKafkaProducer<string, string>("messaging",
    producerBuilder =>
    {
        producerBuilder.SetErrorHandler((producer, error) =>
        {
            // 'logger' must be available in the enclosing scope,
            // for example an ILogger resolved before registration.
            logger.LogError("Kafka error: {Reason}", error.Reason);
        });
    });
Manual partition assignment
Assign specific partitions to consumers:
var consumer = serviceProvider.GetRequiredService<IConsumer<string, string>>();

consumer.Assign(new[]
{
    new TopicPartition("orders", new Partition(0)),
    new TopicPartition("orders", new Partition(1))
});
Transactional producer
Ensure exactly-once semantics:
builder.AddKafkaProducer<string, string>("messaging",
    producerBuilder =>
    {
        producerBuilder.Config.TransactionalId = "my-transactional-id";
    });

// In your code
_producer.InitTransactions(TimeSpan.FromSeconds(30));
_producer.BeginTransaction();

try
{
    await _producer.ProduceAsync("orders", message);
    await _producer.ProduceAsync("audit", auditMessage);
    _producer.CommitTransaction();
}
catch
{
    _producer.AbortTransaction();
    throw;
}
Best practices
Use async methods for producers
Prefer ProduceAsync so you can await the delivery result and apply backpressure, rather than firing and forgetting:

await _producer.ProduceAsync("topic", message, cancellationToken);
Commit consumer offsets carefully
Commit offsets only after successfully processing messages:

var result = _consumer.Consume(cancellationToken);
ProcessMessage(result.Message);
_consumer.Commit(result); // Commit after processing
Set appropriate timeout values
Configure timeouts based on your requirements:

producerBuilder.Config.RequestTimeoutMs = 30000;
producerBuilder.Config.MessageTimeoutMs = 60000;
Handle rebalancing in consumers
Implement partition assignment handlers:

consumerBuilder.SetPartitionsAssignedHandler((consumer, partitions) =>
{
    logger.LogInformation("Partitions assigned: {Partitions}",
        string.Join(", ", partitions));
});

consumerBuilder.SetPartitionsRevokedHandler((consumer, partitions) =>
{
    logger.LogInformation("Partitions revoked: {Partitions}",
        string.Join(", ", partitions));
});
Use batch processing for consumers
Process multiple messages in batches for better performance:

var batch = new List<ConsumeResult<string, string>>();

while (batch.Count < 100 && !stoppingToken.IsCancellationRequested)
{
    var result = _consumer.Consume(TimeSpan.FromSeconds(1));
    if (result != null)
    {
        batch.Add(result);
    }
}

ProcessBatch(batch);
_consumer.Commit();
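The drain loop can be factored into a reusable helper. A self-contained sketch, with a delegate standing in for _consumer.Consume(timeout) so it runs without a broker; unlike the loop above, this version stops at the first timeout rather than polling until cancellation:

```csharp
using System;
using System.Collections.Generic;

// Drain up to maxBatchSize items from a source. The tryConsume delegate
// stands in for _consumer.Consume(timeout), returning null on timeout.
static List<T> DrainBatch<T>(Func<T?> tryConsume, int maxBatchSize) where T : class
{
    var batch = new List<T>();
    while (batch.Count < maxBatchSize)
    {
        var item = tryConsume();
        if (item is null)
        {
            break; // timeout: process what we have so far
        }
        batch.Add(item);
    }
    return batch;
}

var queue = new Queue<string>(new[] { "a", "b", "c" });
var batch = DrainBatch<string>(() => queue.Count > 0 ? queue.Dequeue() : null, 2);
Console.WriteLine(string.Join(",", batch)); // a,b
```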
Monitor consumer lag
Regularly check consumer lag to ensure timely processing:

var watermarkOffsets = _consumer.QueryWatermarkOffsets(
    new TopicPartition("orders", 0),
    TimeSpan.FromSeconds(10));
var position = _consumer.Position(new TopicPartition("orders", 0));
var lag = watermarkOffsets.High - position;
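The lag calculation itself is plain offset arithmetic. A standalone sketch, with longs standing in for Confluent.Kafka's Offset type (which converts implicitly to long):

```csharp
using System;

// Lag = high watermark minus the consumer's current position,
// clamped at zero for a fully caught-up consumer.
static long ComputeLag(long highWatermark, long position) =>
    Math.Max(0, highWatermark - position);

Console.WriteLine(ComputeLag(1500, 1420)); // 80
Console.WriteLine(ComputeLag(100, 100));   // 0: fully caught up
```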
Additional resources
RabbitMQ: Connect to RabbitMQ brokers
Azure Service Bus: Work with Azure Service Bus
Redis: Use Redis pub/sub messaging
Kafka hosting: Learn about the Kafka hosting integration