RocketMQ源码阅读-Producer消息发送

RocketMQ源码阅读-Producer消息发送

  • 1. 从单元测试入手
  • 2. 启动过程
  • 3. 同步消息发送过程
  • 4. 异步消息发送过程
  • 5. 小结

Producer是消息的生产者。

Producer和Consummer对Rocket来说都是Client,Server是Broker。

客户端在源码中是一个单独的Model,目录为rocketmq/client。

类DefaultMQProducer是Producer的默认入口实现类。
image.png
继承了类ClientConfig,客户端配置类,存储上下文配置信息。
实现接口MQProducer:
image.png
定义了Producer对外提供的接口,也就是所有的发送消息的方法,同时MQProducer接口继承了MQAdmin接口。
如上图中MQAdmin是元数据管理接口,定义了对Message操作的一些方法。

DefaultMQProducer中有一个重要的成员变量:
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

DefaultMQProducer的所有操作,基本没什么业务逻辑,都是调用DefaultMQProducerImpl类中的方法。

DefaultMQProducerImpl类是Producer操作的具体实现类。

1. 从单元测试入手

看源码的流程都是从单元测试入手

Producer的单元测试在类org.apache.rocketmq.client.producer.DefaultMQProducerTest中。
单元测试的所有方法,就对应着这个类的全部功能。
DefaultMQProducerTest的方法列表如下:

其中 init 和 terminate 是测试开始初始化和测试结束销毁时需要执行的代码。
其他的方法是测试不同功能的测试用例,也就是测试不同的发消息的方式。

2. 启动过程

从单元测试中的init方法入手:

@Before
public void init() throws Exception {String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();// 创建一个Producer,并赋予名字producer = new DefaultMQProducer(producerGroupTemp);// 设置NameServer的地址producer.setNamesrvAddr("127.0.0.1:9876");// 消息长度大于16开启压缩producer.setCompressMsgBodyOverHowmuch(16);// 创建不同的Messagemessage = new Message(topic, new byte[] {'a'});zeroMsg = new Message(topic, new byte[] {});bigMessage = new Message(topic, "This is a very huge message!".getBytes());// 启动Producerproducer.start();// 设置mQClientFactory和mQClientAPIImpl,后面再讲Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");field.setAccessible(true);field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");field.setAccessible(true);field.set(mQClientFactory, mQClientAPIImpl);// 注册Producerproducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenReturn(createSendResult(SendStatus.SEND_OK));
}

这段代码就是创建了一个DefaultMQProducer,设置参数,并调用start()方法启动,之后注册到NameServer中。

首先,看下DefaultMQProducer#start()方法:

@Override
public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));// 直接调用defaultMQProducerImpl的start方法this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}

其中直接调用了DefaultMQProducerImpl的start方法:

public void start() throws MQClientException {this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 注册boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动mQClientFactorymQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 给所有的broker发心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。

对于启动过程,状态serviceState为CREATE_JUST。
CREATE_JUST分支中,会获取一个MQClientManager(单例模式),通过方法getAndCreateMQClientInstance()获取一个MQClientInstance实例,赋值给成员变量:

private MQClientInstance mQClientFactory;

然后调用MQClientInstance的registerProducer()方法,将自己注册到MQClientInstance中。
随后,调用MQClientInstance的start(),启动mQClientFactory。
最后,给所有的broker发心跳。

进一步看MQClientInstance的start()方法:

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

这一部分代码的注释比较清楚,流程是这样的:

  1. 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法;
  2. 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务;
  3. 启动拉取消息服务;
  4. 启动 Rebalance 服务;
  5. 启动默认的 Producer 服务。

以上就是 Producer 的启动流程。

3. 同步消息发送过程

分析 Producer 发送消息的流程。
接口 MQProducer 中,定义了 19 个不同参数的发消息的方法。
这19个接口可以分为3类:

  • 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
  • 同步发送(Sync):发送消息后等待响应;
  • 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。

先看下同步发送消息的方法(异步发送消息,只是将同步发送方法提交给线程池)。
DefaultMQProducer中对同步发送方法的实现为:

@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {Validators.checkMessage(msg, this);msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);
}

实际调用的为DefaultMQProducerImpl的send()方法:

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

最终调用的方法为DefaultMQProducerImpl的sendDefaultImpl()方法,源码位置为517行:
(源码偏长,在下方进行解读)

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取Topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 选择一个消息要发送的队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先是获取Topic信息,DefaultMQProducerImpl#tryToFindTopicPublishInfo():

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 缓存中获取 Topic发布信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);// 当无可用的 Topic发布信息时,从Namesrv获取一次if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 若获取的 Topic发布信息时候可用,则返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {// 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topicthis.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

优先从缓存中获取Topic路由信息,如果缓存没有,就从NameServer中获取。

然后选择消息要发送的队列,DefaultMQProducerImpl#selectOneMessageQueue():

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

