Skip to main content

Overview

The Aspire.Confluent.Kafka component registers IProducer<TKey, TValue> and IConsumer<TKey, TValue> instances in your dependency injection container for producing and consuming messages with Apache Kafka. It automatically enables health checks, logging, and metrics.

Installation

Install the component using the .NET CLI:
dotnet add package Aspire.Confluent.Kafka

Usage

Register a producer

In your service’s Program.cs file, call the AddKafkaProducer extension method with the key and value types:
builder.AddKafkaProducer<string, string>("messaging");

Inject and use the producer

Retrieve the IProducer<TKey, TValue> instance using dependency injection:
public class OrderService : BackgroundService
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<OrderService> _logger;

    public OrderService(
        IProducer<string, string> producer,
        ILogger<OrderService> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
        long orderNumber = 0;
        
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Order #{orderNumber++}"
            };

            try
            {
                var result = await _producer.ProduceAsync("orders", message, stoppingToken);
                _logger.LogInformation(
                    "Produced message to {Topic} partition {Partition} at offset {Offset}",
                    result.Topic, result.Partition, result.Offset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "Failed to produce message");
            }
        }
    }
}

Register a consumer

Register a consumer with key and value types:
builder.AddKafkaConsumer<string, string>("messaging", consumerBuilder =>
{
    consumerBuilder.Config.GroupId = "order-processing-group";
    consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

Consume messages

public class OrderProcessor : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(
        IConsumer<string, string> consumer,
        ILogger<OrderProcessor> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("orders");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(stoppingToken);
                
                _logger.LogInformation(
                    "Consumed message '{Value}' from topic {Topic} partition {Partition} at offset {Offset}",
                    consumeResult.Message.Value,
                    consumeResult.Topic,
                    consumeResult.Partition,
                    consumeResult.Offset);

                // Process the message
                ProcessOrder(consumeResult.Message.Value);

                // Commit the offset
                _consumer.Commit(consumeResult);
            }
        }
        catch (OperationCanceledException)
        {
            _consumer.Close();
        }

        return Task.CompletedTask;
    }

    private void ProcessOrder(string order)
    {
        // Process the order
    }
}

Configuration

The component provides multiple configuration options based on your project requirements.

Connection string

Provide a connection string (bootstrap servers) in your appsettings.json:
{
  "ConnectionStrings": {
    "messaging": "localhost:9092"
  }
}
Then register the component using the connection name:
builder.AddKafkaProducer<string, string>("messaging");
The connection string sets the BootstrapServers property. For more information, see the BootstrapServers documentation.

Configuration providers

Configure producer settings using the Aspire:Confluent:Kafka:Producer configuration section:
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}
For consumers, use the Aspire:Confluent:Kafka:Consumer section:
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Consumer": {
          "DisableHealthChecks": false,
          "Config": {
            "GroupId": "my-consumer-group",
            "AutoOffsetReset": "Earliest"
          }
        }
      }
    }
  }
}
Consumers require the GroupId property to be set to track consumed message offsets.

Inline configuration

Configure settings directly in code using a delegate:
builder.AddKafkaProducer<string, string>("messaging", settings => 
{
    settings.DisableHealthChecks = true;
});
You can also configure the producer or consumer builder:
builder.AddKafkaProducer<string, MyMessage>("messaging",
    producerBuilder => 
    {
        producerBuilder.SetValueSerializer(new MyMessageSerializer());
    });

builder.AddKafkaConsumer<string, MyMessage>("messaging",
    consumerBuilder => 
    {
        consumerBuilder.SetValueDeserializer(new MyMessageDeserializer());
        consumerBuilder.Config.GroupId = "my-group";
    });

AppHost integration

In your AppHost project, install the Kafka hosting package:
dotnet add package Aspire.Hosting.Kafka
Register a Kafka broker in your AppHost’s Program.cs:
var messaging = builder.AddKafka("messaging");

var myService = builder.AddProject<Projects.MyService>()
                       .WithReference(messaging);
The WithReference method automatically configures the connection string in your service. Consume it in your service’s Program.cs:
builder.AddKafkaProducer<string, string>("messaging");
// or
builder.AddKafkaConsumer<string, string>("messaging", consumerBuilder =>
{
    consumerBuilder.Config.GroupId = "my-group";
});
The connection name passed to WithReference must match the name used in the component registration.

