Documentation Index
Fetch the complete documentation index at: https://mintlify.com/jordiaragonzaragoza/JordiAragonZaragoza.SharedKernel/llms.txt
Use this file to discover all available pages before exploring further.
Event sourcing stores the full history of an aggregate as an ordered sequence of immutable domain events, rather than a single latest-state snapshot. SharedKernel’s Infrastructure.EventStore package provides a first-class integration with KurrentDB (the successor to EventStoreDB), including aggregate load/save, stream naming conventions, event type mapping, and durable catch-up subscriptions that drive read-model projections.
IEventStore
IEventStore extends IAggregateStore with three operations that form the event sourcing contract:
public interface IEventStore : IAggregateStore
{
/// <summary>
/// Stages an aggregate's uncommitted events for the next SaveChangesAsync call.
/// </summary>
void AppendChanges<TAggregate, TId>(TAggregate aggregate)
where TAggregate : class, IEventSourcedAggregateRoot<TId>
where TId : class, IEntityId;
/// <summary>
/// Reconstructs an aggregate by replaying all events from its stream.
/// Returns null if the stream does not exist.
/// </summary>
Task<TAggregate?> LoadAggregateAsync<TAggregate, TId>(
TId aggregateId,
CancellationToken cancellationToken = default)
where TAggregate : class, IEventSourcedAggregateRoot<TId>
where TId : class, IEntityId;
/// <summary>
/// Writes all staged changes to KurrentDB.
/// </summary>
Task SaveChangesAsync(CancellationToken cancellationToken = default);
}
KurrentDbEventStore
KurrentDbEventStore is the production implementation of IEventStore and IUnitOfWork. It maintains an in-process list of aggregates with pending changes (pendingChanges) that are flushed to KurrentDB when SaveChangesAsync (or ExecuteInTransactionAsync) is called.
public class KurrentDbEventStore : IEventStore, IUnitOfWork
{
private readonly List<IEventSourcedAggregateRoot<IEntityId>> pendingChanges = [];
private readonly KurrentDBClient eventStoreClient;
private readonly ILogger<KurrentDbEventStore> logger;
public KurrentDbEventStore(
KurrentDBClient eventStoreClient,
ILogger<KurrentDbEventStore> logger) { ... }
// Exposes staged aggregates so domain event dispatchers can fan out events
public IEnumerable<IEventsContainer<IEvent>> EventableEntities
=> this.pendingChanges.AsReadOnly();
public async Task<TAggregate?> LoadAggregateAsync<TAggregate, TId>(
TId aggregateId, CancellationToken cancellationToken = default)
{
var readResult = this.eventStoreClient.ReadStreamAsync(
Direction.Forwards,
StreamNameMapper.ToStreamId<TAggregate>(aggregateId),
StreamPosition.Start,
cancellationToken: cancellationToken);
if (await readResult.ReadState == ReadState.StreamNotFound)
return null;
var aggregate = Activator.CreateInstance(typeof(TAggregate), true) as TAggregate;
var domainEvents = new List<IDomainEvent>();
await foreach (var resolvedEvent in readResult)
domainEvents.Add(SerializerHelper.Deserialize(resolvedEvent));
aggregate?.Load(domainEvents);
return aggregate;
}
public void AppendChanges<TAggregate, TId>(TAggregate aggregate)
=> this.pendingChanges.Add(aggregate);
public async Task SaveChangesAsync(CancellationToken cancellationToken = default)
{
foreach (var aggregateToSave in this.pendingChanges)
await this.StoreAsync(aggregateToSave, cancellationToken);
this.pendingChanges.Clear();
}
public async Task<TResponse> ExecuteInTransactionAsync<TResponse>(
Func<Task<TResponse>> operation,
CancellationToken cancellationToken = default)
where TResponse : IResult
{
var response = await operation();
var isSuccess = (bool)(typeof(TResponse)
.GetProperty("IsSuccess")?.GetValue(response, null) ?? false);
if (isSuccess)
await this.SaveChangesAsync(cancellationToken);
return response;
}
}
When writing to a stream, KurrentDbEventStore uses the aggregate’s Version to set the optimistic-concurrency expectedState:
null version → StreamState.NoStream (first write).
- Non-null version →
StreamState.StreamRevision((ulong)aggregate.Version).
KurrentDbOptions
Connection configuration lives under the "EventStore" configuration section:
public sealed class KurrentDbOptions
{
public const string Section = "EventStore";
public string ConnectionString { get; set; } = default!;
}
In appsettings.json:
{
"EventStore": {
"ConnectionString": "esdb://localhost:2113?tls=false"
}
}
Bind the options and create the KurrentDBClient in your composition root:
builder.Services.Configure<KurrentDbOptions>(
builder.Configuration.GetSection(KurrentDbOptions.Section));
builder.Services.AddSingleton(_ =>
{
var options = builder.Configuration
.GetSection(KurrentDbOptions.Section)
.Get<KurrentDbOptions>()!;
var settings = KurrentDBClientSettings.Create(options.ConnectionString);
return new KurrentDBClient(settings);
});
DI Registration
The ConfigureServices class in the EventStore package exposes two extension methods:
public static class ConfigureServices
{
// Registers KurrentDbEventStore as IEventStore, IAggregateStore, and IUnitOfWork (all Scoped)
public static IServiceCollection AddSharedKernelInfrastructureKurrentDbBusiness(
this IServiceCollection serviceCollection)
{
serviceCollection.AddScoped<KurrentDbEventStore>();
serviceCollection.AddScoped<IEventStore>(
sp => sp.GetRequiredService<KurrentDbEventStore>());
serviceCollection.AddScoped<IAggregateStore>(
sp => sp.GetRequiredService<KurrentDbEventStore>());
serviceCollection.AddScoped<IUnitOfWork>(
sp => sp.GetRequiredService<KurrentDbEventStore>());
serviceCollection.AddSingleton(EventTypeMapper.Instance);
return serviceCollection;
}
// Registers the catch-up subscription background worker
public static IServiceCollection AddSharedKernelInfrastructureKurrentDbAllStreamSubscription(
this IServiceCollection serviceCollection)
{
serviceCollection.AddTransient<KurrentDbAllStreamSubscription>();
serviceCollection.AddSingleton(new CancellationTokenSource());
return serviceCollection.AddHostedService(serviceProvider =>
{
var logger = serviceProvider
.GetRequiredService<ILogger<AllStreamSubscriptionBackgroundWorker>>();
var subscription = serviceProvider
.GetRequiredService<KurrentDbAllStreamSubscription>();
var cts = serviceProvider.GetRequiredService<CancellationTokenSource>();
return new AllStreamSubscriptionBackgroundWorker(
logger,
cancellationToken =>
subscription.SubscribeToAllAsync(
new KurrentDbAllStreamSubscriptionOptions(),
cts.Token));
});
}
}
StreamNameMapper
StreamNameMapper converts aggregate CLR types to KurrentDB stream names using the conventional {category}-{tenantId_}{aggregateId} format:
// Register a custom stream name for a type (optional)
StreamNameMapper.AddCustomMap<Order>("ordering_Order");
// Generate a stream prefix (category) from a type
// e.g. "Ordering_Order" for namespace "MyApp.Ordering.Domain"
string prefix = StreamNameMapper.ToStreamPrefix<Order>();
// Generate a full stream ID
// e.g. "Ordering_Order-acme_550e8400-e29b-41d4-a716-446655440000"
string streamId = StreamNameMapper.ToStreamId<Order>(orderId, tenantId: "acme");
The default prefix logic takes the third namespace segment (index 2) as the module name, so MyApp.Ordering.Domain.Order becomes Ordering_Order. Register a custom map if you need a different name.
EventTypeMapper
EventTypeMapper maintains a bidirectional mapping between CLR event types and their string names, used during serialization and deserialization:
// Access the singleton instance
var mapper = EventTypeMapper.Instance;
// Register a custom name (e.g., to handle renamed event types)
mapper.AddCustomMap<OrderPlaced>("ordering.order_placed");
// Get the string name for a CLR type
string name = mapper.ToName<OrderPlaced>();
// → "ordering.order_placed" (or the full type name if not mapped)
// Resolve a CLR type from a stored string name
Type eventType = mapper.ToType("ordering.order_placed");
If no custom mapping exists, ToName falls back to Type.FullName, and ToType uses assembly scanning via TypeHelper.GetFirstMatchingTypeFromCurrentDomainAssembly.
Catch-Up Subscriptions
Catch-up subscriptions replay all historical events from the $all stream and then continue processing new events as they arrive. SharedKernel models this as a hosted background service.
KurrentDbAllStreamSubscriptionOptions
Configuration object for the subscription:
public class KurrentDbAllStreamSubscriptionOptions
{
// A stable GUID that identifies this subscription's checkpoint row
public Guid SubscriptionId { get; set; } =
new Guid("cbbaeb7e-a087-44cc-75a0-08dc80991837");
// By default, excludes KurrentDB system events (e.g., $>)
public SubscriptionFilterOptions FilterOptions { get; set; } =
new(EventTypeFilter.ExcludeSystemEvents());
public Action<KurrentDBClientOperationOptions>? ConfigureOperation { get; set; }
public UserCredentials? Credentials { get; set; }
public bool ResolveLinkTos { get; set; }
public bool IgnoreDeserializationErrors { get; set; } = true;
}
KurrentDbAllStreamSubscription
Manages the persistent connection to KurrentDB. On startup, it reads the last checkpointed Position from the repository and resumes from that position (or from FromAll.Start for new subscriptions).
public async Task SubscribeToAllAsync(
KurrentDbAllStreamSubscriptionOptions subscriptionOptions,
CancellationToken cancellationToken)
{
// Load the checkpoint for this subscription
var checkpoint = await checkpointRepository
.GetByIdAsync(this.SubscriptionId, cancellationToken);
await using var subscription = this.kurrentDbClient.SubscribeToAll(
checkpoint == null
? FromAll.Start
: FromAll.After(new Position(checkpoint.Position, checkpoint.Position)),
subscriptionOptions.ResolveLinkTos,
subscriptionOptions.FilterOptions,
subscriptionOptions.Credentials,
cancellationToken);
// Processes both catch-up (historical) and live events in sequence
await foreach (var message in subscription.Messages
.WithCancellation(cancellationToken))
{
await this.HandleMessageAsync(message, checkpointRepository, cancellationToken);
}
}
For each StreamMessage.Event, it deserializes the event, publishes it via IEventBus, and updates the Checkpoint row inside a IUnitOfWork transaction.
AllStreamSubscriptionBackgroundWorker
A BackgroundService that drives KurrentDbAllStreamSubscription from the .NET hosted services infrastructure:
public sealed class AllStreamSubscriptionBackgroundWorker : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
=> Task.Run(async () =>
{
await Task.Yield(); // yield to avoid blocking startup
await this.perform(stoppingToken);
}, stoppingToken);
}
Checkpoint
Checkpoint is the read model record that tracks the last committed Position for each subscription, enabling reliable resume after restarts:
public sealed record class Checkpoint : IReadModel
{
public Checkpoint(Guid id, ulong position, DateTimeOffset checkpointedAtOnUtc) { ... }
public Guid Id { get; } // Matches KurrentDbAllStreamSubscriptionOptions.SubscriptionId
public ulong Position { get; set; }
public DateTimeOffset CheckpointedAtOnUtc { get; set; }
public uint Version { get; set; }
}
Each logical subscription (e.g., one per read-model context) should have its own unique SubscriptionId GUID in KurrentDbAllStreamSubscriptionOptions. The Checkpoint row for that GUID is persisted inside the read-model database via a BaseReadModelContext-derived context. If you have multiple projection contexts, each should register a separate AddSharedKernelInfrastructureKurrentDbAllStreamSubscription call with a distinct SubscriptionId and its own IUnitOfWork binding.
Event-Store Repository Base Class
BaseRepository<TAggregate, TId> (in the EventStore package, not the EF package) wraps IEventStore and implements IRepository<TAggregate, TId>. It caches the most recently loaded aggregate per repository instance and delegates all writes to IEventStore.AppendChanges:
public abstract class BaseRepository<TAggregate, TId> : IRepository<TAggregate, TId>
where TAggregate : BaseEventSourcedAggregateRoot<TId>
where TId : class, IEntityId
{
private readonly IEventStore eventStore;
private TAggregate? currentAggregate;
protected BaseRepository(IEventStore eventStore)
=> this.eventStore = eventStore
?? throw new ArgumentNullException(nameof(eventStore));
public async Task<TAggregate?> GetByIdAsync(
TId id, CancellationToken cancellationToken = default)
{
if (this.currentAggregate is not null)
return this.currentAggregate;
this.currentAggregate = await this.eventStore
.LoadAggregateAsync<TAggregate, TId>(id, cancellationToken);
return this.currentAggregate;
}
public Task<TAggregate> AddAsync(TAggregate model, CancellationToken cancellationToken = default)
=> Task.FromResult(this.Store(model));
public Task<int> UpdateAsync(TAggregate model, CancellationToken cancellationToken = default)
{
this.Store(model);
return Task.FromResult(1);
}
public Task<int> DeleteAsync(TAggregate model, CancellationToken cancellationToken = default)
{
this.Store(model);
return Task.FromResult(1);
}
private TAggregate Store(TAggregate aggregate)
{
this.eventStore.AppendChanges<TAggregate, TId>(aggregate);
return aggregate;
}
}
Complete Example
Create an event-sourced aggregate repository
public class OrderRepository
: BaseRepository<Order, OrderId>, IOrderRepository
{
public OrderRepository(IEventStore eventStore)
: base(eventStore) { }
}
Register everything in Program.cs
// KurrentDB client
builder.Services.AddSingleton(_ =>
{
var cs = builder.Configuration["EventStore:ConnectionString"]!;
return new KurrentDBClient(KurrentDBClientSettings.Create(cs));
});
// SharedKernel event store (IEventStore, IUnitOfWork, IAggregateStore)
builder.Services.AddSharedKernelInfrastructureKurrentDbBusiness();
// Catch-up subscription background worker
builder.Services.AddSharedKernelInfrastructureKurrentDbAllStreamSubscription();
// Your own event-sourced repository
builder.Services.AddScoped<IOrderRepository, OrderRepository>();
Load, modify, and save an aggregate
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand>
{
private readonly IOrderRepository repository;
public PlaceOrderCommandHandler(IOrderRepository repository)
=> this.repository = repository;
public async Task<Result> Handle(
PlaceOrderCommand command,
CancellationToken cancellationToken)
{
// Load aggregate by replaying its event stream
var order = await this.repository.GetByIdAsync(
new OrderId(command.OrderId), cancellationToken);
if (order is null)
{
// Create a brand-new aggregate; its ctor emits a domain event
order = Order.Create(command.OrderId, command.CustomerId);
await this.repository.AddAsync(order, cancellationToken);
}
else
{
order.AddItem(command.ProductId, command.Quantity);
await this.repository.UpdateAsync(order, cancellationToken);
}
// IUnitOfWork.ExecuteInTransactionAsync (from UnitOfWorkBehaviour)
// calls IEventStore.SaveChangesAsync after the handler returns.
return Result.Success();
}
}