Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/microservices-patterns/ftgo-application/llms.txt

Use this file to discover all available pages before exploring further.

This page describes how the FTGO application publishes and consumes messages reliably across microservices. The core challenge is that a service must persist a state change and publish a message in the same atomic operation — if these two steps are not atomic, a crash between them leaves the system inconsistent. FTGO solves this using the transactional outbox pattern implemented by the Eventuate Tram framework.

The Problem: Dual Writes

A service that writes to its database and then publishes to a message broker has two independent I/O operations. A process crash or network failure between them produces a state where the database update succeeded but the message was never sent (or vice versa). Downstream services never learn about the change, or they process a phantom event.
Do not use “fire and forget” event publishing after a database commit. If the application crashes between the commit and the send() call, the event is permanently lost and the system becomes inconsistent.

The Outbox Pattern

Instead of publishing directly to Kafka, the service writes the event into an OUTBOX table in the same local database transaction as the business entity change. A separate CDC process reads new rows from that table (via the MySQL binary log) and forwards them to Kafka. Because the outbox write and the business write share a transaction, they are atomic.
Service (MySQL transaction)
  ├── UPDATE orders SET state = 'APPROVED'
  └── INSERT INTO outbox (topic, payload) VALUES ('order', '{"type":"OrderAuthorized",...}')

         ↓ MySQL binlog
CDC Process (Eventuate)
  └── Reads outbox rows → publishes to Kafka

         ↓ Kafka
Subscriber Services
  └── OrderHistoryEventHandlers, AccountingEventConsumer, ...

Publishing Domain Events with Eventuate Tram

The Order Service publishes events through OrderDomainEventPublisher, a thin wrapper over the Eventuate Tram DomainEventPublisher:
// OrderDomainEventPublisher.java
public class OrderDomainEventPublisher
    extends AbstractAggregateDomainEventPublisher<Order, OrderDomainEvent> {

  public OrderDomainEventPublisher(DomainEventPublisher eventPublisher) {
    super(eventPublisher, Order.class, Order::getId);
  }
}
When Order.createOrder() returns a ResultWithDomainEvents, the service layer passes the events to this publisher, which writes them transactionally:
// Order.java
public static ResultWithDomainEvents<Order, OrderDomainEvent> createOrder(
    long consumerId,
    Restaurant restaurant,
    DeliveryInformation deliveryInformation,
    List<OrderLineItem> orderLineItems) {

  Order order = new Order(consumerId, restaurant.getId(),
                          deliveryInformation, orderLineItems);
  List<OrderDomainEvent> events = singletonList(
      new OrderCreatedEvent(
          new OrderDetails(consumerId, restaurant.getId(),
                           orderLineItems, order.getOrderTotal()),
          deliveryInformation.getDeliveryAddress(),
          restaurant.getName()));
  return new ResultWithDomainEvents<>(order, events);
}

Domain Events Published by the Order Service

EventPublished when
OrderCreatedEventA new order is placed (APPROVAL_PENDING)
OrderAuthorizedThe CreateOrderSaga completes successfully
OrderRejectedThe CreateOrderSaga compensation runs
OrderCancelledThe CancelOrderSaga completes

Subscribing to Domain Events

Services that need to react to events implement a DomainEventHandlers bean using DomainEventHandlersBuilder.

Order History Service

// OrderHistoryEventHandlers.java
public DomainEventHandlers domainEventHandlers() {
  return DomainEventHandlersBuilder
      .forAggregateType(
          "net.chrisrichardson.ftgo.orderservice.domain.Order")
      .onEvent(OrderCreatedEvent.class, this::handleOrderCreated)
      .onEvent(OrderAuthorized.class,   this::handleOrderAuthorized)
      .onEvent(OrderCancelled.class,    this::handleOrderCancelled)
      .onEvent(OrderRejected.class,     this::handleOrderRejected)
      .build();
}

Accounting Service

The Accounting Service subscribes to ConsumerCreated events from the Consumer Service to create a corresponding account for each new consumer:
// AccountingEventConsumer.java
public DomainEventHandlers domainEventHandlers() {
  return DomainEventHandlersBuilder
      .forAggregateType(
          "net.chrisrichardson.ftgo.consumerservice.domain.Consumer")
      .onEvent(ConsumerCreated.class, this::createAccount)
      .build();
}

private void createAccount(DomainEventEnvelope<ConsumerCreated> dee) {
  accountingService.create(dee.getAggregateId());
}

Saga Command Messages