Configuration options

The following settings are available:

Producer settings

SettingDescriptionDefault
DisableHealthChecksDisable automatic health check registrationfalse
ConfigProducerConfig settingsnull

Consumer settings

SettingDescriptionDefault
DisableHealthChecksDisable automatic health check registrationfalse
ConfigConsumerConfig settingsnull

Health checks

The component automatically registers health checks for both producers and consumers. The health check:
  • Verifies connectivity to the Kafka broker
  • Checks cluster metadata
  • Reports the status in the /health endpoint
You can disable health checks by setting DisableHealthChecks to true.

Observability

Logging

The component integrates with .NET logging to provide:
  • Producer and consumer events
  • Message production and consumption details
  • Error information

Metrics

The following metrics are collected:
  • Messages produced and consumed
  • Produce and consume latency
  • Error counts
  • Consumer lag

Common scenarios

Custom serialization

Implement custom serializers for complex types:
public class JsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
    {
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }
}

public class JsonDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return default;
        return JsonSerializer.Deserialize<T>(data);
    }
}

builder.AddKafkaProducer<string, Order>("messaging",
    producerBuilder => 
    {
        producerBuilder.SetValueSerializer(new JsonSerializer<Order>());
    });

Error handling

Implement error handlers:
builder.AddKafkaProducer<string, string>("messaging",
    producerBuilder => 
    {
        producerBuilder.SetErrorHandler((producer, error) =>
        {
            logger.LogError("Kafka error: {Reason}", error.Reason);
        });
    });

Manual partition assignment

Assign specific partitions to consumers:
var consumer = serviceProvider.GetRequiredService<IConsumer<string, string>>();
consumer.Assign(new[] 
{ 
    new TopicPartition("orders", new Partition(0)),
    new TopicPartition("orders", new Partition(1))
});

Transactional producer

Ensure exactly-once semantics:
builder.AddKafkaProducer<string, string>("messaging",
    producerBuilder => 
    {
        producerBuilder.Config.TransactionalId = "my-transactional-id";
    });

// In your code
_producer.InitTransactions(TimeSpan.FromSeconds(30));
_producer.BeginTransaction();

try
{
    await _producer.ProduceAsync("orders", message);
    await _producer.ProduceAsync("audit", auditMessage);
    _producer.CommitTransaction();
}
catch
{
    _producer.AbortTransaction();
    throw;
}

Best practices

Always use ProduceAsync for better performance and backpressure handling:
await _producer.ProduceAsync("topic", message, cancellationToken);
Commit offsets only after successfully processing messages:
var result = _consumer.Consume(cancellationToken);
ProcessMessage(result.Message);
_consumer.Commit(result);  // Commit after processing
Configure timeouts based on your requirements:
producerBuilder.Config.RequestTimeoutMs = 30000;
producerBuilder.Config.MessageTimeoutMs = 60000;
Implement partition assignment handlers:
consumerBuilder.SetPartitionsAssignedHandler((consumer, partitions) =>
{
    logger.LogInformation("Partitions assigned: {Partitions}", 
        string.Join(", ", partitions));
});

consumerBuilder.SetPartitionsRevokedHandler((consumer, partitions) =>
{
    logger.LogInformation("Partitions revoked: {Partitions}", 
        string.Join(", ", partitions));
});
Process multiple messages in batches for better performance:
var batch = new List<ConsumeResult<string, string>>();

while (batch.Count < 100 && !stoppingToken.IsCancellationRequested)
{
    var result = _consumer.Consume(TimeSpan.FromSeconds(1));
    if (result != null)
    {
        batch.Add(result);
    }
}

ProcessBatch(batch);
_consumer.Commit();
Regularly check consumer lag to ensure timely processing:
var watermarkOffsets = _consumer.QueryWatermarkOffsets(
    new TopicPartition("orders", 0), 
    TimeSpan.FromSeconds(10));

var position = _consumer.Position(new TopicPartition("orders", 0));
var lag = watermarkOffsets.High - position;

Additional resources

RabbitMQ

Connect to RabbitMQ brokers

Azure Service Bus

Work with Azure Service Bus

Redis

Use Redis pub/sub messaging

Kafka Hosting

Learn about the Kafka hosting integration

Build docs developers (and LLMs) love