How retryAnotherBrokerWhenNotStoreOK works in RocketMQ and how it is applied

Published: 2021-06-25 10:22:03 Source: 亿速云 Views: 444 Author: chen Category: Big Data

This article looks at RocketMQ's retryAnotherBrokerWhenNotStoreOK setting.

DefaultMQProducer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
        return retryAnotherBrokerWhenNotStoreOK;
    }

    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
    }

    //......
}

  • DefaultMQProducer has a retryAnotherBrokerWhenNotStoreOK property, which defaults to false

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    //......

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

    //......
}

  • In SYNC mode, DefaultMQProducerImpl's sendDefaultImpl method checks whether sendResult.getSendStatus() is SendStatus.SEND_OK. If it is not, the method then checks defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(): when that returns true it executes continue and moves on to the next attempt (within the budget of 1 + getRetryTimesWhenSendFailed() attempts), otherwise it returns sendResult directly. The for loop tracks lastBrokerName and passes it to selectOneMessageQueue(topicPublishInfo, lastBrokerName) on every iteration; that method in turn delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)
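The SYNC branch described above can be sketched without the RocketMQ client on the classpath. The following is a simplified model, not the library's code: SyncRetrySketch, FakeBroker, Outcome, and selectAvoiding are hypothetical names introduced only for illustration, the local SendStatus enum merely mirrors the names of the client's status values, and timeouts and exception handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the SYNC branch in sendDefaultImpl; not the RocketMQ API. */
final class SyncRetrySketch {

    /** Local enum that mirrors the names of the client's send statuses. */
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    /** Stand-in for a broker that answers every send with the same status (assumption). */
    static final class FakeBroker {
        final String name;
        final SendStatus status;

        FakeBroker(String name, SendStatus status) {
            this.name = name;
            this.status = status;
        }
    }

    /** Records which brokers were tried and the last status that came back. */
    static final class Outcome {
        final List<String> brokersSent = new ArrayList<>();
        SendStatus lastStatus;
    }

    static Outcome sendSync(List<FakeBroker> brokers, int retryTimesWhenSendFailed,
                            boolean retryAnotherBrokerWhenNotStoreOK) {
        Outcome outcome = new Outcome();
        int timesTotal = 1 + retryTimesWhenSendFailed; // same budget as the SYNC case
        String lastBrokerName = null;
        for (int times = 0; times < timesTotal; times++) {
            FakeBroker broker = selectAvoiding(brokers, lastBrokerName, times);
            lastBrokerName = broker.name;
            outcome.brokersSent.add(broker.name);
            outcome.lastStatus = broker.status;
            if (broker.status != SendStatus.SEND_OK && retryAnotherBrokerWhenNotStoreOK) {
                continue; // flag on: try again, preferring a different broker
            }
            return outcome; // SEND_OK, or flag off: return the result as it is
        }
        return outcome; // attempts exhausted: the last non-OK result is returned
    }

    /** Returns the first broker, scanning from offset, whose name differs from lastBrokerName. */
    static FakeBroker selectAvoiding(List<FakeBroker> brokers, String lastBrokerName, int offset) {
        for (int i = 0; i < brokers.size(); i++) {
            FakeBroker candidate = brokers.get((offset + i) % brokers.size());
            if (!candidate.name.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return brokers.get(offset % brokers.size()); // only one broker: reuse it
    }

    public static void main(String[] args) {
        List<FakeBroker> brokers = Arrays.asList(
            new FakeBroker("broker-a", SendStatus.FLUSH_SLAVE_TIMEOUT),
            new FakeBroker("broker-b", SendStatus.SEND_OK));

        Outcome flagOff = sendSync(brokers, 2, false);
        System.out.println(flagOff.brokersSent + " " + flagOff.lastStatus);
        // prints: [broker-a] FLUSH_SLAVE_TIMEOUT

        Outcome flagOn = sendSync(brokers, 2, true);
        System.out.println(flagOn.brokersSent + " " + flagOn.lastStatus);
        // prints: [broker-a, broker-b] SEND_OK
    }
}
```

With the flag off, the caller receives the non-OK result and has to decide what to do with it. With the flag on, the client spends one of its retry attempts on another broker, which in a real deployment can mean the message is stored on more than one broker, so consumers should tolerate duplicates.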

MQFaultStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {

    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}

  • MQFaultStrategy's selectOneMessageQueue method first checks whether sendLatencyFaultEnable is turned on. It defaults to false, in which case the method goes straight to tpInfo.selectOneMessageQueue(lastBrokerName)
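The MQFaultStrategy listing also maps a send latency to a "not available" duration through the latencyMax and notAvailableDuration tables. That lookup only takes effect when sendLatencyFaultEnable is true. The sketch below copies the default tables and the lookup loop from the listing into a standalone class so the mapping can be tried out; NotAvailableDurationSketch and durationFor are hypothetical names, and the fault-item bookkeeping itself is not modelled.

```java
/** Hypothetical standalone copy of the latency-to-duration lookup shown in MQFaultStrategy. */
final class NotAvailableDurationSketch {

    // Default tables as they appear in the MQFaultStrategy listing (milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest threshold down and returns the duration of the first one reached. */
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    /** Mirrors updateFaultItem: with isolation set, the latency is treated as 30000 ms. */
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(40, false));   // prints: 0
        System.out.println(durationFor(600, false));  // prints: 30000
        System.out.println(durationFor(3500, false)); // prints: 180000
        System.out.println(durationFor(40, true));    // prints: 600000
    }
}
```

In the sendDefaultImpl listing, the exception branches for RemotingException, MQClientException, and MQBrokerException pass isolation as true, which is why a failed send maps to the longest duration in the table regardless of how quickly it failed.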

TopicPublishInfo

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java

public class TopicPublishInfo {

    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    //......

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

    //......
}

  • When lastBrokerName is null, TopicPublishInfo's selectOneMessageQueue(lastBrokerName) calls the no-argument selectOneMessageQueue, which picks a MessageQueue in round-robin fashion. When lastBrokerName is not null, it loops at most messageQueueList.size() times looking for a MessageQueue whose brokerName differs from lastBrokerName; if none is found, it falls back to the no-argument selectOneMessageQueue
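The selection rule above can be exercised in isolation. This is a minimal sketch under stated assumptions: QueueSelectionSketch and its nested Queue class are hypothetical stand-ins for TopicPublishInfo and MessageQueue, and a plain AtomicInteger starting at 0 replaces the ThreadLocalIndex counter so that the output is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for TopicPublishInfo's queue selection; not the RocketMQ class. */
final class QueueSelectionSketch {

    /** Hypothetical stand-in for MessageQueue: just a broker name and a queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + ":" + queueId;
        }
    }

    private final List<Queue> queues;
    // Assumption: a shared counter starting at 0 instead of the client's ThreadLocalIndex.
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelectionSketch(List<Queue> queues) {
        this.queues = queues;
    }

    /** Prefers a queue on a broker other than lastBrokerName; falls back to round-robin. */
    Queue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0;
            }
            Queue candidate = queues.get(pos);
            if (!candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return selectOne(); // every queue lives on lastBrokerName
    }

    /** Plain round-robin over all queues. */
    Queue selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % queues.size();
        if (pos < 0) {
            pos = 0;
        }
        return queues.get(pos);
    }

    public static void main(String[] args) {
        QueueSelectionSketch twoBrokers = new QueueSelectionSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        Queue first = twoBrokers.selectOne(null);
        Queue retry = twoBrokers.selectOne(first.brokerName);
        System.out.println(first + " then " + retry); // prints: broker-a:0 then broker-b:0

        QueueSelectionSketch oneBroker = new QueueSelectionSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1)));
        System.out.println(oneBroker.selectOne("broker-a")); // prints: broker-a:1
    }
}
```

The second case shows the limit of the mechanism: when a topic's queues all sit on one broker, a retry has nowhere else to go and lands on the same broker again.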

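Summary

retryAnotherBrokerWhenNotStoreOK is a DefaultMQProducer property that defaults to false. In SYNC mode, DefaultMQProducerImpl's sendDefaultImpl method checks whether sendResult.getSendStatus() is SendStatus.SEND_OK; if it is not, the method checks defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() and executes continue when it returns true, or returns sendResult directly when it returns false. The for loop tracks lastBrokerName and passes it to selectOneMessageQueue(topicPublishInfo, lastBrokerName) on every iteration, which delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName). With sendLatencyFaultEnable at its default of false, that call ends up in TopicPublishInfo.selectOneMessageQueue(lastBrokerName), which prefers a MessageQueue on a broker other than lastBrokerName.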