The saga orchestrator sends commands using CommandWithDestinationBuilder. These are also written to the outbox transactionally and forwarded to the target service’s Kafka topic.
// CancelOrderSaga.java — sending a command to Accounting Service
private CommandWithDestination reverseAuthorization(
    CancelOrderSagaData data) {
  return send(new ReverseAuthorizationCommand(
                  data.getConsumerId(),
                  data.getOrderId(),
                  data.getOrderTotal()))
      .to(AccountingServiceChannels.accountingServiceChannel)
      .build();
}

Participant Command Handlers

Each participating service declares a CommandHandlers bean using SagaCommandHandlersBuilder. The framework routes incoming messages from the named channel to the correct handler method.
// AccountingServiceCommandHandler.java
public CommandHandlers commandHandlers() {
  return SagaCommandHandlersBuilder
      .fromChannel("accountingService")
      .onMessage(AuthorizeCommand.class,        this::authorize)
      .onMessage(ReverseAuthorizationCommand.class, this::reverseAuthorization)
      .onMessage(ReviseAuthorization.class,     this::reviseAuthorization)
      .build();
}
// ConsumerServiceCommandHandlers.java
public CommandHandlers commandHandlers() {
  return SagaCommandHandlersBuilder
      .fromChannel("consumerService")
      .onMessage(ValidateOrderByConsumer.class, this::validateOrderForConsumer)
      .build();
}

private Message validateOrderForConsumer(
    CommandMessage<ValidateOrderByConsumer> cm) {
  try {
    consumerService.validateOrderForConsumer(
        cm.getCommand().getConsumerId(),
        cm.getCommand().getOrderTotal());
    return withSuccess();
  } catch (ConsumerVerificationFailedException e) {
    return withFailure();
  }
}
// KitchenServiceCommandHandler.java
public CommandHandlers commandHandlers() {
  return SagaCommandHandlersBuilder
      .fromChannel(KitchenServiceChannels.COMMAND_CHANNEL)
      .onMessage(CreateTicket.class,            this::createTicket)
      .onMessage(ConfirmCreateTicket.class,     this::confirmCreateTicket)
      .onMessage(CancelCreateTicket.class,      this::cancelCreateTicket)
      .onMessage(BeginCancelTicketCommand.class,   this::beginCancelTicket)
      .onMessage(ConfirmCancelTicketCommand.class, this::confirmCancelTicket)
      .onMessage(UndoBeginCancelTicketCommand.class, this::undoBeginCancelTicket)
      .onMessage(BeginReviseTicketCommand.class,   this::beginReviseTicket)
      .onMessage(UndoBeginReviseTicketCommand.class, this::undoBeginReviseTicket)
      .onMessage(ConfirmReviseTicketCommand.class, this::confirmReviseTicket)
      .build();
}

Full Messaging Flow

1

Business logic runs inside a local transaction

The service writes its entity (e.g., creates an Order row) and Eventuate Tram appends an event or command record to the outbox table — both in the same MySQL transaction.
2

CDC service reads the MySQL binlog

The Eventuate CDC service (deployed as a separate container) tails the MySQL binary log and reads committed outbox rows as soon as they appear.
3

Messages are published to Kafka

The CDC service produces each outbox row as a Kafka message to the appropriate topic (e.g., net.chrisrichardson.ftgo.orderservice.domain.Order for Order domain events, accountingService for accounting commands).
4

Subscriber consumes the Kafka message

Each subscribing service’s Eventuate Tram consumer polls its Kafka topic, deserializes the event or command, and calls the registered handler method.
5

Handler processes the message and replies

For saga commands, the handler sends a success or failure reply back to the saga reply channel. For domain events, the handler updates the local read model or triggers further business logic.

Infrastructure Components

ComponentTechnologyRole
Outbox tableMySQLStores events/commands atomically with business data
CDC serviceEventuate CDCReads MySQL binlog, publishes to Kafka
Message brokerApache KafkaDurable, ordered delivery between services
Consumer offset trackingKafka consumer groupsAt-least-once delivery; handlers must be idempotent
The CDC service can publish the same message more than once if it crashes after publishing but before recording its position in the binlog. Handlers must therefore be idempotent.The Order History Service handles this explicitly: every DynamoDB write includes the source event’s aggregateType, aggregateId, and eventId as a condition expression. If the same event arrives a second time, ConditionalCheckFailedException is caught and the write is skipped.
// OrderHistoryDaoDynamoDb.java
private boolean idempotentUpdate(UpdateItemSpec spec,
                                  Optional<SourceEvent> eventSource) {
  try {
    table.updateItem(
        eventSource.map(es -> es.addDuplicateDetection(spec))
                   .orElse(spec));
    return true;
  } catch (ConditionalCheckFailedException e) {
    logger.error("not updated {}", eventSource);
    return false;
  }
}

Build docs developers (and LLMs) love