实际调用MQFaultStrategy的selectOneMessageQueue方法:
根据 Topic发布信息 选择一个消息队列

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {// 获取 brokerName=lastBrokerName && 可用的一个消息队列int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}// 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}// 选择一个消息队列,不考虑队列的可用性return tpInfo.selectOneMessageQueue();}// 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性return tpInfo.selectOneMessageQueue(lastBrokerName);
}

选好队列后返回到DefaultMQProducerImpl#sendDefaultImpl()方法中。

然后DefaultMQProducerImpl#sendDefaultImpl()方法中继续调用sendKernelImpl()方法发送消息,sendKernelImpl()源码如下:

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();// 获取 broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {// 是否使用broker vip通道。broker会开启两个端口对外服务。brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();// 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {// 设置唯一编号MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}// 消息压缩int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}// 事务final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}// hook:发送消息校验if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}// hook:发送消息前逻辑if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}// 构建发送消息请求SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}// 发送消息SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}// 同步发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}// hook:发送消息后逻辑if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}// 返回发送结果return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}// broker为空抛出异常throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

也很长,但是逻辑简单,主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage()(上述代码第150行),将消息发送给队列所在的 Broker。

至此,消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

这一篇主要看发送代码,后续序列化和网络传输等步骤的代码后面再去探究。

ps:推荐IEDA插件SequenceDiagram,能一键生成时序图

整体的时序图为:
同步发送消息时序图.png

4. 异步消息发送过程

上一节讲到异步发送消息,只是将同步发送方法提交给线程池。对于MQProducer接口中方法为:

void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException;

也就是带有回调方法,DefaultMQProducer中的实现为:

@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}

调用了DefaultMQProducerImpl的send方法:

@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {final long beginStartTime = System.currentTimeMillis();ExecutorService executor = this.getAsyncSenderExecutor();try {executor.submit(new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception e) {sendCallback.onException(e);}} else {sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}});} catch (RejectedExecutionException e) {throw new MQClientException("executor rejected ", e);}}

可以看到,实际上是将DefaultMQProducerImpl的sendDefaultImpl()方法(对应源码517行)提交到线程池执行,后面的流程都和同步发送一致,参看上一节分析。

异步发送消息时序图为:
异步发消息时序图.png
至此,异步消息发送也完成了。

5. 小结

MQProducer定义了19种发送消息的方法,其默认实现为DefaultMQProducer。DefaultMQProducer中的方法没有业务逻辑,最终会调用DefaultMQProducerImpl类中的具体逻辑。

DefaultMQProducerImpl中发消息的方法最终都会调用其sendKernelImpl()方法,其主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage(),将消息发送给队列所在的 Broker。

最终消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

类之间的关系为:
发消息类图.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629836.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ASP.NET Core 的 Web Api 实现限流 中间件

Microsoft.AspNetCore.RateLimiting 中间件提供速率限制&#xff08;限流&#xff09;中间件。 它是.NET 7 以上版本才支持的中间件&#xff0c;刚看了一下&#xff0c;确实挺好用&#xff0c;下面给大家简单介绍一下&#xff1a; RateLimiterOptionsExtensions 类提供下列用…

收支明细曲线图:一图掌握你的财务变化趋势!

想要快速了解你的收支明细和变化趋势吗&#xff1f;不需要复杂的财务表格&#xff0c;一个曲线图就能让你一目了然&#xff01;现在&#xff0c;就让我们带你走进「图形化分析收支变化趋势」的世界&#xff0c;让你轻松掌握自己的财务状况。 首先&#xff0c;第一步&#xff0…

超结MOS在舞台灯电源上的应用-REASUNOS瑞森半导体

一、前言 舞台灯电源是一种为舞台灯具提供电力转换和控制的设备&#xff0c;它可以根据不同的灯具类型和需求&#xff0c;提供恒流或恒压、可调光或不可调光、模拟或数字或网络等输出模式。 舞台灯电源的主要特点是具有高效、稳定、安全、智能等功能&#xff0c;它可以适应不…

一台电脑如何通过另一台联网电脑访问网络

电脑A没有连接网络&#xff0c;电脑B已经连接wifi。 电脑A如何通过访问电脑B从而连接网络&#xff1f; 1. 将这2台电脑用网线直连 2. 电脑B打开【网络和Internet设置】 3. 右键点击WLAN&#xff0c;选择属性&#xff0c;进入共享tab页面&#xff0c;勾选【允许其他网络用户通过…

Kafka集群与可靠性

Kafka集群与可靠性 1.Kafka集群搭建实战 使用两台Linux服务器&#xff1a;一台192.168.182.137 一台192.168.182.138 安装kafka首先&#xff0c;我们需要配置java环境变量&#xff08;这里就略过了&#xff09; mkdir /opt/kafka #上传压缩包kafka_2.13-3.3.1.tgz并解压 ta…

Rust-Panic

什么是panic 在Rust中&#xff0c;有一类错误叫作panic。示例如下&#xff1a; 编译&#xff0c;没有错误&#xff0c;执行这段程序&#xff0c;输出为&#xff1a; 这种情况就引发了一个panic。在这段代码中&#xff0c;我们调用了Option::unwrap()方法&#xff0c;正是这个方…

