DEV Community

Cover image for Implementando o padrão Outbox Pattern com MySQL e usando Brighter
Rafael Andrade
Rafael Andrade

Posted on

Implementando o padrão Outbox Pattern com MySQL e usando Brighter

Introdução

Este guia demonstra como implementar o Padrão de Caixa de Saída (Outbox Pattern) com MySQL e .NET 8 usando a biblioteca Brighter para garantir consistência transacional entre atualizações de banco de dados e publicação de mensagens.

Projeto

O objetivo é processar um comando CreateNewOrder que publique dois eventos (OrderPlaced, OrderPaid) somente se a transação for bem-sucedida. Se ocorrer um erro (ex.: violação de regra de negócio), ambas as alterações no banco de dados e publicações de mensagens serão revertidas.

Requisitos

Mensagens

Para este projeto, usaremos estas 3 mensagens: CreateNewOrder, OrderPlaced e OrderPaid

public class CreateNewOrder() : Command(Guid.NewGuid()) { public decimal Value { get; set; } } public class OrderPlaced() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; public decimal Value { get; set; } } public class OrderPaid() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; } 
Enter fullscreen mode Exit fullscreen mode

Mapeadores de Mensagens

Como apenas os eventos OrderPlaced e OrderPaid são publicados no RabbitMQ, precisamos implementar mapeadores para eles usando serialização JSON

public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced> { public Message MapToMessage(OrderPlaced request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-placed"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPlaced MapToRequest(Message message) { return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!; } } public class OrderPaidMapper : IAmAMessageMapper<OrderPaid> { public Message MapToMessage(OrderPaid request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-paid"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPaid MapToRequest(Message message) { return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!; } } 
Enter fullscreen mode Exit fullscreen mode

Manipuladores de Requisições

Para OrderPlaced e OrderPaid vamos registrar logs da mensagem recebida.

public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced> { public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value); return base.HandleAsync(command, cancellationToken); } } public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid> { public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} paid", command.OrderId); return base.HandleAsync(command, cancellationToken); } } 
Enter fullscreen mode Exit fullscreen mode

Criar Novo Pedido

O manipulador CreateNewOrder vai esperar 10ms para emular um processo, depois publica o OrderPlaced, se o valor for divisível por 3 lança uma exceção (simulando um erro de negócio), caso contrário publica OrderPaid.

public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor, IUnitOfWork unitOfWork, ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder> { public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default) { await unitOfWork.BeginTransactionAsync(cancellationToken); try { string id = Guid.NewGuid().ToString(); logger.LogInformation("Creating a new order: {OrderId}", id); await Task.Delay(10, cancellationToken); // emulando um processo _ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken); if (command.Value % 3 == 0) { throw new InvalidOperationException("invalid value"); } _ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken); await unitOfWork.CommitAsync(cancellationToken); return await base.HandleAsync(command, cancellationToken); } catch { logger.LogError("Invalid data"); await unitOfWork.RollbackAsync(cancellationToken); throw; } } } 
Enter fullscreen mode Exit fullscreen mode

Entendimento Chave:

  • DepositPostAsync armazena mensagens na caixa de saída dentro da mesma transação dos dados de negócio.

  • Se ocorrer uma exceção (ex.: InvalidOperationException), a transação é revertida, garantindo que não haja mensagens órfãs.

Configurando MySQL

Para integrar o Padrão de Caixa de Saída com MySQL, primeiro garanta que a tabela outbox_messages exista.

1. Esquema da Tabela SQL

CREATE TABLE IF NOT EXISTS `outbox_messages`( `MessageId` CHAR(36) NOT NULL, `Topic` VARCHAR(255) NOT NULL, `MessageType` VARCHAR(32) NOT NULL, `Timestamp` TIMESTAMP(3) NOT NULL, `CorrelationId` CHAR(36) NULL, `ReplyTo` VARCHAR(255) NULL, `ContentType` VARCHAR(128) NULL, `Dispatched` TIMESTAMP(3) NULL, `HeaderBag` TEXT NOT NULL, `Body` TEXT NOT NULL, `Created` TIMESTAMP(3) NOT NULL DEFAULT NOW(3), `CreatedID` INT(11) NOT NULL AUTO_INCREMENT, UNIQUE(`CreatedID`), PRIMARY KEY (`MessageId`) ); 
Enter fullscreen mode Exit fullscreen mode

2. Configuração de Injeção de Dependência

Registre a caixa de saída e transação.

services .AddServiceActivator(opt => { /* Configuração de assinatura (ver artigo anterior) */ }) .UseMySqlOutbox(new MySqlConfiguration(ConnectionString, "outbox_messages"), typeof(MySqlConnectionProvider), ServiceLifetime.Scoped)) .UseMySqTransactionConnectionProvider(typeof(MySqlConnectionProvider)) .UseOutboxSweeper(opt => opt.BatchSize = 10); 
Enter fullscreen mode Exit fullscreen mode

Por Que Funciona:

  • UseMySqlOutbox vincula a caixa de saída ao SQL Server.
  • UseOutboxSweeper configura a verificação em segundo plano para mensagens não entregues.

3. Gestão de Transações

Para garantir atomicidade entre lógica de negócio e publicação de mensagens na Brighter, implemente IMySqlTransactionConnectionProvider e IUnitOfWork para compartilhar o contexto de transação. Isso garante que mensagens só sejam armazenadas na caixa de saída se a transação do banco de dados for confirmada com sucesso.

a. SqlConnectionProvider
public class MySqlConnectionProvider(MySqlUnitOfWork sqlConnection) : IMySqlTransactionConnectionProvider { private readonly MySqlUnitOfWork _sqlConnection = sqlConnection; public MySqlConnection GetConnection() { return _sqlConnection.Connection; } public Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default) { return Task.FromResult(_sqlConnection.Connection); } public MySqlTransaction? GetTransaction() { return _sqlConnection.Transaction; } public bool HasOpenTransaction => _sqlConnection.Transaction != null; public bool IsSharedConnection => true; } 
Enter fullscreen mode Exit fullscreen mode
b. Unit of Work

