Introdução
Este guia demonstra como implementar o Padrão de Caixa de Saída (Outbox Pattern) com MySQL e .NET 8 usando a biblioteca Brighter para garantir consistência transacional entre atualizações de banco de dados e publicação de mensagens.
Projeto
O objetivo é processar um comando CreateNewOrder
que publique dois eventos (OrderPlaced
, OrderPaid
) somente se a transação for bem-sucedida. Se ocorrer um erro (ex.: violação de regra de negócio), ambas as alterações no banco de dados e publicações de mensagens serão revertidas.
Requisitos
- .NET 8+
- Podman (ou Docker) para executar contêineres locais:
- MySql
- RabbitMQ
- Conhecimento sobre RabbitMQ na Brighter
- Pacotes Nuget
Mensagens
Para este projeto, usaremos estas 3 mensagens: CreateNewOrder
, OrderPlaced
e OrderPaid
public class CreateNewOrder() : Command(Guid.NewGuid()) { public decimal Value { get; set; } } public class OrderPlaced() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; public decimal Value { get; set; } } public class OrderPaid() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; }
Mapeadores de Mensagens
Como apenas os eventos OrderPlaced
e OrderPaid
são publicados no RabbitMQ, precisamos implementar mapeadores para eles usando serialização JSON
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced> { public Message MapToMessage(OrderPlaced request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-placed"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPlaced MapToRequest(Message message) { return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!; } } public class OrderPaidMapper : IAmAMessageMapper<OrderPaid> { public Message MapToMessage(OrderPaid request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-paid"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPaid MapToRequest(Message message) { return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!; } }
Manipuladores de Requisições
Para OrderPlaced
e OrderPaid
vamos registrar logs da mensagem recebida.
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced> { public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value); return base.HandleAsync(command, cancellationToken); } } public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid> { public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} paid", command.OrderId); return base.HandleAsync(command, cancellationToken); } }
Criar Novo Pedido
O manipulador CreateNewOrder
vai esperar 10ms para emular um processo, depois publica o OrderPlaced
, se o valor for divisível por 3 lança uma exceção (simulando um erro de negócio), caso contrário publica OrderPaid
.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor, IUnitOfWork unitOfWork, ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder> { public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default) { await unitOfWork.BeginTransactionAsync(cancellationToken); try { string id = Guid.NewGuid().ToString(); logger.LogInformation("Creating a new order: {OrderId}", id); await Task.Delay(10, cancellationToken); // emulando um processo _ = await commandProcessor.DepositPostAsync(new OrderPlaced { OrderId = id, Value = command.Value }, cancellationToken: cancellationToken); if (command.Value % 3 == 0) { throw new InvalidOperationException("invalid value"); } _ = await commandProcessor.DepositPostAsync(new OrderPaid { OrderId = id }, cancellationToken: cancellationToken); await unitOfWork.CommitAsync(cancellationToken); return await base.HandleAsync(command, cancellationToken); } catch { logger.LogError("Invalid data"); await unitOfWork.RollbackAsync(cancellationToken); throw; } } }
Entendimento Chave:
DepositPostAsync
armazena mensagens na caixa de saída dentro da mesma transação dos dados de negócio.Se ocorrer uma exceção (ex.:
InvalidOperationException
), a transação é revertida, garantindo que não haja mensagens órfãs.
Configurando MySQL
Para integrar o Padrão de Caixa de Saída com MySQL, primeiro garanta que a tabela outbox_messages
exista.
1. Esquema da Tabela SQL
CREATE TABLE IF NOT EXISTS `outbox_messages`( `MessageId` CHAR(36) NOT NULL, `Topic` VARCHAR(255) NOT NULL, `MessageType` VARCHAR(32) NOT NULL, `Timestamp` TIMESTAMP(3) NOT NULL, `CorrelationId` CHAR(36) NULL, `ReplyTo` VARCHAR(255) NULL, `ContentType` VARCHAR(128) NULL, `Dispatched` TIMESTAMP(3) NULL, `HeaderBag` TEXT NOT NULL, `Body` TEXT NOT NULL, `Created` TIMESTAMP(3) NOT NULL DEFAULT NOW(3), `CreatedID` INT(11) NOT NULL AUTO_INCREMENT, UNIQUE(`CreatedID`), PRIMARY KEY (`MessageId`) );
2. Configuração de Injeção de Dependência
Registre a caixa de saída e transação.
services .AddServiceActivator(opt => { /* Configuração de assinatura (ver artigo anterior) */ }) .UseMySqlOutbox(new MySqlConfiguration(ConnectionString, "outbox_messages"), typeof(MySqlConnectionProvider), ServiceLifetime.Scoped)) .UseMySqTransactionConnectionProvider(typeof(MySqlConnectionProvider)) .UseOutboxSweeper(opt => opt.BatchSize = 10);
Por Que Funciona:
-
UseMySqlOutbox
vincula a caixa de saída ao SQL Server. -
UseOutboxSweeper
configura a verificação em segundo plano para mensagens não entregues.
3. Gestão de Transações
Para garantir atomicidade entre lógica de negócio e publicação de mensagens na Brighter, implemente IMySqlTransactionConnectionProvider
e IUnitOfWork
para compartilhar o contexto de transação. Isso garante que mensagens só sejam armazenadas na caixa de saída se a transação do banco de dados for confirmada com sucesso.
a. SqlConnectionProvider
public class MySqlConnectionProvider(MySqlUnitOfWork sqlConnection) : IMySqlTransactionConnectionProvider { private readonly MySqlUnitOfWork _sqlConnection = sqlConnection; public MySqlConnection GetConnection() { return _sqlConnection.Connection; } public Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default) { return Task.FromResult(_sqlConnection.Connection); } public MySqlTransaction? GetTransaction() { return _sqlConnection.Transaction; } public bool HasOpenTransaction => _sqlConnection.Transaction != null; public bool IsSharedConnection => true; }
b. Unit of Work
E finalmente precisamos criar uma nova interface chamada IUnitOfWork
public interface IUnitOfWork { Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable); Task CommitAsync(CancellationToken cancellationToken); Task RollbackAsync(CancellationToken cancellationToken); }
c. Implementação do SqlUnitOfWork
public class MySqlUnitOfWork : IUnitOfWork { public MySqlUnitOfWork(MySqlConfiguration configuration) { Connection = new(configuration.ConnectionString); Connection.Open(); } public MySqlConnection Connection { get; } public MySqlTransaction? Transaction { get; private set; } public async Task BeginTransactionAsync(CancellationToken cancellationToken, IsolationLevel isolationLevel = IsolationLevel.Serializable) { if (Transaction == null) { Transaction = await Connection.BeginTransactionAsync(isolationLevel); } } public async Task CommitAsync(CancellationToken cancellationToken) { if (Transaction != null) { await Transaction.CommitAsync(cancellationToken); } } public async Task RollbackAsync(CancellationToken cancellationToken) { if (Transaction != null) { await Transaction.RollbackAsync(cancellationToken); } } public Task<MySqlCommand> CreateSqlCommandAsync(string sql, MySqlParameter[] parameters, CancellationToken cancellationToken) { var command = Connection.CreateCommand(); if (Transaction != null) { command.Transaction = Transaction; } command.CommandText = sql; if (parameters.Length > 0) { command.Parameters.AddRange(parameters); } return Task.FromResult(command); } }
d. Registrar Serviços na Injeção de Dependência
services .AddScoped<MySqlUnitOfWork, MySqlUnitOfWork>() .TryAddScoped<IUnitOfWork>(provider => provider.GetRequiredService<SqlUnitOfWork>());
Conclusão
Ao implementar o Padrão de Caixa de Saída com Brighter e MySQL, demonstramos como alcançar consistência transacional entre atualizações de banco de dados e publicação de mensagens. Essa abordagem garante que:
- Mensagens são publicadas somente se a transação for confirmada com sucesso
- Usando
DepositPostAsync
, mensagens comoOrderPlaced
eOrderPaid
são armazenadas na tabelaoutbox_messages
dentro da mesma transação dos dados de negócio. Se o manipulador falhar (ex.: erro simulado), a transação é revertida e nenhuma mensagem órfã é enviada. - O
IMySqlTransactionConnectionProvider
da Brighter garante que atualizações de banco de dados e depósitos de mensagens compartilhem a mesma transação.
- Usando
- Tolerância a Falhas via Outbox Sweeper
- O
UseOutboxSweeper
verifica periodicamente mensagens não entregues e tenta reenviá-las até que sejam reconhecidas pelo RabbitMQ. Isso desacopla a publicação de mensagens da execução do manipulador, garantindo confiabilidade mesmo que o broker esteja temporariamente indisponível.
- O
- Arquitetura Desacoplada
- Aplicações focam em transações locais, enquanto a Brighter trata a entrega de mensagens de forma assíncrona. Isso evita acoplamento com a infraestrutura de mensagens e simplifica a escalabilidade. Esta implementação mostra como a Brighter abstrai complexidades, permitindo que desenvolvedores se concentrem na lógica de negócio enquanto garantem confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com ferramentas de monitoramento (ex.: Prometheus), filas de mensagens mortas (DLQs) para tratar mensagens corrompidas e adicione índices na tabela de caixa de saída nas colunas
Dispatched
eTimestamp
.
- Aplicações focam em transações locais, enquanto a Brighter trata a entrega de mensagens de forma assíncrona. Isso evita acoplamento com a infraestrutura de mensagens e simplifica a escalabilidade. Esta implementação mostra como a Brighter abstrai complexidades, permitindo que desenvolvedores se concentrem na lógica de negócio enquanto garantem confiabilidade em sistemas distribuídos. Para uso em produção, combine esse padrão com ferramentas de monitoramento (ex.: Prometheus), filas de mensagens mortas (DLQs) para tratar mensagens corrompidas e adicione índices na tabela de caixa de saída nas colunas
Top comments (0)