Skip to main content

Overview

The Aspire.RabbitMQ.Client component registers an IConnection in your dependency injection container for connecting to RabbitMQ message brokers. It automatically enables health checks, logging, and distributed tracing.

Installation

Install the component using the .NET CLI:
dotnet add package Aspire.RabbitMQ.Client

Usage

Register the component

In your service’s Program.cs file, call the AddRabbitMQClient extension method:
builder.AddRabbitMQClient("messaging");

Inject and use the connection

Retrieve the IConnection instance using dependency injection:
public class MessagePublisher
{
    private readonly IConnection _connection;

    public MessagePublisher(IConnection connection)
    {
        _connection = connection;
    }

    public async Task PublishMessageAsync(string queueName, string message)
    {
        using var channel = _connection.CreateModel();
        
        channel.QueueDeclare(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var body = Encoding.UTF8.GetBytes(message);
        
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: "",
            routingKey: queueName,
            basicProperties: properties,
            body: body);
    }
}

Consuming messages

public class MessageConsumer : BackgroundService
{
    private readonly IConnection _connection;
    private readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(IConnection connection, ILogger<MessageConsumer> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var channel = _connection.CreateModel();
        
        channel.QueueDeclare(
            queue: "orders",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            
            _logger.LogInformation("Received message: {Message}", message);
            
            // Process the message
            ProcessMessage(message);
            
            // Acknowledge the message
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        channel.BasicConsume(
            queue: "orders",
            autoAck: false,
            consumer: consumer);

        return Task.CompletedTask;
    }

    private void ProcessMessage(string message)
    {
        // Process the message
    }
}

Configuration

The component provides multiple configuration options based on your project requirements.

Connection string

Provide a connection string in your appsettings.json:
{
  "ConnectionStrings": {
    "messaging": "amqp://guest:guest@localhost:5672"
  }
}
Then register the component using the connection name:
builder.AddRabbitMQClient("messaging");
For more information on connection string format, see the RabbitMQ URI specification.

Configuration providers

Configure component settings using the Aspire:RabbitMQ:Client configuration section:
{
  "Aspire": {
    "RabbitMQ": {
      "Client": {
        "DisableHealthChecks": true
      }
    }
  }
}

Inline configuration

Configure settings directly in code using a delegate:
builder.AddRabbitMQClient("messaging", settings => 
{
    settings.DisableHealthChecks = true;
});
You can also configure the ConnectionFactory:
builder.AddRabbitMQClient("messaging",
    configureConnectionFactory: factory => 
    {
        factory.ClientProvidedName = "MyApp";
        factory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
    });

AppHost integration

In your AppHost project, install the RabbitMQ hosting package:
dotnet add package Aspire.Hosting.RabbitMQ
Register a RabbitMQ server in your AppHost’s Program.cs:
var messaging = builder.AddRabbitMQ("messaging");

var myService = builder.AddProject<Projects.MyService>()
                       .WithReference(messaging);
The WithReference method automatically configures the connection string in your service. Consume it in your service’s Program.cs:
builder.AddRabbitMQClient("messaging");
The connection name passed to WithReference must match the name used in AddRabbitMQClient.

Configuration options

The following settings are available:
SettingDescriptionDefault
DisableHealthChecksDisable automatic health check registrationfalse

Health checks

The component automatically registers a health check that verifies the RabbitMQ connection. The health check:
  • Connects to the RabbitMQ server
  • Verifies the connection is open
  • Reports the status in the /health endpoint
You can disable health checks by setting DisableHealthChecks to true.

Observability

Logging

The component integrates with .NET logging to provide:
  • Connection events
  • Channel creation information
  • Error details

Tracing

Distributed tracing captures:
  • Message publish operations
  • Message consume operations
  • Connection activity
Traces appear in the Aspire dashboard and integrate with OpenTelemetry collectors.

Common scenarios

Work queue pattern

Distribute tasks among multiple workers:
public class TaskPublisher
{
    private readonly IConnection _connection;

