The Event Bus enables asynchronous, decoupled communication between microservices through integration events. The default implementation uses RabbitMQ with built-in resilience patterns.
Core Interfaces
IEventBus
The main interface for publishing and subscribing to integration events.
Publish
Publishes an integration event to the message broker. Parameters:
@event: The integration event to publish
Subscribe<TEvent, THandler>
Subscribes a handler to a specific integration event type. Type Parameters:
TEvent: The integration event type
THandler: The handler that processes the event
Unsubscribe<TEvent, THandler>
Removes a subscription for a specific event and handler. Type Parameters:
TEvent: The integration event type
THandler: The handler to unsubscribe
SubscribeDynamic<THandler>
Subscribes to an event using dynamic typing (by event name). Parameters:
eventName: String name of the event to subscribe to
UnsubscribeDynamic<THandler>
Removes a dynamic subscription registered by event name. Parameters:
eventName: String name of the event to unsubscribe from
Core.Application.EventBus/Buses/IEventBus.cs
namespace Core.Application
{
    public interface IEventBus
    {
        void Publish(IntegrationEvent @event);
        void Subscribe<TEvent, THandler>()
            where TEvent : IntegrationEvent
            where THandler : IIntegrationEventHandler<TEvent>;
        void SubscribeDynamic<THandler>(string eventName)
            where THandler : IDynamicIntegrationEventHandler;
        void Unsubscribe<TEvent, THandler>()
            where TEvent : IntegrationEvent
            where THandler : IIntegrationEventHandler<TEvent>;
        void UnsubscribeDynamic<THandler>(string eventName)
            where THandler : IDynamicIntegrationEventHandler;
    }
}
Integration Events
IntegrationEvent
Base class for all integration events exchanged between services.
Id: Unique identifier for the event instance.
CreateDateUtc: Timestamp when the event was created (in UTC).
EventType: The type/name of the event for routing and identification.
Subject: The subject or topic of the event.
Data: The payload containing event-specific data.
Core.Application.EventBus/Events/IntegrationEvent.cs
using System;
using Newtonsoft.Json;
namespace Core.Application
{
    public abstract class IntegrationEvent
    {
        [JsonProperty]
        public Guid Id { get; private set; }
        [JsonProperty]
        public DateTime CreateDateUtc { get; private set; }
        [JsonProperty]
        public string EventType { get; private set; }
        [JsonProperty]
        public string Subject { get; private set; }
        [JsonProperty]
        public object Data { get; private set; }
        // Used by Newtonsoft.Json when deserializing an incoming event.
        [JsonConstructor]
        protected IntegrationEvent()
        {
        }
        protected IntegrationEvent(string eventType, string subject, object data)
        {
            Id = Guid.NewGuid();
            // The property is documented as UTC, so the value must stay in UTC.
            CreateDateUtc = DateTime.UtcNow;
            EventType = eventType;
            Subject = subject;
            Data = data;
        }
    }
}
Event Handlers
IIntegrationEventHandler<TEvent>
Interface for typed event handlers.
Core.Application.EventBus/Handlers/IIntegrationEventHandler.cs
public interface IIntegrationEventHandler<in TEvent> : IIntegrationEventHandler
    where TEvent : IntegrationEvent
{
    Task Handle(TEvent @event);
}
IDynamicIntegrationEventHandler
Interface for dynamic event handlers that work with event data as objects.
public interface IDynamicIntegrationEventHandler
{
    Task Handle(object eventData);
}
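The dynamic interface above has no accompanying implementation in this page, so the following is a minimal sketch of one. The interface is re-declared only so that the snippet compiles on its own, and `AuditLogDynamicHandler` is a hypothetical name. The exact shape of `eventData` delivered by the RabbitMQ implementation is not documented here, so the handler treats it as an opaque object.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Re-declared here only so the sketch compiles on its own;
// in a real project this interface comes from Core.Application.
public interface IDynamicIntegrationEventHandler
{
    Task Handle(object eventData);
}

// Hypothetical handler: records a text form of whatever payload arrives.
public class AuditLogDynamicHandler : IDynamicIntegrationEventHandler
{
    private readonly List<string> _entries = new List<string>();

    public IReadOnlyList<string> Entries => _entries;

    public Task Handle(object eventData)
    {
        // The payload type is unknown at compile time, so handle null defensively.
        string text = eventData?.ToString() ?? "<null>";
        _entries.Add(text);
        return Task.CompletedTask;
    }
}
```

Such a handler would presumably be wired up with `eventBus.SubscribeDynamic<AuditLogDynamicHandler>("DummyEntityCreated")`, where the event name is an assumption chosen for illustration.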
RabbitMQ Implementation
The framework includes a RabbitMQ-based implementation with resilience patterns.
Configuration
Configure RabbitMQ connection in appsettings.json:
{
  "RabbitMqEventBus": {
    "Host": "localhost",
    "Port": 5672,
    "VirtualHost": "/",
    "UserName": "guest",
    "Password": "guest",
    "QueueName": "CommonEventBus",
    "ConnectionString": null
  }
}
If ConnectionString is provided, it takes precedence over individual host settings.
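The precedence rule above can be sketched as follows. This is an illustration only: `RabbitMqEventBusOptions` and `RabbitMqEndpointResolver` are hypothetical names that mirror the JSON keys, and the framework's own resolution code is not shown in this page. The sketch assumes the connection string is an AMQP URI and builds an equivalent URI from the individual settings when it is absent.

```csharp
using System;

// Hypothetical options class mirroring the "RabbitMqEventBus" section.
public class RabbitMqEventBusOptions
{
    public string Host { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string VirtualHost { get; set; } = "/";
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string QueueName { get; set; } = "CommonEventBus";
    public string ConnectionString { get; set; }
}

public static class RabbitMqEndpointResolver
{
    // Returns the AMQP URI to connect to: the explicit connection string
    // when present, otherwise one built from the individual settings.
    public static string Resolve(RabbitMqEventBusOptions options)
    {
        if (!string.IsNullOrWhiteSpace(options.ConnectionString))
        {
            return options.ConnectionString;
        }

        // Credentials and the virtual host are percent-encoded; the default
        // virtual host "/" therefore becomes "%2F" in the URI path.
        string user = Uri.EscapeDataString(options.UserName);
        string password = Uri.EscapeDataString(options.Password);
        string virtualHost = Uri.EscapeDataString(options.VirtualHost);

        return $"amqp://{user}:{password}@{options.Host}:{options.Port}/{virtualHost}";
    }
}
```

With the default settings shown earlier, `Resolve` yields `amqp://guest:guest@localhost:5672/%2F`; setting `ConnectionString` bypasses every other connection field.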
Registration
Register the Event Bus in your service collection:
Infrastructure/Registrations/InfraestructureServicesRegistration.cs
public static IServiceCollection AddInfrastructureServices(
    this IServiceCollection services,
    IConfiguration configuration)
{
    // Register Event Bus with RabbitMQ implementation
    services.AddEventBus(configuration);
    return services;
}
The AddEventBus extension method automatically configures:
RabbitMQ connection factory
Connection manager
Subscription manager
Event bus implementation
Resilience Features
The RabbitMQ implementation includes:
Retry Policy: Automatic retry with exponential backoff (3 attempts by default).
Circuit Breaker: Prevents cascading failures by opening the circuit after 5 consecutive failures.
Persistent Messages: Messages are published as persistent so that they survive broker restarts.
Connection Recovery: Automatic reconnection when the broker connection is lost.
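To make the first two features concrete, here is a hand-rolled sketch of retry with exponential backoff guarded by a consecutive-failure circuit breaker. It is not the framework's code, which is not shown in this page. The defaults of 3 attempts and 5 failures come from the list above; the 30-second break duration, the 2^attempt delay, and the name `ResilientPublisher` are assumptions, and "3 attempts" is interpreted here as three tries in total.

```csharp
using System;
using System.Threading;

// Illustrative only and not thread-safe.
public class ResilientPublisher
{
    private readonly int _maxAttempts;
    private readonly int _failuresBeforeBreak;
    private readonly TimeSpan _breakDuration;
    private readonly Func<int, TimeSpan> _backoff;

    private int _consecutiveFailures;
    private DateTime _openUntilUtc = DateTime.MinValue;

    public ResilientPublisher(
        int maxAttempts = 3,
        int failuresBeforeBreak = 5,
        TimeSpan? breakDuration = null,
        Func<int, TimeSpan> backoff = null)
    {
        _maxAttempts = maxAttempts;
        _failuresBeforeBreak = failuresBeforeBreak;
        _breakDuration = breakDuration ?? TimeSpan.FromSeconds(30);
        // Default delays grow as 2s, 4s, 8s, ... (2^attempt seconds).
        _backoff = backoff ?? (attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));
    }

    // True while the breaker is open and calls are rejected immediately.
    public bool IsOpen => DateTime.UtcNow < _openUntilUtc;

    public void Execute(Action publish)
    {
        if (IsOpen)
        {
            throw new InvalidOperationException("Circuit is open; publish skipped.");
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                publish();
                _consecutiveFailures = 0; // a success ends the failure streak
                return;
            }
            catch (Exception)
            {
                _consecutiveFailures++;
                if (_consecutiveFailures >= _failuresBeforeBreak)
                {
                    // Too many failures in a row: open the circuit and give up.
                    _openUntilUtc = DateTime.UtcNow + _breakDuration;
                    throw;
                }
                if (attempt >= _maxAttempts)
                {
                    throw; // retries exhausted
                }
                Thread.Sleep(_backoff(attempt));
            }
        }
    }
}
```

A caller would wrap the broker call, for example `publisher.Execute(() => channel.BasicPublish(...))`. Once the break duration elapses, the next failure reopens the circuit immediately because the failure streak is only reset by a success, which approximates a half-open state.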
Creating an Integration Event
Define your integration event by inheriting from IntegrationEvent:
Application/Integrations/Events/DummyEntityCreatedIntegrationEvent.cs
using System;
using Core.Application;
namespace Application
{
    public class DummyEntityCreatedIntegrationEvent : IntegrationEvent
    {
        public DummyEntityCreatedIntegrationEvent()
        {
        }
        public DummyEntityCreatedIntegrationEvent(string eventType, string subject, object data)
            : base(eventType, subject, data)
        {
        }
        // Requires a matching (id, date, eventType, subject, data) constructor
        // on IntegrationEvent, which the base class listing above does not show.
        public DummyEntityCreatedIntegrationEvent(
            Guid id,
            DateTime date,
            string eventType,
            string subject,
            object data)
            : base(id, date, eventType, subject, data)
        {
        }
    }
}
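The class above does not show how an instance is built, so here is a minimal sketch. The base class is a trimmed stand-in so the snippet compiles on its own, and `DummyEntityEvents` is a hypothetical factory. Using the class name as `eventType` and a resource-style path as `subject` are illustrative conventions, not something the framework is known to require.

```csharp
using System;

// Trimmed stand-in for Core.Application.IntegrationEvent.
public abstract class IntegrationEvent
{
    public Guid Id { get; private set; }
    public DateTime CreateDateUtc { get; private set; }
    public string EventType { get; private set; }
    public string Subject { get; private set; }
    public object Data { get; private set; }

    protected IntegrationEvent(string eventType, string subject, object data)
    {
        Id = Guid.NewGuid();
        CreateDateUtc = DateTime.UtcNow;
        EventType = eventType;
        Subject = subject;
        Data = data;
    }
}

public class DummyEntityCreatedIntegrationEvent : IntegrationEvent
{
    public DummyEntityCreatedIntegrationEvent(string eventType, string subject, object data)
        : base(eventType, subject, data)
    {
    }
}

// Hypothetical factory that centralizes the naming conventions.
public static class DummyEntityEvents
{
    public static DummyEntityCreatedIntegrationEvent Created(string entityId)
    {
        return new DummyEntityCreatedIntegrationEvent(
            eventType: nameof(DummyEntityCreatedIntegrationEvent),
            subject: $"dummy-entities/{entityId}",
            data: new { EntityId = entityId });
    }
}
```

Keeping construction in one place means every publisher produces the same `EventType` and `Subject` values, which matters if subscribers route on them.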
Publishing Events
Create a publisher handler that publishes events to the bus:
Application/Integrations/Handlers/Publishers/DummyEntityCreatedIntegrationEventHandlerPub.cs
using System;
using System.Threading.Tasks;
using Core.Application;
public class DummyEntityCreatedIntegrationEventHandlerPub
    : IIntegrationEventHandler<DummyEntityCreatedIntegrationEvent>
{
    private readonly IEventBus _eventBus;
    public DummyEntityCreatedIntegrationEventHandlerPub(IEventBus eventBus)
    {
        _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
    }
    public Task Handle(DummyEntityCreatedIntegrationEvent @event)
    {
        // Publish to RabbitMQ
        _eventBus.Publish(@event);
        return Task.CompletedTask;
    }
}
Register Publishers
Application/Registrations/ApplicationServicesRegistration.cs
private static IServiceCollection AddPublishers(this IServiceCollection services)
{
    services.AddTransient<IIntegrationEventHandler<DummyEntityCreatedIntegrationEvent>,
        DummyEntityCreatedIntegrationEventHandlerPub>();
    return services;
}
Subscribing to Events
Create a subscriber handler that processes incoming events:
Application/Integrations/Handlers/Subscribers/DummyEntityCreatedIntegrationEventHandlerSub.cs
using System;
using System.Threading.Tasks;
using Core.Application;
using Microsoft.Extensions.Logging;
public class DummyEntityCreatedIntegrationEventHandlerSub
    : IIntegrationEventHandler<DummyEntityCreatedIntegrationEvent>
{
    private readonly ILogger<DummyEntityCreatedIntegrationEventHandlerSub> _logger;
    public DummyEntityCreatedIntegrationEventHandlerSub(
        ILogger<DummyEntityCreatedIntegrationEventHandlerSub> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }
    public Task Handle(DummyEntityCreatedIntegrationEvent @event)
    {
        _logger.LogInformation(
            "Handling integration event: {EventId} - ({EventType})",
            @event.Id,
            @event.EventType);
        // Process the event
        var data = @event.Data;
        // ... business logic ...
        return Task.CompletedTask;
    }
}
Register Subscribers
Application/Registrations/ApplicationServicesRegistration.cs
private static IServiceCollection AddSubscribers(this IServiceCollection services)
{
    services.AddTransient<DummyEntityCreatedIntegrationEventHandlerSub>();
    return services;
}
Subscribe at Startup
Configure subscriptions during application startup:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ... other configuration ...
    UseEventBus(app);
    // ... rest of configuration ...
}
private void UseEventBus(IApplicationBuilder app)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    // Subscribe to events
    eventBus.Subscribe<DummyEntityCreatedIntegrationEvent,
        DummyEntityCreatedIntegrationEventHandlerSub>();
}
Usage Example
Publish an event from your command handler:
public class CreateDummyEntityHandler : IRequestCommandHandler<CreateDummyEntityCommand, string>
{
    private readonly ICommandQueryBus _domainBus;
    private readonly IDummyEntityRepository _repository;
    // Dependencies are supplied by the DI container.
    public CreateDummyEntityHandler(ICommandQueryBus domainBus, IDummyEntityRepository repository)
    {
        _domainBus = domainBus ?? throw new ArgumentNullException(nameof(domainBus));
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }
    public async Task<string> Handle(CreateDummyEntityCommand request, CancellationToken cancellationToken)
    {
        var entity = new DummyEntity(request.dummyPropertyOne, request.dummyPropertyTwo);
        object createdId = await _repository.AddAsync(entity);
        // Publish domain event (which triggers integration event publisher)
        await _domainBus.Publish(entity.To<DummyEntityCreated>(), cancellationToken);
        return createdId.ToString();
    }
}
Best Practices
Keep integration events immutable and serializable. Use DTOs for the Data property.
Always handle exceptions in event handlers to prevent blocking the message queue. Log errors and implement dead-letter queue handling.
Integration events represent facts that have already occurred. Use past tense naming (e.g., OrderCreated, not CreateOrder).
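Two of the practices above, immutable payloads and exception handling in handlers, can be sketched as follows. Both types are hypothetical: `DummyEntityCreatedData` stands in for a DTO carried in the Data property, and `SafeHandler` is a small helper that reports a failure to a callback (for example, one that logs the error or forwards the message to a dead-letter queue) instead of letting the exception escape.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical immutable payload: values are assigned once in the
// constructor and exposed through get-only properties.
public sealed class DummyEntityCreatedData
{
    public DummyEntityCreatedData(string entityId, string propertyOne)
    {
        EntityId = entityId ?? throw new ArgumentNullException(nameof(entityId));
        PropertyOne = propertyOne;
    }

    public string EntityId { get; }
    public string PropertyOne { get; }
}

// Hypothetical helper: runs a handler and converts any exception into a
// callback invocation plus a false return value.
public static class SafeHandler
{
    public static async Task<bool> TryHandle(Func<Task> handle, Action<Exception> onFailure)
    {
        try
        {
            await handle();
            return true;
        }
        catch (Exception ex)
        {
            onFailure(ex);
            return false;
        }
    }
}
```

Inside a subscriber, the business logic would run as `await SafeHandler.TryHandle(() => Process(@event), ex => _logger.LogError(ex, "Handler failed"))`, where `Process` is a placeholder for the real work.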
Monitoring
The RabbitMQ implementation includes comprehensive logging:
// Logs when publishing
_logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);
// Logs on retry
_logger.LogWarning(
    "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})",
    @event.Id,
    time.TotalSeconds,
    ex.Message);
// Logs circuit breaker state
// (the message is Spanish for "Circuit OPEN for {Duration}s")
_logger.LogError(
    "[CircuitBreaker] Circuito ABIERTO durante {Duration}s",
    timespan.TotalSeconds);
Monitor your RabbitMQ management console to track message rates, queue depths, and consumer performance.