Overview
The Aspire.RabbitMQ.Client component registers an IConnection in your dependency injection container for connecting to RabbitMQ message brokers. It automatically enables health checks, logging, and distributed tracing.
Installation
Install the component using the .NET CLI:
dotnet add package Aspire.RabbitMQ.Client
Usage
Register the component
In your service’s Program.cs file, call the AddRabbitMQClient extension method:
builder.AddRabbitMQClient("messaging");
Inject and use the connection
Retrieve the IConnection instance using dependency injection:
using System.Text;
using RabbitMQ.Client;

public class MessagePublisher
{
    private readonly IConnection _connection;

    public MessagePublisher(IConnection connection)
    {
        _connection = connection;
    }

    // The channel API used here is synchronous, so the method returns a completed task
    // instead of being marked async without an await.
    public Task PublishMessageAsync(string queueName, string message)
    {
        // Create a short-lived channel per publish; channels are not thread-safe.
        using var channel = _connection.CreateModel();

        channel.QueueDeclare(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var body = Encoding.UTF8.GetBytes(message);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Publishing to the default exchange ("") routes by queue name.
        channel.BasicPublish(
            exchange: "",
            routingKey: queueName,
            basicProperties: properties,
            body: body);

        return Task.CompletedTask;
    }
}
Consuming messages
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class MessageConsumer : BackgroundService
{
    private readonly IConnection _connection;
    private readonly ILogger<MessageConsumer> _logger;
    private IModel? _channel;

    public MessageConsumer(IConnection connection, ILogger<MessageConsumer> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Keep the channel in a field: it must stay open while the consumer runs
        // and is disposed together with the service.
        var channel = _channel = _connection.CreateModel();

        channel.QueueDeclare(
            queue: "orders",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            _logger.LogInformation("Received message: {Message}", message);

            // Process the message
            ProcessMessage(message);

            // Acknowledge the message
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        channel.BasicConsume(
            queue: "orders",
            autoAck: false,
            consumer: consumer);

        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _channel?.Dispose();
        base.Dispose();
    }

    private void ProcessMessage(string message)
    {
        // Process the message
    }
}
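The two classes above still need to be registered with the host. The following is a minimal sketch of a worker-style Program.cs, assuming MessagePublisher and MessageConsumer are defined in the same project and that a connection named "messaging" is configured; the singleton lifetime for the publisher is an illustrative choice, not something this page prescribes.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Registers IConnection for the "messaging" connection name.
builder.AddRabbitMQClient("messaging");

// MessagePublisher opens a channel per call, so one shared instance is enough (assumption).
builder.Services.AddSingleton<MessagePublisher>();

// MessageConsumer derives from BackgroundService, so the host starts and stops it.
builder.Services.AddHostedService<MessageConsumer>();

builder.Build().Run();
```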
Configuration
The component provides multiple configuration options based on your project requirements.
Connection string
Provide a connection string in your appsettings.json:
{
  "ConnectionStrings": {
    "messaging": "amqp://guest:guest@localhost:5672"
  }
}
Then register the component using the connection name:
builder.AddRabbitMQClient("messaging");
For more information on the connection string format, see the RabbitMQ URI specification.
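To make the parts of an AMQP URI concrete, here is a small sketch that takes one apart with System.Uri. The URI below is hypothetical: it extends the sample connection string with a virtual host segment ("orders-vhost") purely for illustration. The RabbitMQ client does its own parsing, so this only shows where each value sits in the string.

```csharp
using System;

// Hypothetical connection string; credentials, host, and virtual host are placeholders.
var uri = new Uri("amqp://guest:guest@localhost:5672/orders-vhost");

// UserInfo holds "user:password"; split on the first colon only.
string[] credentials = uri.UserInfo.Split(':', 2);
string user = Uri.UnescapeDataString(credentials[0]);
string password = credentials.Length > 1 ? Uri.UnescapeDataString(credentials[1]) : "";

// The path segment after the leading slash names the virtual host.
string virtualHost = Uri.UnescapeDataString(uri.AbsolutePath.TrimStart('/'));

Console.WriteLine($"scheme={uri.Scheme} host={uri.Host} port={uri.Port} user={user} vhost={virtualHost}");
```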
Configuration providers
Configure component settings using the Aspire:RabbitMQ:Client configuration section:
{
  "Aspire": {
    "RabbitMQ": {
      "Client": {
        "DisableHealthChecks": true
      }
    }
  }
}
Inline configuration
Configure settings directly in code using a delegate:
builder.AddRabbitMQClient("messaging", settings =>
{
    settings.DisableHealthChecks = true;
});
You can also configure the ConnectionFactory:
builder.AddRabbitMQClient("messaging",
    configureConnectionFactory: factory =>
    {
        factory.ClientProvidedName = "MyApp";
        factory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
    });
AppHost integration
In your AppHost project, install the RabbitMQ hosting package:
dotnet add package Aspire.Hosting.RabbitMQ
Register a RabbitMQ server in your AppHost’s Program.cs:
var messaging = builder.AddRabbitMQ("messaging");

// AddProject requires a resource name; "myservice" is a placeholder.
var myService = builder.AddProject<Projects.MyService>("myservice")
    .WithReference(messaging);
The WithReference method automatically configures the connection string in your service. Consume it in your service’s Program.cs:
builder.AddRabbitMQClient("messaging");
The resource name passed to AddRabbitMQ in the AppHost must match the connection name passed to AddRabbitMQClient in the service.
Configuration options
The following settings are available:
Setting | Description | Default
DisableHealthChecks | Disable automatic health check registration | false
Health checks
The component automatically registers a health check that verifies the RabbitMQ connection. The health check:
Connects to the RabbitMQ server
Verifies the connection is open
Reports the status in the /health endpoint
You can disable health checks by setting DisableHealthChecks to true.
Observability
Logging
The component integrates with .NET logging to provide:
Connection events
Channel creation information
Error details
Tracing
Distributed tracing captures:
Message publish operations
Message consume operations
Connection activity
Traces appear in the Aspire dashboard and integrate with OpenTelemetry collectors.
Common scenarios
Work queue pattern
Distribute tasks among multiple workers:
public class TaskPublisher
{
    private readonly IConnection _connection;

    public TaskPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishTask(string task)
    {
        using var channel = _connection.CreateModel();

        channel.QueueDeclare(
            queue: "tasks",
            durable: true,
            exclusive: false,
            autoDelete: false);

        var body = Encoding.UTF8.GetBytes(task);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: "",
            routingKey: "tasks",
            basicProperties: properties,
            body: body);
    }
}
Publish/Subscribe pattern
Broadcast messages to multiple consumers:
public class EventPublisher
{
    private readonly IConnection _connection;
    private const string ExchangeName = "events";

    public EventPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishEvent(string eventType, string message)
    {
        using var channel = _connection.CreateModel();

        channel.ExchangeDeclare(
            exchange: ExchangeName,
            type: ExchangeType.Fanout,
            durable: true);

        var body = Encoding.UTF8.GetBytes(message);

        // Fanout exchanges ignore the routing key and deliver to every bound queue.
        channel.BasicPublish(
            exchange: ExchangeName,
            routingKey: "",
            basicProperties: null,
            body: body);
    }
}

public class EventSubscriber : BackgroundService
{
    private readonly IConnection _connection;

    public EventSubscriber(IConnection connection)
    {
        _connection = connection;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var channel = _connection.CreateModel();

        channel.ExchangeDeclare(
            exchange: "events",
            type: ExchangeType.Fanout,
            durable: true);

        // A server-named queue gives each subscriber its own copy of every event.
        var queueName = channel.QueueDeclare().QueueName;
        channel.QueueBind(queue: queueName, exchange: "events", routingKey: "");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            // Handle event
        };

        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

        return Task.CompletedTask;
    }
}
Routing pattern
Route messages based on criteria:
public class LogPublisher
{
    private readonly IConnection _connection;
    private const string ExchangeName = "logs";

    public LogPublisher(IConnection connection)
    {
        _connection = connection;
    }

    public void PublishLog(string severity, string message)
    {
        using var channel = _connection.CreateModel();

        channel.ExchangeDeclare(
            exchange: ExchangeName,
            type: ExchangeType.Direct,
            durable: true);

        var body = Encoding.UTF8.GetBytes(message);

        // A direct exchange delivers to queues whose binding key equals the routing key.
        channel.BasicPublish(
            exchange: ExchangeName,
            routingKey: severity,
            basicProperties: null,
            body: body);
    }
}
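The publisher above is only half of the routing pattern; a consumer chooses what it receives by binding its queue with the routing keys it cares about. The sketch below assumes the same "logs" direct exchange and the same channel API as the rest of this page. The class name ErrorLogSubscriber and the severity values "warning" and "error" are hypothetical.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical subscriber that receives only selected severities.
public class ErrorLogSubscriber : BackgroundService
{
    private readonly IConnection _connection;
    private IModel? _channel;

    public ErrorLogSubscriber(IConnection connection) => _connection = connection;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Direct, durable: true);

        // Server-named queue that belongs to this subscriber only.
        var queueName = _channel.QueueDeclare().QueueName;

        // One binding per severity; messages with other routing keys never reach this queue.
        foreach (var severity in new[] { "warning", "error" })
        {
            _channel.QueueBind(queue: queueName, exchange: "logs", routingKey: severity);
        }

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (_, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"[{ea.RoutingKey}] {message}");
        };

        _channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _channel?.Dispose();
        base.Dispose();
    }
}
```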
RPC pattern
Implement request/reply:
public class RpcClient
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _replyQueueName;
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();

    public RpcClient(IConnection connection)
    {
        _connection = connection;
        _channel = _connection.CreateModel();
        _replyQueueName = _channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            // Match the reply to its pending request by correlation ID.
            if (_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                tcs.TrySetResult(response);
            }
        };

        _channel.BasicConsume(
            consumer: consumer,
            queue: _replyQueueName,
            autoAck: true);
    }

    // Note: _channel is shared, so callers must not invoke CallAsync concurrently.
    public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
    {
        var props = _channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = _replyQueueName;

        var messageBytes = Encoding.UTF8.GetBytes(message);

        // Run continuations asynchronously so awaiting callers do not block the consumer thread.
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _callbackMapper.TryAdd(correlationId, tcs);

        _channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        // On cancellation, drop the pending entry and cancel the task so the caller does not wait forever.
        cancellationToken.Register(() =>
        {
            if (_callbackMapper.TryRemove(correlationId, out var pending))
            {
                pending.TrySetCanceled(cancellationToken);
            }
        });

        return tcs.Task;
    }
}
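The core of the RPC pattern is the correlation ID bookkeeping, which can be shown without a broker. The sketch below is an in-memory illustration only: Register and OnReply are hypothetical stand-ins for publishing a request and for the consumer's Received handler, and nothing here touches RabbitMQ.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Pending requests keyed by correlation ID, mirroring the _callbackMapper field above.
var pending = new ConcurrentDictionary<string, TaskCompletionSource<string>>();

// Hypothetical stand-in for sending a request: records a waiter for the given ID.
Task<string> Register(string correlationId)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    pending.TryAdd(correlationId, tcs);
    return tcs.Task;
}

// Hypothetical stand-in for the Received handler: completes the matching waiter
// and reports false for IDs that have no pending request.
bool OnReply(string correlationId, string body) =>
    pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(body);

var firstId = Guid.NewGuid().ToString();
var secondId = Guid.NewGuid().ToString();
Task<string> firstReply = Register(firstId);
Task<string> secondReply = Register(secondId);

// Replies can arrive in any order; the correlation ID pairs each one with its request.
OnReply(secondId, "reply-2");
OnReply(firstId, "reply-1");
bool strayHandled = OnReply("unknown-id", "stray");

string first = await firstReply;
string second = await secondReply;
Console.WriteLine($"{first}, {second}, stray handled: {strayHandled}");
```

Replies with an unknown correlation ID are dropped rather than treated as errors, which is what lets a cancelled or timed-out request be forgotten safely.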
Best practices
Create channels per thread or operation
Don’t share channels across threads; they are not thread-safe. Do reuse the IConnection instance: it is safe to share and is meant to be long-lived, with each channel multiplexed over that single connection.
Implement message acknowledgment
Always use manual acknowledgment in production to prevent message loss:
channel.BasicConsume(
    queue: queueName,
    autoAck: false, // Manual acknowledgment
    consumer: consumer);

// After processing
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Use durable queues and persistent messages
Ensure messages survive broker restarts:
channel.QueueDeclare(
    queue: queueName,
    durable: true, // Queue survives restarts
    exclusive: false,
    autoDelete: false);

var properties = channel.CreateBasicProperties();
properties.Persistent = true; // Message survives restarts
Implement prefetch for consumers
Control how many unacknowledged messages a consumer holds at once:
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1, // Process one message at a time
    global: false);
Handle connection failures
Implement reconnection logic and error handling:
_connection.ConnectionShutdown += (sender, args) =>
{
    _logger.LogWarning("RabbitMQ connection shut down");
    // Implement reconnection logic
};
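Before writing custom reconnection code, it may be enough to rely on the recovery settings that ConnectionFactory already exposes. The sketch below sets them through the configureConnectionFactory delegate shown earlier, assuming the same client API as the rest of this page. The 10-second interval is an illustrative value, not a recommendation from this page.

```csharp
using System;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddRabbitMQClient("messaging", configureConnectionFactory: factory =>
{
    // Re-establish the connection after a network failure.
    factory.AutomaticRecoveryEnabled = true;

    // Re-declare exchanges, queues, bindings, and consumers after the connection recovers.
    factory.TopologyRecoveryEnabled = true;

    // Delay between recovery attempts (illustrative value).
    factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
});

builder.Build().Run();
```

A ConnectionShutdown handler is still useful alongside this for logging and for surfacing outages to the rest of the application.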
Use dead letter exchanges
Handle failed messages gracefully:
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "dlx" },
    { "x-dead-letter-routing-key", "failed" }
};

channel.QueueDeclare(
    queue: queueName,
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: arguments);
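The queue arguments above only name the dead letter exchange; the exchange and a queue bound to it still have to exist, and a message is dead-lettered only when it is rejected without requeueing (or expires). The sketch below fills in both halves, reusing the "dlx" and "failed" names from the snippet above. The DeadLetterSetup class and the "failed-messages" queue name are hypothetical.

```csharp
using RabbitMQ.Client;

// Hypothetical helper; not part of the Aspire or RabbitMQ client libraries.
public static class DeadLetterSetup
{
    // Declares the dead letter exchange and a durable queue bound to it with the "failed" key.
    public static void Declare(IModel channel, string deadLetterQueue = "failed-messages")
    {
        channel.ExchangeDeclare(exchange: "dlx", type: ExchangeType.Direct, durable: true);
        channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queue: deadLetterQueue, exchange: "dlx", routingKey: "failed");
    }

    // Rejects a delivery without requeueing, so the broker routes it to the dead letter exchange.
    public static void Reject(IModel channel, ulong deliveryTag) =>
        channel.BasicNack(deliveryTag: deliveryTag, multiple: false, requeue: false);
}
```

In a consumer with manual acknowledgment, Reject would be called from the Received handler's catch block in place of BasicAck when processing fails.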
Additional resources
Apache Kafka: Connect to Apache Kafka brokers
Azure Service Bus: Work with Azure Service Bus
Redis: Use Redis pub/sub messaging
RabbitMQ Hosting: Learn about the RabbitMQ hosting integration