SparkSQL初体验

SparkSQL初体验 命令式的 API RDD 版本的 WordCount val conf new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc new SparkContext(conf)sc.textFile("hdfs://master:9000/dataset/wordcount.txt").flatMap(_.split("…

设计一个抽奖系统

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring原理、JUC原理、Kafka原理、分布式技术原理、数据库技术&#x1f525;如果感觉博主的文章还不错的…

在uniapp Vue3版本中如何解决web/H5网页浏览器跨域的问题

问题复现 uniapp项目在浏览器运行&#xff0c;有可能调用某些接口会出现跨域问题&#xff0c;报错如下图所示&#xff1a; 什么是跨域&#xff1f; 存在跨域问题的原因是因为浏览器的同源策略&#xff0c;也就是说前端无法直接发起跨域请求。同源策略是一个基础的安全策略&a…

前端下载文件流,设置返回值类型responseType:‘blob‘无效的问题

前言&#xff1a; 本是一个非常简单的请求&#xff0c;即是下载文件。通常的做法如下&#xff1a; 1.前端通过Vue Axios向后端请求&#xff0c;同时在请求中设置响应体为Blob格式。 2.后端相应前端的请求&#xff0c;同时返回Blob格式的文件给到前端&#xff08;如果没有步骤…

shell脚本 $0-$n $* $@ $# $? $$

各命令详解 1.$0-$n &#xff1a;表示脚本或函数的参数。$0 是脚本的名称&#xff0c;$1 到 $n 是位置参数&#xff0c;每个对应一个传递给脚本或函数的参数。 2.$* &#xff1a;表示所有传递给脚本或函数的参数。它将所有位置参数作为单个字符串显示。 3.$ &#xff1a;表示所…

时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤)

时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤) 目录 时序预测 | MATLAB实现GRNN广义回归神经网络时间序列未来多步预测(程序含详细预测步骤)预测效果基本介绍程序设计参考资料预测效果 基本介绍 MATLAB实现GRNN广义回归神经网络时间序列…

大语言模型面试问题【持续更新中】

自己在看面经中遇到的一些面试题&#xff0c;结合自己和理解进行了一下整理。 transformer中求和与归一化中“求和”是什么意思&#xff1f; 求和的意思就是残差层求和&#xff0c;原本的等式为y H(x)转化为y x H(x)&#xff0c;这样做的目的是防止网络层数的加深而造成的梯…

Angular系列教程之观察者模式和RxJS

文章目录 引言RxJS简介RxJS中的设计模式观察者模式迭代器模式 示例代码RxJS 在 Angular 中的应用总结 引言 在Angular开发中&#xff0c;我们经常需要处理异步操作&#xff0c;例如从后端获取数据或与用户的交互。为了更好地管理这些异步操作&#xff0c;Angular中引入了RxJS&…

el-table嵌套两层el-dropdown-menu导致样式错乱

问题&#xff1a; 解决方式&#xff1a; <el-table-column label"操作" fixed"right" width"132" align"center"><template slot-scope"scope"><div v-if"scope.row._index ! 合计"><el-d…

【PWN · GOT表劫持 | 整数溢出】[HGAME 2023 week1]choose_the_seat

整数溢出&#xff0c;加之保护开的不全&#xff0c;可以反复越界修改got表&#xff0c;劫持puts函数实现利用 一、题目概述 限制&#xff1a;v0不可以大于9 理想中数组所在bss端地址&#xff1a; 注意到与got表项距离很近 危险函数只能执行一遍&#xff0c;然后回exit(0) 二…

Next.js 开发指​南(GitHub 115k star​)

Next.js 是一个构建于 Node.js 之上的开源 Web 开发框架&#xff0c;它扩展了最新的 React 特性&#xff0c;集成了基于 Rust 的 JavaScript 工具&#xff0c;可以帮助你快速创建全栈 Web 应用 &#xff08;full-stack Web applications&#xff09; 。 对于有一定 React 基础…

华为数通方向HCIP-DataCom H12-831题库(判断题:21-40)

第21题 OSPF的NSSA区域内,在ASBR路由器上不论路由表中是否存在缺省路由,都会自动产生描述缺省路由的Type7LSA,通告到整个NSSA区域 正确 错误 答案:错误 解析: 在NSSA区域中,ASBR默认情况下不会产生7类LSA表示的默认路由。 第22题 BFD单跳检测是指对两个直连接口进行IP连…

Express安装与基础使用

一、express 介绍 express 是一个基于 Node.js 平台的极简、灵活的 WEB 应用开发框架&#xff0c; 官方网站&#xff1a; Express - 基于 Node.js 平台的 web 应用开发框架 - Express中文文档 | Express中文网 中文文档&#xff1a; 路由 - Express 中文文档 简单来说&am…

Kafka生产消费流程

Kafka生产消费流程 1.Kafka一条消息发送和消费的流程图(非集群) 2.三种发送方式 准备工作 创建maven工程&#xff0c;引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1…