What Are RocketMQ Transactional Messages?

Published: 2021-06-23 14:26:44 Source: 亿速云 (Yisu Cloud) Views: 246 Author: chen Category: Big Data

This article explains what RocketMQ transactional messages are, a point that confuses many people in day-to-day work. It does so by reading through the client source code that sends a transactional message.

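1. Background

Earlier open-source versions of Alibaba's RocketMQ had the transaction check-back feature stripped out. Newer versions add it back (transactional messages returned in the 4.3.0 release), which helps small companies that lack the resources to build a reliable messaging middleware of their own. RocketMQ's design also draws on Kafka, and its performance is good.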

2. Version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

3. Reading the Source

The official demo

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TX_PGROUP_NAME = "myTxProducerGroup";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send transactional messages
        testTransaction();
    }

    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                // The transaction id header is what the listener uses to correlate the two callbacks
                Message msg = MessageBuilder
                        .withPayload("Hello RocketMQ " + i)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                        springTransTopic + ":" + tags[i % tags.length],
                        msg,
                        null);
                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                        msg.getPayload(),
                        sendResult.getSendStatus());
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
        // Records the simulated outcome per transaction id so the check-back can look it up
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success(commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }
            if (status == 1) {
                // Return local transaction with failure(rollback) , in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            // UNKNOWN leaves the decision to the broker's later check-back
            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! %n", msg.getPayload());
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("------ !!! checkLocalTransaction is executed once," +
                    " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
            return retState;
        }
    }
}
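To see what the demo's listener amounts to, its state logic can be traced without a broker. The sketch below is plain Java with hypothetical names (DemoOutcomeSketch, State). It assumes the ten sends run one after another, so the listener's counter equals the loop index, and that the broker checks back only messages left in UNKNOWN, exactly once.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java trace of the demo's state logic; no RocketMQ classes are involved.
// DemoOutcomeSketch and State are hypothetical names used only for illustration.
class DemoOutcomeSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Mirrors executeLocalTransaction(): status 0 commits, 1 rolls back, 2 stays unknown.
    static State executeLocal(int status) {
        if (status == 0) {
            return State.COMMIT;
        }
        if (status == 1) {
            return State.ROLLBACK;
        }
        return State.UNKNOWN;
    }

    // Mirrors checkLocalTransaction(): maps the recorded status to the check-back answer.
    static State checkLocal(int status) {
        switch (status) {
            case 0: return State.UNKNOWN;
            case 1: return State.COMMIT;
            case 2: return State.ROLLBACK;
            default: return State.COMMIT;
        }
    }

    // Final state per transaction id, assuming sequential sends and a single
    // check-back for every message whose local transaction returned UNKNOWN.
    static Map<String, State> simulate(int messageCount) {
        Map<String, State> result = new LinkedHashMap<>();
        for (int i = 0; i < messageCount; i++) {
            int status = i % 3;
            State state = executeLocal(status);
            if (state == State.UNKNOWN) {
                state = checkLocal(status);
            }
            result.put("KEY_" + i, state);
        }
        return result;
    }

    public static void main(String[] args) {
        simulate(10).forEach((id, state) -> System.out.println(id + " -> " + state));
    }
}
```

Under those assumptions KEY_0, KEY_3, KEY_6 and KEY_9 end up committed and the other six messages are rolled back, because every message left in UNKNOWN has status 2 and the check-back maps 2 to ROLLBACK.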

Transactional messages are sent through RocketMQTemplate.sendMessageInTransaction(), so that is where we start.

// RocketMQTemplate
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
                                                      final Message<?> message, final Object arg) throws MessagingException {
    try {
        // Look up the producer registered for this producer group in the local cache
        TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
        org.apache.rocketmq.common.message.Message rocketMsg =
            RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);
        return txProducer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }
}
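The cache lookup in the first line of that method can be pictured roughly as follows. This is a minimal sketch, not the starter's code: ProducerCacheSketch, TxProducer, register() and stage() are hypothetical names, and it assumes that one producer is registered per producer group when the annotated listener is processed and that the lookup fails fast for an unregistered group.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of a per-group producer cache; not the starter's actual code.
class ProducerCacheSketch {

    // Stand-in for a started transactional producer bound to one producer group.
    static final class TxProducer {
        final String group;

        TxProducer(String group) {
            this.group = group;
        }
    }

    private final Map<String, TxProducer> cache = new ConcurrentHashMap<>();

    // Assumed to happen once per listener registration; repeated calls reuse the instance.
    TxProducer register(String group) {
        return cache.computeIfAbsent(group, TxProducer::new);
    }

    // Lookup on the send path: no producer is created here, a missing group is an error.
    TxProducer stage(String group) {
        TxProducer producer = cache.get(group);
        if (producer == null) {
            throw new IllegalStateException("No transactional producer registered for group '" + group + "'");
        }
        return producer;
    }

    public static void main(String[] args) {
        ProducerCacheSketch cache = new ProducerCacheSketch();
        cache.register("myTxProducerGroup");
        System.out.println(cache.stage("myTxProducerGroup").group);
    }
}
```

Keeping creation and lookup separate means a typo in the group name surfaces as an immediate error on send rather than as a producer that has no listener attached.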

Step into txProducer.sendMessageInTransaction(rocketMsg, arg):

// TransactionMQProducer
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
    // A transaction listener (local transaction plus check-back query) must have been set
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

Step into defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg):

// DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as a prepared (half) message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send the prepared message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        // Sent successfully. With isWaitStoreMsgOK=true on the message (the default) this means
        // the message was stored; with isWaitStoreMsgOK=false, SEND_OK is returned whenever
        // no exception was caught.
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    // Run the local transaction branch passed in by the caller
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    // Run the transaction listener supplied via the annotation or set on the producer
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
            break;
        // Disk flush timed out
        case FLUSH_DISK_TIMEOUT:
        // Replication to the slave timed out
        case FLUSH_SLAVE_TIMEOUT:
        // No slave available
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // Send the second-phase confirmation (commit or rollback)
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Wrap up the result for the caller
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        // Commit the transaction
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        // Roll back the transaction
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        // State unknown: neither commit nor rollback, left to the broker's check-back
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

    // Send the confirmation one-way: no broker response is awaited, and a lost
    // confirmation is compensated by the check-back listener
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
                                                                   this.defaultMQProducer.getSendMsgTimeout());
}
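The decision made in sendMessageInTransaction() above can be condensed into one small function. The following is a sketch in plain Java, not client code: TxFlowSketch and decide() are hypothetical names, and the two enums only borrow their constant names from the excerpt.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-ins for the client's enums; the constant names follow the excerpt above.
class TxFlowSketch {

    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Decides which state the producer reports in the second-phase request.
    static LocalState decide(SendStatus halfMessageStatus, Callable<LocalState> localTransaction) {
        if (halfMessageStatus != SendStatus.SEND_OK) {
            // The prepared message was not safely stored: the local transaction is skipped
            // and the message is rolled back.
            return LocalState.ROLLBACK_MESSAGE;
        }
        try {
            LocalState state = localTransaction.call();
            // A null result is treated like an undecided transaction.
            return state != null ? state : LocalState.UNKNOW;
        } catch (Throwable e) {
            // An exception leaves the state undecided; the broker's check-back resolves it later.
            return LocalState.UNKNOW;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.SEND_OK, () -> {
            throw new IllegalStateException("simulated database failure");
        }));
    }
}
```

The point worth noticing is that an exception in the local transaction does not produce a rollback. It produces UNKNOW, so the listener's check-back method has to be able to answer correctly for such messages.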

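When I first looked at the RocketMQ transactional message example and saw the local transaction being run by a listener, I assumed the client sent a prepared message and then waited asynchronously for the broker's response before running the local transaction, which would make it impossible for the client to respond in real time. The source shows otherwise: once the prepared message has been sent successfully, sendMessageInTransaction() calls the listener's executeLocalTransaction() directly on the calling thread, then reports the result to the broker in a one-way request. The listener's checkLocalTransaction() is used only when the broker checks back.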
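A toy model makes the ordering concrete. The sketch below is plain Java under stated assumptions: SyncOrderSketch and sendInTransaction() are hypothetical names, the broker is not modelled at all, and the local transaction is represented by a Supplier that is invoked directly, which is the behaviour read off the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the call order; SyncOrderSketch is a hypothetical class, not RocketMQ code.
class SyncOrderSketch {

    final List<String> events = new ArrayList<>();

    // Models the send path: every step runs on the caller's thread, in this order.
    String sendInTransaction(Supplier<String> localTransaction) {
        events.add("prepared message sent on " + Thread.currentThread().getName());
        // Direct invocation: the local transaction does not wait for a broker callback.
        String state = localTransaction.get();
        events.add("local transaction returned " + state);
        events.add("end-transaction request sent one-way");
        return state;
    }

    public static void main(String[] args) {
        SyncOrderSketch producer = new SyncOrderSketch();
        String caller = Thread.currentThread().getName();
        String state = producer.sendInTransaction(() -> {
            // The listener body observes the same thread as the code that called send.
            boolean sameThread = caller.equals(Thread.currentThread().getName());
            producer.events.add("listener ran on caller thread: " + sameThread);
            return "COMMIT";
        });
        producer.events.forEach(System.out::println);
        System.out.println("caller received " + state);
    }
}
```

Because the local transaction finishes before the send call returns, the caller can use the returned state to answer its own client immediately.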

That concludes this look at what RocketMQ transactional messages are. Theory sticks best when paired with practice, so try the demo yourself.
