Skip to main content
The RabbitMqEventBus provides a robust implementation of the event bus pattern using RabbitMQ for asynchronous message publishing and subscription.

RabbitMqEventBus Implementation

Class Definition

Core.Infraestructure.EventBus.RabbitMq/Buses/RabbitMqEventBus.cs
namespace Core.Infraestructure
{
    public class RabbitMqEventBus : IEventBus, IDisposable
    {
        private readonly IConnectionManager _connectionManager;
        private readonly ILogger<RabbitMqEventBus> _logger;
        private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager;
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly int _retryCount;
        private readonly string _exchange;
        private readonly string _queueName;
        private IModel _consumersChannel;
    }
}

Constructor

public RabbitMqEventBus(
    IConnectionManager connectionManager,
    ILogger<RabbitMqEventBus> logger,
    IEventBusSubscriptionManager eventBusSubscriptionManager,
    IServiceScopeFactory serviceScopeFactory,
    string queueName = "CommonEventBus",
    int retryCont = 3)
Parameters:
  • connectionManager: Manages RabbitMQ connections
  • logger: Logger instance for diagnostics
  • eventBusSubscriptionManager: Manages event subscriptions
  • serviceScopeFactory: Creates service scopes for event handlers
  • queueName: Name of the RabbitMQ queue (default: “CommonEventBus”)
  • retryCont: Number of retry attempts (default: 3)

Publishing Events

Publish Method

public void Publish(IntegrationEvent @event)
Publishes an event to the event bus with automatic retry logic. Example:
public class OrderCreatedEvent : IntegrationEvent
{
    public int OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Publishing the event
_eventBus.Publish(new OrderCreatedEvent 
{ 
    OrderId = 123, 
    Amount = 99.99m 
});

Retry Policy

The publish operation uses Polly for exponential backoff retry:
RetryPolicy retryPolicy = RetryPolicy
    .Handle<BrokerUnreachableException>()
    .Or<SocketException>()
    .WaitAndRetry(_retryCount, 
        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
        (ex, time) => {
            _logger.LogWarning(ex, 
                "Could not publish event: {EventId} after {Timeout}s", 
                @event.Id, $"{time.TotalSeconds:n1}");
        });

Subscribing to Events

Subscribe Method

public void Subscribe<TEvent, THandler>()
    where TEvent : IntegrationEvent
    where THandler : IIntegrationEventHandler<TEvent>
Example:
public class OrderCreatedEventHandler : IIntegrationEventHandler<OrderCreatedEvent>
{
    public async Task Handle(OrderCreatedEvent @event)
    {
        // Handle the event
        Console.WriteLine($"Order {@event.OrderId} created with amount {@event.Amount}");
    }
}

// Subscribing to the event
_eventBus.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();

Dynamic Subscription

public void SubscribeDynamic<THandler>(string eventName) 
    where THandler : IDynamicIntegrationEventHandler
Allows subscribing to events without strong typing.

Unsubscribe Methods

public void Unsubscribe<TEvent, THandler>()
    where TEvent : IntegrationEvent
    where THandler : IIntegrationEventHandler<TEvent>

public void UnsubscribeDynamic<THandler>(string eventName) 
    where THandler : IDynamicIntegrationEventHandler

Connection Management

The ConnectionManager handles RabbitMQ connection lifecycle with automatic reconnection.

ConnectionManager

Core.Infraestructure.EventBus.RabbitMq/Managers/ConnectionManager.cs
public class ConnectionManager : IConnectionManager, IDisposable
{
    private readonly IConnectionFactory _connectionFactory;
    private readonly ILogger<ConnectionManager> _logger;
    private IConnection _connection;
    private readonly int _retryCount;

    public bool IsConnected { get; }
    
    public IModel CreateModel()
    public bool TryConnect()
}

Automatic Reconnection

The connection manager automatically handles:
  • Connection shutdown events
  • Callback exceptions
  • Connection blocked events
public bool TryConnect()
{
    var policy = Policy.Handle<SocketException>()
        .Or<BrokerUnreachableException>()
        .WaitAndRetry(_retryCount, 
            retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
            (ex, time) => {
                _logger.LogWarning(ex, 
                    "RabbitMQ Client could not connect after {TimeOut}s", 
                    $"{time.TotalSeconds:n1}");
            });

    policy.Execute(() => {
        _connection = _connectionFactory.CreateConnection();
    });

    return IsConnected;
}

Configuration

RabbitMqOptions

Core.Infraestructure.EventBus.RabbitMq/RabbitMqOptions.cs
public class RabbitMqOptions
{
    public string ConnectionString { get; set; }
    public bool? DispatchConsumersAsync { get; set; }
    public string Host { get; set; }
    public string Password { get; set; }
    public int Port { get; set; }
    public string QueueName { get; set; }
    public string UserName { get; set; }
    public string VirtualHost { get; set; }
}

appsettings.json Configuration

Option 1: Using Connection String
{
  "RabbitMqEventBus": {
    "ConnectionString": "amqp://user:password@localhost:5672/vhost",
    "QueueName": "MyApplicationQueue"
  }
}
Option 2: Individual Properties
{
  "RabbitMqEventBus": {
    "Host": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "QueueName": "MyApplicationQueue"
  }
}

Service Registration

Register the event bus in your DI container:
Core.Infraestructure.EventBus.RabbitMq/Extensions/EventBusExtensions.cs
public static IServiceCollection AddEventBus(
    this IServiceCollection services, 
    IConfiguration configuration)
{
    services.Configure<RabbitMqOptions>(configuration.GetSection("RabbitMqEventBus"));
    
    string queueName = services.ConfigureConnectionFactory(configuration);
    
    services.AddSingleton<IConnectionManager, ConnectionManager>();
    services.AddSingleton<IEventBusSubscriptionManager, EventBusSubscriptionManager>();
    services.AddSingleton<IEventBus, RabbitMqEventBus>();
    
    return services;
}

Usage in Startup/Program.cs

services.AddEventBus(configuration);

Event Serialization

Events are serialized using Newtonsoft.Json:
// Publishing
var message = JsonConvert.SerializeObject(@event);
byte[] body = Encoding.UTF8.GetBytes(message);

// Consuming
string message = Encoding.UTF8.GetString(args.Body.ToArray());
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);

Complete Usage Example

1. Define Event

public class ProductPriceChangedEvent : IntegrationEvent
{
    public int ProductId { get; set; }
    public decimal OldPrice { get; set; }
    public decimal NewPrice { get; set; }
}

2. Create Handler

public class ProductPriceChangedEventHandler 
    : IIntegrationEventHandler<ProductPriceChangedEvent>
{
    private readonly ILogger<ProductPriceChangedEventHandler> _logger;

    public ProductPriceChangedEventHandler(ILogger<ProductPriceChangedEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(ProductPriceChangedEvent @event)
    {
        _logger.LogInformation(
            "Product {ProductId} price changed from {OldPrice} to {NewPrice}",
            @event.ProductId, @event.OldPrice, @event.NewPrice);
        
        // Update related data, send notifications, etc.
        await Task.CompletedTask;
    }
}

3. Register Handler

services.AddTransient<ProductPriceChangedEventHandler>();

4. Subscribe to Event

public void Configure(IApplicationBuilder app, IEventBus eventBus)
{
    eventBus.Subscribe<ProductPriceChangedEvent, ProductPriceChangedEventHandler>();
}

5. Publish Event

public class ProductService
{
    private readonly IEventBus _eventBus;

    public ProductService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public void UpdatePrice(int productId, decimal newPrice, decimal oldPrice)
    {
        // Update price in database
        // ...

        // Publish event
        _eventBus.Publish(new ProductPriceChangedEvent
        {
            ProductId = productId,
            OldPrice = oldPrice,
            NewPrice = newPrice
        });
    }
}

Key Features

  • Reliable Messaging: Persistent messages with delivery mode 2
  • Automatic Retry: Exponential backoff for connection and publish failures
  • Reconnection: Automatic reconnection on connection loss
  • Direct Exchange: Uses RabbitMQ direct exchange for routing
  • Async Consumers: Supports asynchronous message processing
  • Scoped Services: Event handlers run in isolated service scopes
  • Type Safety: Strongly-typed event handlers

Build docs developers (and LLMs) love