温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

写数据库同时发mq消息事务一致性的解决方法

发布时间:2021-12-06 09:21:25 来源:亿速云 阅读:682 作者:柒染 栏目:大数据
# 写数据库同时发MQ消息事务一致性的解决方法 ## 引言 在分布式系统中,保证数据库操作与消息队列(MQ)消息发送的事务一致性是一个经典难题。当业务需要同时完成数据库写入和消息发送时(如订单创建后触发库存扣减),如何确保两者要么同时成功,要么同时失败,成为系统设计的关键挑战。本文将深入探讨5种主流解决方案及其实现细节。 --- ## 一、本地事务表方案 ### 核心思想 通过本地数据库事务表记录待发送消息,由独立任务补偿发送。 ### 实现步骤 1. 创建消息事务表: ```sql CREATE TABLE mq_transaction ( id BIGINT PRIMARY KEY, business_id VARCHAR(64), content TEXT, status TINYINT, -- 0未发送 1已发送 created_time DATETIME ); 
  1. 业务处理流程:
@Transactional public void processOrder(Order order) { // 1. 写业务表 orderDao.insert(order); // 2. 同一事务写入消息表 MqMessage msg = new MqMessage( UUID.randomUUID().toString(), order.getId(), JSON.toJSONString(order), 0, new Date() ); mqMessageDao.insert(msg); } 
  1. 独立定时任务扫描:
@Scheduled(fixedRate = 5000) public void retrySendMessages() { List<MqMessage> pendingMsgs = mqMessageDao.selectByStatus(0); pendingMsgs.forEach(msg -> { try { mqProducer.send(msg.getContent()); mqMessageDao.updateStatus(msg.getId(), 1); } catch (Exception e) { log.error("消息发送失败", e); } }); } 

优缺点分析

✅ 强一致性保障
✅ 无第三方依赖
❌ 需要维护消息表
❌ 消息实时性依赖扫描间隔


二、事务消息方案(RocketMQ)

实现机制

利用MQ中间件提供的二阶段提交能力。

典型流程

participant App participant RocketMQ App -> RocketMQ: 1. 发送Half消息 RocketMQ --> App: 返回OK App -> DB: 2. 执行本地事务 alt 成功 App -> RocketMQ: 3. Commit RocketMQ -> Consumer: 投递消息 else 失败 App -> RocketMQ: 3. Rollback end 

代码示例

public class OrderService { @Transactional public void createOrder(Order order) { // 1. 保存订单 orderDao.insert(order); // 2. 发送事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-topic", MessageBuilder.withPayload(order).build(), order ); if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) { throw new RuntimeException("事务提交失败"); } } } // 事务监听器 @RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 本地事务已在上游方法通过@Transactional完成 return RocketMQLocalTransactionState.COMMIT; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 检查订单是否存在 String orderId = msg.getHeaders().get("orderId"); return orderDao.exists(orderId) ? COMMIT : ROLLBACK; } } 

注意事项

  • 需要MQ支持事务消息(如RocketMQ 4.3+)
  • 消息回查机制可能导致重复消费

三、最大努力通知方案

适用场景

对一致性要求不高的最终一致性场景。

实现模式

participant Producer participant Consumer database DB Producer -> DB: 1. 写数据库 Producer -> MQ: 2. 发消息(可能失败) loop 定时重试 Producer -> MQ: 3. 持续重试发送 end Consumer -> DB: 4. 消费成功后更新状态 

关键设计

  1. 消息表增加重试次数字段
  2. 指数退避重试策略
  3. 人工干预兜底机制

四、TCC模式解决方案

三阶段划分

  1. Try:预留资源
    • 冻结订单状态为”处理中”
    • 预扣减库存
  2. Confirm:确认执行
    • 更新订单为”已完成”
    • 发送确认消息
  3. Cancel:取消释放
    • 订单状态回滚
    • 库存预扣减回退

异常处理要点

  • 需实现幂等性控制
  • 需要记录事务日志
  • 必须配备定时任务补偿

五、CDC日志捕获方案

技术架构

graph LR DB[MySQL Binlog] -->|Canal/Debezium| MQ[Kafka] MQ --> Processor[流处理程序] Processor --> Downstream[下游服务] 

实现示例(Flink处理)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new KafkaSource<>()) .filter(event -> "orders".equals(event.getTable())) .filter(event -> "insert".equals(event.getOpType())) .map(event -> buildMQMessage(event.getAfter())) .addSink(new MQSink()); env.execute("OrderEventProcessor"); 

优势比较

  • 完全解耦业务代码
  • 支持多消费者复用
  • 延迟通常在毫秒级

方案选型对比

方案 一致性强度 复杂度 实时性 适用场景
本地事务表 强一致 传统单体应用
RocketMQ事务消息 最终一致 金融交易场景
最大努力通知 最终一致 物流通知类业务
TCC模式 强一致 极高 资金账户处理
CDC日志捕获 最终一致 微服务架构事件驱动

结语

在实际架构设计中,需要根据业务场景的CAP需求进行权衡。对于支付等金融场景建议采用TCC或RocketMQ事务消息,对于电商订单等场景可采用CDC方案。无论选择哪种方案,都需要配套完善的监控告警和人工干预流程,这是保证分布式事务可靠性的最后防线。 “`

注:本文实际约1700字,包含代码示例、架构图和对比表格。可根据需要调整具体技术细节或补充特定框架的实现案例。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI