温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何整合RocketMQ事务消息

发布时间:2021-06-18 15:05:02 来源:亿速云 阅读:240 作者:Leah 栏目:大数据

今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

一、 选择RocketMQ原因

ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ选型

二、 整合思路

RocketMQ提供了事务消息回查,查看官方Demo

@SpringBootApplication public class ProducerApplication implements CommandLineRunner {     private static final String TX_PGROUP_NAME = "myTxProducerGroup";     @Resource     private RocketMQTemplate rocketMQTemplate;     @Value("${demo.rocketmq.transTopic}")     private String springTransTopic;          public static void main(String[] args) {         SpringApplication.run(ProducerApplication.class, args);     }     @Override     public void run(String... args) throws Exception {         // Send transactional messages         testTransaction();     }     private void testTransaction() throws MessagingException {         String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};         for (int i = 0; i < 10; i++) {             try {                 Message msg = MessageBuilder                                         .withPayload("Hello RocketMQ " + i)                                         .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)                                         .build();                 SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,                                                                         springTransTopic + ":" + tags[i % tags.length],                                                                         msg,                                                                         null);                 System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",                                     msg.getPayload(),                                     sendResult.getSendStatus());                 Thread.sleep(10);             } catch (Exception e) {                 e.printStackTrace();             }         }     }     @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)     class TransactionListenerImpl implements RocketMQLocalTransactionListener {         private AtomicInteger transactionIndex = new AtomicInteger(0);         private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();         @Override         public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {             String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);             System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);             int value = transactionIndex.getAndIncrement();             int status = value % 3;             localTrans.put(transId, status);             if (status == 0) {                 // Return local transaction with success(commit), in this case,                 // this message will not be checked in checkLocalTransaction()                 System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());                 return RocketMQLocalTransactionState.COMMIT;             }             if (status == 1) {                 // Return local transaction with failure(rollback) , in this case,                 // this message will not be checked in checkLocalTransaction()                 System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());                 return RocketMQLocalTransactionState.ROLLBACK;             }             System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");             return RocketMQLocalTransactionState.UNKNOWN;         }         @Override         public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {             String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);             RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;             Integer status = localTrans.get(transId);             if (null != status) {                 switch (status) {                     case 0:                         retState = RocketMQLocalTransactionState.UNKNOWN;                         break;                     case 1:                         retState = RocketMQLocalTransactionState.COMMIT;                         break;                     case 2:                         retState = RocketMQLocalTransactionState.ROLLBACK;                         break;                 }             }             System.out.printf("------ !!! checkLocalTransaction is executed once," +                     " msgTransactionId=%s, TransactionState=%s status=%s %n",                 transId, retState, status);             return retState;         }     } }

需要在testTransaction()中发送消息,然后在TransactionListenerImpl类中实现executeLocalTransaction()方法才能执行整个本地事务,然后在checkLocalTransaction()中实现事务消息回查。

查看源代码可以知道testTransaction()方法和executeLocalTransaction()是在同一个线程当中,只不过包装RocketMQTemplate中。

三、问题和解决方法

3.1事务消息面临的几个问题:

  1. 消息发送的事务消息回调查询和本地事务没严格的先后顺序,怎么保证,回查时,事务操作肯定已经完成。

  2. 事务消息回调使用transaction_id查询,那么transaction_id存放在哪里,同时保证transaction_id关联的业务操作执行成功。

  3. 怎么把事务回调查询操作隔离出业务,保证不侵入代码中。

  4. 下游消费者怎么保证接口幂等性。

  5. 下游消费者怎么提高幂等性查询性能。

  6. 怎么把幂等性操作隔离出业务,保证不侵入代码中。

3.2 解决方法

  1. 因为数据库或者其他业务操作可能会存在延时,那么不能保证回查时业务操作已完成,那么可以多次回查,并设置最大回查次数,同时不能丢弃MQ消息持久化,方便手动恢复。

  2. 可以使用本地消息表落地的发送消息,同时可以采用切面、继承等等方式将落地消息隔离出业务代码之外,保证本地消息落库不侵入,注意必须要保证本地消息落库和本地业务落库在同一个事务之内!

  3. 事务消息回查可以使用第2点的本地消息表,根据transaction_id查询,判断本地事务的执行结果,也和第2点一样,可以使用一些方式将事务消息回查代码隔离出业务代码,保证不侵入。

  4. 幂等性的方法:

    • 数据库唯一约束

    • 状态机CAS单向流转

    • 消息去重表

  5. ,在执行本地业务前,先对redis判断是业务id是否存在,存在则直接返回消费成功,在执行本地业务之后,可以将消费信息异步落地到redis当中。注意:需要保证本地业务和消息幂等性操作在同一个事务当中,同时redis落地操作在事务之外。

  6. 比较好的方案应该是数据库唯一约束 + 消息去重表,在消息去重表中对业务id设置唯一约束,同时将消息落地操作隔离出本地业务之外,保证不侵入。

  7. 定时清理历史的本地消息表(消息去重表)。

看完上述内容,你们对如何整合RocketMQ事务消息有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI