# 写数据库同时发MQ消息事务一致性的解决方法 ## 引言 在分布式系统中,保证数据库操作与消息队列(MQ)消息发送的事务一致性是一个经典难题。当业务需要同时完成数据库写入和消息发送时(如订单创建后触发库存扣减),如何确保两者要么同时成功,要么同时失败,成为系统设计的关键挑战。本文将深入探讨5种主流解决方案及其实现细节。 --- ## 一、本地事务表方案 ### 核心思想 通过本地数据库事务表记录待发送消息,由独立任务补偿发送。 ### 实现步骤 1. 创建消息事务表: ```sql CREATE TABLE mq_transaction ( id BIGINT PRIMARY KEY, business_id VARCHAR(64), content TEXT, status TINYINT, -- 0未发送 1已发送 created_time DATETIME );
@Transactional public void processOrder(Order order) { // 1. 写业务表 orderDao.insert(order); // 2. 同一事务写入消息表 MqMessage msg = new MqMessage( UUID.randomUUID().toString(), order.getId(), JSON.toJSONString(order), 0, new Date() ); mqMessageDao.insert(msg); }
@Scheduled(fixedRate = 5000) public void retrySendMessages() { List<MqMessage> pendingMsgs = mqMessageDao.selectByStatus(0); pendingMsgs.forEach(msg -> { try { mqProducer.send(msg.getContent()); mqMessageDao.updateStatus(msg.getId(), 1); } catch (Exception e) { log.error("消息发送失败", e); } }); }
✅ 强一致性保障
✅ 无第三方依赖
❌ 需要维护消息表
❌ 消息实时性依赖扫描间隔
利用MQ中间件提供的二阶段提交能力。
participant App participant RocketMQ App -> RocketMQ: 1. 发送Half消息 RocketMQ --> App: 返回OK App -> DB: 2. 执行本地事务 alt 成功 App -> RocketMQ: 3. Commit RocketMQ -> Consumer: 投递消息 else 失败 App -> RocketMQ: 3. Rollback end
public class OrderService { @Transactional public void createOrder(Order order) { // 1. 保存订单 orderDao.insert(order); // 2. 发送事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-topic", MessageBuilder.withPayload(order).build(), order ); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new RuntimeException("事务提交失败"); } } } // 事务监听器 @RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 本地事务已在上游方法通过@Transactional完成 return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 检查订单是否存在 String orderId = msg.getHeaders().get("orderId"); return orderDao.exists(orderId) ? COMMIT : ROLLBACK; } }
对一致性要求不高的最终一致性场景。
participant Producer participant Consumer database DB Producer -> DB: 1. 写数据库 Producer -> MQ: 2. 发消息(可能失败) loop 定时重试 Producer -> MQ: 3. 持续重试发送 end Consumer -> DB: 4. 消费成功后更新状态
graph LR DB[MySQL Binlog] -->|Canal/Debezium| MQ[Kafka] MQ --> Processor[流处理程序] Processor --> Downstream[下游服务]
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new KafkaSource<>()) .filter(event -> "orders".equals(event.getTable())) .filter(event -> "insert".equals(event.getOpType())) .map(event -> buildMQMessage(event.getAfter())) .addSink(new MQSink()); env.execute("OrderEventProcessor");
方案 | 一致性强度 | 复杂度 | 实时性 | 适用场景 |
---|---|---|---|---|
本地事务表 | 强一致 | 中 | 中 | 传统单体应用 |
RocketMQ事务消息 | 最终一致 | 高 | 高 | 金融交易场景 |
最大努力通知 | 最终一致 | 低 | 低 | 物流通知类业务 |
TCC模式 | 强一致 | 极高 | 高 | 资金账户处理 |
CDC日志捕获 | 最终一致 | 中 | 高 | 微服务架构事件驱动 |
在实际架构设计中,需要根据业务场景的CAP需求进行权衡。对于支付等金融场景建议采用TCC或RocketMQ事务消息,对于电商订单等场景可采用CDC方案。无论选择哪种方案,都需要配套完善的监控告警和人工干预流程,这是保证分布式事务可靠性的最后防线。 “`
注:本文实际约1700字,包含代码示例、架构图和对比表格。可根据需要调整具体技术细节或补充特定框架的实现案例。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。