// This class is mainly used to plan the latency strategy applied when sending messages.
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.latency;import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;

/**
 * Picks the {@link MessageQueue} a message should be sent to.
 *
 * <p>When {@code sendLatencyFaultEnable} is {@code false} (the default) selection is
 * delegated entirely to {@link TopicPublishInfo#selectOneMessageQueue(String)}. When it is
 * {@code true}, queues whose broker is reported as not available by the internal
 * {@link LatencyFaultTolerance} are skipped, and observed latencies reported through
 * {@link #updateFaultItem(String, long, boolean)} are translated into a not-available
 * duration using the {@code latencyMax} / {@code notAvailableDuration} tables.
 */
public class MQFaultStrategy {
    private static final InternalLogger log = ClientLogger.getLog();

    /** Per-broker fault bookkeeping, keyed by broker name. */
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    /** Master switch for the latency-aware selection; off by default. */
    private boolean sendLatencyFaultEnable = false;

    /** Ascending latency thresholds; entry {@code i} pairs with {@code notAvailableDuration[i]}. */
    private long[] latencyMax = new long[]{50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

    /** Duration handed to the fault table for each latency level (same unit as the thresholds; presumably milliseconds). */
    private long[] notAvailableDuration = new long[]{0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public MQFaultStrategy() {
    }

    /**
     * @return the internal duration table itself (not a copy)
     */
    public long[] getNotAvailableDuration() {
        return this.notAvailableDuration;
    }

    /**
     * @param notAvailableDuration new duration table; stored by reference
     */
    public void setNotAvailableDuration(long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    /**
     * @return the internal latency threshold table itself (not a copy)
     */
    public long[] getLatencyMax() {
        return this.latencyMax;
    }

    /**
     * @param latencyMax new latency threshold table; stored by reference
     */
    public void setLatencyMax(long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    /**
     * @return whether latency-aware queue selection is enabled
     */
    public boolean isSendLatencyFaultEnable() {
        return this.sendLatencyFaultEnable;
    }

    /**
     * @param sendLatencyFaultEnable {@code true} to enable latency-aware queue selection
     */
    public void setSendLatencyFaultEnable(boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    /**
     * Selects the queue to send the next message to.
     *
     * <p>With the latency strategy enabled, the topic's queue list is scanned once, starting
     * at the topic's rotating send index. The first queue whose broker is available and, when
     * {@code lastBrokerName} is non-null, whose broker name equals {@code lastBrokerName}, is
     * returned. If none qualifies, the broker returned by
     * {@code latencyFaultTolerance.pickOneAtLeast()} is tried as a fallback. Any exception
     * raised along the way is logged and selection falls back to
     * {@code tpInfo.selectOneMessageQueue()}.
     *
     * @param tpInfo         publish info of the target topic
     * @param lastBrokerName broker used by the previous attempt, or {@code null}
     * @return the selected queue, as produced by the paths described above
     */
    public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) {
        if (!this.sendLatencyFaultEnable) {
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder may be too.
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue candidate = (MessageQueue) tpInfo.getMessageQueueList().get(pos);
                // NOTE(review): with a non-null lastBrokerName only queues on that same broker
                // are accepted here - confirm this is the intended retry behavior.
                if (this.latencyFaultTolerance.isAvailable(candidate.getBrokerName())
                    && (null == lastBrokerName || candidate.getBrokerName().equals(lastBrokerName))) {
                    return candidate;
                }
            }

            // No queue qualified: fall back to the broker suggested by the fault table.
            String notBestBroker = (String) this.latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // NOTE(review): the queue object returned by tpInfo is modified in place below;
                // if tpInfo hands out a shared instance this alters it for other users - confirm.
                MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    int queueId = tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums;
                    // Java's % keeps the sign of the dividend; wrap so the id stays in
                    // [0, writeQueueNums) even if the counter is negative.
                    if (queueId < 0) {
                        queueId += writeQueueNums;
                    }
                    mq.setQueueId(queueId);
                }
                return mq;
            }

            // The suggested broker has no usable queues for this topic: drop it from the fault
            // table. pickOneAtLeast() may have returned null (see the null check above); in that
            // case there is nothing to remove, and passing null on would presumably only raise
            // an exception that ends up in the error log below.
            if (notBestBroker != null) {
                this.latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            // Best effort: selection must not fail because of the fault strategy itself.
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    /**
     * Records an observed send latency for a broker. Does nothing unless the latency
     * strategy is enabled.
     *
     * @param brokerName     broker the latency was observed on
     * @param currentLatency observed latency, forwarded unchanged to the fault table
     * @param isolation      when {@code true}, the not-available duration is computed as if
     *                       the latency were 30000 instead of {@code currentLatency}
     */
    public void updateFaultItem(String brokerName, long currentLatency, boolean isolation) {
        if (!this.sendLatencyFaultEnable) {
            return;
        }
        long duration = this.computeNotAvailableDuration(isolation ? 30000L : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }

    /**
     * Maps a latency to a not-available duration: finds the highest index {@code i} with
     * {@code currentLatency >= latencyMax[i]} and returns {@code notAvailableDuration[i]}.
     *
     * <p>The two tables can be replaced independently through their setters, so they may
     * differ in length. If the duration table is shorter, its last entry is used for the
     * higher levels; if it is empty, {@code 0} is returned.
     *
     * @param currentLatency latency to classify
     * @return the matching duration, or {@code 0} when the latency is below every threshold
     */
    private long computeNotAvailableDuration(long currentLatency) {
        // Read each field once so both tables stay consistent for the whole lookup.
        final long[] thresholds = this.latencyMax;
        final long[] durations = this.notAvailableDuration;
        for (int i = thresholds.length - 1; i >= 0; --i) {
            if (currentLatency >= thresholds[i]) {
                if (durations.length == 0) {
                    return 0L;
                }
                return durations[Math.min(i, durations.length - 1)];
            }
        }
        return 0L;
    }
}