    public TaskPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishTask(string task)
    {
        using var channel = _connection.CreateModel();
        
        channel.QueueDeclare(
            queue: "tasks",
            durable: true,
            exclusive: false,
            autoDelete: false);

        var body = Encoding.UTF8.GetBytes(task);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: "",
            routingKey: "tasks",
            basicProperties: properties,
            body: body);
    }
}

Publish/Subscribe pattern

Broadcast messages to multiple consumers:
public class EventPublisher
{
    private readonly IConnection _connection;
    private const string ExchangeName = "events";

    public EventPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishEvent(string eventType, string message)
    {
        using var channel = _connection.CreateModel();
        
        channel.ExchangeDeclare(
            exchange: ExchangeName,
            type: ExchangeType.Fanout,
            durable: true);

        var body = Encoding.UTF8.GetBytes(message);
        
        channel.BasicPublish(
            exchange: ExchangeName,
            routingKey: "",
            basicProperties: null,
            body: body);
    }
}

public class EventSubscriber : BackgroundService
{
    private readonly IConnection _connection;

    public EventSubscriber(IConnection connection)
    {
        _connection = connection;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var channel = _connection.CreateModel();
        
        channel.ExchangeDeclare(
            exchange: "events",
            type: ExchangeType.Fanout,
            durable: true);

        var queueName = channel.QueueDeclare().QueueName;
        channel.QueueBind(queue: queueName, exchange: "events", routingKey: "");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            // Handle event
        };

        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

        return Task.CompletedTask;
    }
}

Routing pattern

Route messages based on criteria:
public class LogPublisher
{
    private readonly IConnection _connection;
    private const string ExchangeName = "logs";

    public LogPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishLog(string severity, string message)
    {
        using var channel = _connection.CreateModel();
        
        channel.ExchangeDeclare(
            exchange: ExchangeName,
            type: ExchangeType.Direct,
            durable: true);

        var body = Encoding.UTF8.GetBytes(message);
        
        channel.BasicPublish(
            exchange: ExchangeName,
            routingKey: severity,
            basicProperties: null,
            body: body);
    }
}

RPC pattern

Implement request/reply:
public class RpcClient
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _replyQueueName;
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();

    public RpcClient(IConnection connection)
    {
        _connection = connection;
        _channel = _connection.CreateModel();
        _replyQueueName = _channel.QueueDeclare().QueueName;
        
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            if (_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                tcs.TrySetResult(response);
            }
        };
        
        _channel.BasicConsume(
            consumer: consumer,
            queue: _replyQueueName,
            autoAck: true);
    }

    public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
    {
        var props = _channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = _replyQueueName;
        
        var messageBytes = Encoding.UTF8.GetBytes(message);
        var tcs = new TaskCompletionSource<string>();
        _callbackMapper.TryAdd(correlationId, tcs);
        
        _channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);
        
        cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));
        return tcs.Task;
    }
}

Best practices

Create channels per thread or operation. Don’t share channels across threads as they are not thread-safe. However, do reuse the IConnection instance as it is thread-safe and manages connection pooling.
Always use manual acknowledgment in production to prevent message loss:
channel.BasicConsume(
    queue: queueName,
    autoAck: false,  // Manual acknowledgment
    consumer: consumer);

// After processing
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Ensure messages survive broker restarts:
channel.QueueDeclare(
    queue: queueName,
    durable: true,  // Queue survives restarts
    exclusive: false,
    autoDelete: false);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;  // Message survives restarts
Control how many messages a consumer processes at once:
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1,  // Process one message at a time
    global: false);
Implement reconnection logic and error handling:
_connection.ConnectionShutdown += (sender, args) =>
{
    _logger.LogWarning("RabbitMQ connection shut down");
    // Implement reconnection logic
};
Handle failed messages gracefully:
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "dlx" },
    { "x-dead-letter-routing-key", "failed" }
};

channel.QueueDeclare(
    queue: queueName,
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: arguments);

Additional resources

Apache Kafka

Connect to Apache Kafka brokers

Azure Service Bus

Work with Azure Service Bus

Redis

Use Redis pub/sub messaging

RabbitMQ Hosting

Learn about the RabbitMQ hosting integration

Build docs developers (and LLMs) love