E finalmente precisamos criar uma nova interface chamada IUnitOfWork

public interface IUnitOfWork { Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable); Task CommitAsync(CancellationToken cancellationToken); Task RollbackAsync(CancellationToken cancellationToken); } 
Enter fullscreen mode Exit fullscreen mode
c. Implementação do SqlUnitOfWork
public class MySqlUnitOfWork : IUnitOfWork { public MySqlUnitOfWork(MySqlConfiguration configuration) { Connection = new(configuration.ConnectionString); Connection.Open(); } public MySqlConnection Connection { get; } public MySqlTransaction? Transaction { get; private set; } public async Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable) { if (Transaction == null) { Transaction = await Connection.BeginTransactionAsync(isolationLevel); } } public async Task CommitAsync(CancellationToken cancellationToken) { if (Transaction != null) { await Transaction.CommitAsync(cancellationToken); } } public async Task RollbackAsync(CancellationToken cancellationToken) { if (Transaction != null) { await Transaction.RollbackAsync(cancellationToken); } } public Task<MySqlCommand> CreateSqlCommandAsync(string sql, MySqlParameter[] parameters, CancellationToken cancellationToken) { var command = Connection.CreateCommand(); if (Transaction != null) { command.Transaction = Transaction; } command.CommandText = sql; if (parameters.Length > 0) { command.Parameters.AddRange(parameters); } return Task.FromResult(command); } } 
Enter fullscreen mode Exit fullscreen mode
d. Registrar Serviços na Injeção de Dependência
services .AddScoped<MySqlUnitOfWork, MySqlUnitOfWork>() .TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>()); 
Enter fullscreen mode Exit fullscreen mode

Conclusão

Ao implementar o Padrão de Caixa de Saída com Brighter e MySQL, demonstramos como alcançar consistência transacional entre atualizações de banco de dados e publicação de mensagens. Essa abordagem garante que:

  1. Mensagens são publicadas somente se a transação for confirmada com sucesso
    • Usando DepositPostAsync, mensagens como OrderPlaced e OrderPaid são armazenadas na tabela outbox_messages dentro da mesma transação dos dados de negócio. Se o manipulador falhar (ex.: erro simulado), a transação é revertida e nenhuma mensagem órfã é enviada.
    • O IMySqlTransactionConnectionProvider da Brighter garante que atualizações de banco de dados e depósitos de mensagens compartilhem a mesma transação.
  2. Tolerância a Falhas via Outbox Sweeper
    • O UseOutboxSweeper verifica periodicamente mensagens não entregues e tenta reenviá-las até que sejam reconhecidas pelo RabbitMQ. Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo que o broker esteja temporariamente indisponível.
  3. Arquitetura Desacoplada
    • Aplicações focam em transações locais, enquanto a Brighter trata a entrega de mensagens de forma assíncrona. Isso evita acoplamento com a infraestrutura de mensagens e simplifica a escalabilidade. Esta implementação mostra como a Brighter abstrai complexidades, permitindo que desenvolvedores se concentrem na lógica de negócio enquanto garantem confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com ferramentas de monitoramento (ex.: Prometheus), filas de mensagens mortas (DLQs) para tratar mensagens corrompidas e adicione índices na tabela de caixa de saída nas colunas Dispatched e Timestamp.

Referência

Top comments (0)