This page covers how the FTGO application implements the event sourcing pattern in the Accounting Service. Instead of persisting the current state of an Account record in a relational table, every state change is stored as an immutable domain event. The current account state is reconstructed by replaying those events.

What is Event Sourcing

In a conventional application an UPDATE statement overwrites the previous state of a row. Event sourcing replaces that mutable row with an append-only sequence of events. The current state is derived by folding all past events:
current state = fold(initial state, [event1, event2, ...eventN])
This gives a complete audit trail, enables time-travel queries, and makes it straightforward to rebuild read models by replaying the event log.
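To make the fold concrete, here is a minimal plain-Java sketch. It does not use the Eventuate API; `FoldSketch`, `AccountState`, and the `EventType` enum are hypothetical names invented for illustration, and the state tracked (a created flag and an authorization count) is an assumption rather than the real Account fields.

```java
import java.util.List;

// Hypothetical names throughout; this is not the FTGO or Eventuate API.
final class FoldSketch {

  // Two toy event kinds, loosely mirroring the account events on this page.
  enum EventType { ACCOUNT_CREATED, ACCOUNT_AUTHORIZED }

  // Immutable state: each event yields a new value instead of mutating the old one.
  static final class AccountState {
    final boolean created;
    final int authorizations;

    AccountState(boolean created, int authorizations) {
      this.created = created;
      this.authorizations = authorizations;
    }
  }

  static final AccountState INITIAL = new AccountState(false, 0);

  // One step of the fold: (state, event) -> next state.
  static AccountState apply(AccountState state, EventType event) {
    switch (event) {
      case ACCOUNT_CREATED:
        return new AccountState(true, state.authorizations);
      case ACCOUNT_AUTHORIZED:
        return new AccountState(state.created, state.authorizations + 1);
      default:
        throw new IllegalArgumentException("Unknown event: " + event);
    }
  }

  // current state = fold(initial state, events)
  static AccountState replay(List<EventType> events) {
    AccountState state = INITIAL;
    for (EventType event : events) {
      state = apply(state, event);
    }
    return state;
  }

  // Time travel: replay only the first n events to see the state at that point.
  static AccountState replayUpTo(List<EventType> events, int n) {
    return replay(events.subList(0, n));
  }
}
```

Replaying the full log gives the current state, while `replayUpTo` with a smaller `n` gives the state as it was after the first `n` events, which is the idea behind time-travel queries.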
FTGO uses the Eventuate Client framework for event sourcing. Aggregates extend ReflectiveMutableCommandProcessingAggregate, which routes command messages to process() methods and state-change events to apply() methods.

The Account Aggregate

Account is the event-sourced aggregate in the Accounting Service. It extends ReflectiveMutableCommandProcessingAggregate<Account, AccountCommand>, meaning the framework automatically dispatches:
  • Incoming commands to the matching process(SomeCommand) method, which returns a list of events to persist.
  • Persisted events back to the matching apply(SomeEvent) method, which mutates the in-memory aggregate state.
// ftgo-accounting-service/.../domain/Account.java
public class Account
    extends ReflectiveMutableCommandProcessingAggregate<Account, AccountCommand> {

  // Command: create a new account
  public List<Event> process(CreateAccountCommand command) {
    return events(new AccountCreatedEvent());
  }

  public void apply(AccountCreatedEvent event) {
    // initialize account state
  }

  // Command: authorize a payment
  public List<Event> process(AuthorizeCommandInternal command) {
    return events(new AccountAuthorizedEvent());
  }

  public void apply(AccountAuthorizedEvent event) {
    // record the authorization
  }

  // Command: reverse an authorization (e.g., during CancelOrderSaga)
  public List<Event> process(ReverseAuthorizationCommandInternal command) {
    return Collections.emptyList();
  }

  // Command: revise an authorization (e.g., during ReviseOrderSaga)
  public List<Event> process(ReviseAuthorizationCommandInternal command) {
    return Collections.emptyList();
  }

  // Required by Eventuate Tram Sagas event-sourcing support
  public void apply(SagaReplyRequestedEvent event) {
    // no-op — framework bookkeeping
  }
}
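The convention of routing commands to process() and events to apply() by parameter type can be illustrated with plain Java reflection. The sketch below is a toy model only and makes no claim about how ReflectiveMutableCommandProcessingAggregate is implemented internally; `ReflectiveDispatchSketch`, `ToyAccount`, `OpenAccount`, `TouchAccount`, and `AccountOpened` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

// Toy model of reflective command/event dispatch. All names are hypothetical;
// this is not the Eventuate implementation.
final class ReflectiveDispatchSketch {

  interface Command {}
  interface Event {}

  static final class OpenAccount implements Command {}
  static final class TouchAccount implements Command {}
  static final class AccountOpened implements Event {}

  static class ToyAccount {
    boolean open = false;

    public List<Event> process(OpenAccount command) {
      return Collections.<Event>singletonList(new AccountOpened());
    }

    // A command may legitimately produce no events.
    public List<Event> process(TouchAccount command) {
      return Collections.emptyList();
    }

    public void apply(AccountOpened event) {
      open = true;
    }
  }

  // Finds the public method with the given name whose single parameter type is
  // exactly the runtime class of the argument, then invokes it.
  static Object invoke(Object target, String methodName, Object argument) {
    try {
      Method method = target.getClass().getMethod(methodName, argument.getClass());
      method.setAccessible(true); // the toy classes are not public
      return method.invoke(target, argument);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("No handler for " + argument.getClass(), e);
    }
  }

  // Route the command to process(), then feed each returned event to apply().
  @SuppressWarnings("unchecked")
  static List<Event> handle(Object aggregate, Command command) {
    List<Event> events = (List<Event>) invoke(aggregate, "process", command);
    for (Event event : events) {
      invoke(aggregate, "apply", event);
    }
    return events;
  }
}
```

Calling `handle` with a `TouchAccount` returns an empty list and never reaches apply(), so the aggregate state is unchanged; calling it with `OpenAccount` produces one event and flips `open` to true.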

Events

| Event | Produced by | Meaning |
| --- | --- | --- |
| AccountCreatedEvent | process(CreateAccountCommand) | A new consumer account was opened |
| AccountAuthorizedEvent | process(AuthorizeCommandInternal) | A payment was successfully authorized |
The process() methods for ReverseAuthorizationCommandInternal and ReviseAuthorizationCommandInternal currently return an empty list, so in this implementation those commands emit no domain events of their own.

Creating an Account

AccountingService calls AggregateRepository.save() with the initial CreateAccountCommand. The command itself is not stored: the framework calls process(CreateAccountCommand), persists the returned AccountCreatedEvent, and then calls apply(AccountCreatedEvent) to initialize the new aggregate instance.
// AccountingService.java
public class AccountingService {

  @Autowired
  private AggregateRepository<Account, AccountCommand> accountRepository;

  public void create(String aggregateId) {
    EntityWithIdAndVersion<Account> account =
        accountRepository.save(
            new CreateAccountCommand(),
            Optional.of(new SaveOptions().withId(aggregateId)));
  }
}
A consumer account is created automatically when a ConsumerCreated event is observed by AccountingEventConsumer:
// AccountingEventConsumer.java
public DomainEventHandlers domainEventHandlers() {
  return DomainEventHandlersBuilder
      .forAggregateType(
          "net.chrisrichardson.ftgo.consumerservice.domain.Consumer")
      .onEvent(ConsumerCreated.class, this::createAccount)
      .build();
}

private void createAccount(DomainEventEnvelope<ConsumerCreated> dee) {
  accountingService.create(dee.getAggregateId());
}
The aggregateId passed to save() matches the consumer’s ID, so the account and consumer share the same identifier.

Processing Saga Commands

AccountingServiceCommandHandler handles commands that arrive from the saga orchestrator. Each handler calls accountRepository.update(), which loads the aggregate (by replaying its events), calls the appropriate process() method, persists any new events, and sends a reply back to the saga.
// AccountingServiceCommandHandler.java
public CommandHandlers commandHandlers() {
  return SagaCommandHandlersBuilder
      .fromChannel("accountingService")
      .onMessage(AuthorizeCommand.class, this::authorize)
      .onMessage(ReverseAuthorizationCommand.class, this::reverseAuthorization)
      .onMessage(ReviseAuthorization.class, this::reviseAuthorization)
      .build();
}

public void authorize(CommandMessage<AuthorizeCommand> cm) {
  AuthorizeCommand command = cm.getCommand();
  accountRepository.update(
      Long.toString(command.getConsumerId()),
      makeAuthorizeCommandInternal(command),
      replyingTo(cm)
          .catching(AccountDisabledException.class,
              () -> withFailure(new AccountDisabledReply()))
          .build());
}
The replyingTo(cm) option tells Eventuate to send a success or failure reply back to the saga reply channel once the event has been durably persisted.
If AccountDisabledException is thrown during command processing, the handler replies with the failure message built by withFailure(new AccountDisabledReply()). On receiving that failure, the saga runs the compensating transactions for the steps that already completed.
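The success-or-failure reply behaviour can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the Eventuate Tram Sagas API: `ReplySketch`, `Reply`, and `replyFor` are hypothetical, and the nested `AccountDisabledException` is a local stand-in for the FTGO class of the same name.

```java
import java.util.function.Supplier;

// Toy model of "reply with success, or map one expected exception to a failure
// reply". Hypothetical names; not the Eventuate Tram Sagas API.
final class ReplySketch {

  // Local stand-in for the FTGO exception of the same name.
  static final class AccountDisabledException extends RuntimeException {}

  // Outcome reported back to the saga: a success flag plus a reply type name.
  static final class Reply {
    final boolean success;
    final String type;

    Reply(boolean success, String type) {
      this.success = success;
      this.type = type;
    }
  }

  // Runs the command. Normal completion yields a success reply; the one expected
  // exception type is translated into the supplied failure reply.
  static Reply replyFor(Runnable command,
                        Class<? extends RuntimeException> expected,
                        Supplier<Reply> onExpectedFailure) {
    try {
      command.run();
      return new Reply(true, "Success");
    } catch (RuntimeException e) {
      if (expected.isInstance(e)) {
        return onExpectedFailure.get();
      }
      throw e; // unexpected errors are not converted into saga replies here
    }
  }
}
```

Only the declared exception type is converted into a failure reply in this sketch; any other exception propagates to the caller, which keeps genuine bugs from being reported to the saga as ordinary business failures.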

How Eventuate Client Works

1. Command arrives on the messaging channel
   The saga orchestrator sends an AuthorizeCommand (or similar) to the accountingService Kafka topic.

2. Command handler loads the aggregate
   AggregateRepository.update() queries the Eventuate event store for all events with the given aggregate ID and replays them to reconstruct the current Account state.

3. process() returns new events
   The framework calls account.process(AuthorizeCommandInternal), which returns [AccountAuthorizedEvent].

4. Events are persisted atomically
   The new events are appended to the event store in a single transaction. Because the event store is a database, this write is atomic and forms the basis of transactional messaging.

5. apply() updates in-memory state
   account.apply(AccountAuthorizedEvent) is called to bring the in-memory aggregate up to date.

6. Reply is sent to the saga
   Eventuate publishes a success reply to the saga’s reply channel so CreateOrderSaga (or whichever saga triggered the command) can advance to its next step.
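The six steps above can be traced in a small in-memory model. This is a minimal sketch that assumes a map-backed event store and string-named commands and events; `UpdateCycleSketch`, `ToyAccount`, and every method on them are hypothetical and do not correspond to Eventuate classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy walk-through of the load/process/persist/apply/reply cycle.
// Every name here is hypothetical; none of this is the Eventuate API.
final class UpdateCycleSketch {

  static final class ToyAccount {
    boolean created;
    int authorizations;

    // Decide which events a command produces, without mutating state.
    List<String> process(String command) {
      if (!"Authorize".equals(command)) {
        throw new IllegalArgumentException("Unknown command: " + command);
      }
      if (!created) {
        throw new IllegalStateException("Account has not been created");
      }
      return Collections.singletonList("AccountAuthorizedEvent");
    }

    // Mutate state in response to an event.
    void apply(String event) {
      if ("AccountCreatedEvent".equals(event)) {
        created = true;
      } else if ("AccountAuthorizedEvent".equals(event)) {
        authorizations++;
      }
    }
  }

  // In-memory stand-in for the event store: aggregate ID -> ordered event names.
  private final Map<String, List<String>> eventStore = new HashMap<>();

  // Replies "sent" to the saga, recorded so they can be inspected.
  final List<String> replies = new ArrayList<>();

  List<String> eventsFor(String aggregateId) {
    return eventStore.computeIfAbsent(aggregateId, id -> new ArrayList<>());
  }

  void create(String aggregateId) {
    eventsFor(aggregateId).add("AccountCreatedEvent");
  }

  // Step 2: rebuild the aggregate by replaying its stored events in order.
  ToyAccount load(String aggregateId) {
    ToyAccount account = new ToyAccount();
    for (String event : eventsFor(aggregateId)) {
      account.apply(event);
    }
    return account;
  }

  // Step 1 is the caller invoking this method with the incoming command.
  ToyAccount handle(String aggregateId, String command) {
    ToyAccount account = load(aggregateId);              // step 2
    List<String> newEvents = account.process(command);   // step 3
    eventsFor(aggregateId).addAll(newEvents);            // step 4
    for (String event : newEvents) {
      account.apply(event);                              // step 5
    }
    replies.add("Success");                              // step 6
    return account;
  }
}
```

The in-memory map gives none of the atomicity or durability guarantees of a real event store; the sketch is only meant to show the order in which the steps happen and that a fresh `load` reproduces the same state from the stored events.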

Event Store vs Relational Store

A conventional JPA entity would have an accounts table with columns for balance, status, etc. An UPDATE would overwrite the previous row, and the history would be lost.

With Eventuate event sourcing, the events table stores rows like:

| aggregate_type | aggregate_id | event_type | event_data |
| --- | --- | --- | --- |
| Account | 42 | AccountCreatedEvent | {} |
| Account | 42 | AccountAuthorizedEvent | {"orderId":"101","amount":"25.00"} |
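Each row is immutable. Reconstructing the account always starts from the first event and applies each one in sequence, so the log is a complete, auditable history of every event the aggregate has emitted. In the implementation shown on this page that covers account creation and authorizations; the reversal and revision commands return no domain events, so they do not appear in that history.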
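The difference between the two storage styles can be shown side by side. The sketch below is an in-memory illustration only; `StoreComparisonSketch`, `AccountRow`, `EventRow`, and `EventLog` are hypothetical names, and the `lastAuthorizedOrderId` column is an assumption made up for the example rather than a field of the real accounts schema.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Contrast between an overwritten row and an append-only log.
// Hypothetical names; not the FTGO schema or the Eventuate API.
final class StoreComparisonSketch {

  // Conventional style: one mutable field, each write replaces the last value.
  static final class AccountRow {
    private String lastAuthorizedOrderId;

    void authorize(String orderId) {
      this.lastAuthorizedOrderId = orderId;
    }

    String lastAuthorizedOrderId() {
      return lastAuthorizedOrderId;
    }
  }

  // Event-sourced style: an immutable row mirroring the four columns above.
  static final class EventRow {
    final String aggregateType;
    final String aggregateId;
    final String eventType;
    final String eventData;

    EventRow(String aggregateType, String aggregateId,
             String eventType, String eventData) {
      this.aggregateType = aggregateType;
      this.aggregateId = aggregateId;
      this.eventType = eventType;
      this.eventData = eventData;
    }
  }

  // Rows can be appended and read, but never updated or removed.
  static final class EventLog {
    private final List<EventRow> rows = new ArrayList<>();

    void append(EventRow row) {
      rows.add(row);
    }

    // Returns the aggregate's rows in insertion order as a read-only list.
    List<EventRow> history(String aggregateId) {
      List<EventRow> matching = new ArrayList<>();
      for (EventRow row : rows) {
        if (row.aggregateId.equals(aggregateId)) {
          matching.add(row);
        }
      }
      return Collections.unmodifiableList(matching);
    }
  }
}
```

After two authorizations, `AccountRow` remembers only the second order ID, whereas `EventLog.history("42")` still returns both authorization rows along with the creation row.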
