RocketMQ源码阅读-Producer发消息

RocketMQ源码阅读-Producer发消息

  • 1. 从单元测试入手
  • 2. 启动过程
  • 3. 同步消息发送过程
  • 4. 异步消息发送过程
  • 5. 小结

Producer是消息的生产者。

Producer和Consummer对Rocket来说都是Client,Server是NameServer。

客户端在源码中是一个单独的Model,目录为rocketmq/client。

类DefaultMQProducer是Producer的默认入口实现类。
image.png
继承了类ClientConfig,客户端配置类,存储上下文配置信息。
实现接口MQProducer:
image.png
定义了Producer对外提供的接口,也就是所有的发送消息的方法,同时MQProducer接口继承了MQAdmin接口。
如上图中MQAdmin是元数据管理接口,定义了对Message操作的一些方法。

DefaultMQProducer中有一个重要的成员变量:

protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

DefaultMQProducer的所有操作,基本没什么业务逻辑,都是调用DefaultMQProducerImpl类中的方法。

DefaultMQProducerImpl类是Producer操作的具体实现类。

1. 从单元测试入手

看源码的流程都是从单元测试入手

Producer的单元测试在类org.apache.rocketmq.client.producer.DefaultMQProducerTest中。
单元测试的所有方法,就对应着这个类的全部功能。
DefaultMQProducerTest的方法列表如下:

其中 init 和 terminate 是测试开始初始化和测试结束销毁时需要执行的代码。
其他的方法是测试不同功能的测试用例,也就是测试不同的发消息的方式。

2. 启动过程

从单元测试中的init方法入手:

@Before
public void init() throws Exception {String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();// 创建一个Producer,并赋予名字producer = new DefaultMQProducer(producerGroupTemp);// 设置NameServer的地址producer.setNamesrvAddr("127.0.0.1:9876");// 消息长度大于16开启压缩producer.setCompressMsgBodyOverHowmuch(16);// 创建不同的Messagemessage = new Message(topic, new byte[] {'a'});zeroMsg = new Message(topic, new byte[] {});bigMessage = new Message(topic, "This is a very huge message!".getBytes());// 启动Producerproducer.start();// 设置mQClientFactory和mQClientAPIImpl,后面再讲Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");field.setAccessible(true);field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");field.setAccessible(true);field.set(mQClientFactory, mQClientAPIImpl);// 注册Producerproducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenReturn(createSendResult(SendStatus.SEND_OK));
}

这段代码就是创建了一个DefaultMQProducer,设置参数,并调用start()方法启动,之后注册到NameServer中。

首先,看下DefaultMQProducer#start()方法:

@Override
public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));// 直接调用defaultMQProducerImpl的start方法this.defaultMQProducerImpl.start();if (null != traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}

其中直接调用了DefaultMQProducerImpl的start方法:

public void start() throws MQClientException {this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 注册boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动mQClientFactorymQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 给所有的broker发心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态,这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。

对于启动过程,状态serviceState为CREATE_JUST。
CREATE_JUST分支中,会获取一个MQClientManager(单例模式),通过方法getAndCreateMQClientInstance()获取一个MQClientInstance实例,赋值给成员变量:

private MQClientInstance mQClientFactory;

然后调用MQClientInstance的registerProducer()方法,将自己注册到MQClientInstance中。
随后,调用MQClientInstance的start(),启动mQClientFactory。
最后,给所有的broker发心跳。

进一步看MQClientInstance的start()方法:

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

这一部分代码的注释比较清楚,流程是这样的:

  1. 启动实例 mQClientAPIImpl,其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例,封装了客户端与 Broker 通信的方法;
  2. 启动各种定时任务,包括与 Broker 之间的定时心跳,定时与 NameServer 同步数据等任务;
  3. 启动拉取消息服务;
  4. 启动 Rebalance 服务;
  5. 启动默认的 Producer 服务。

以上就是 Producer 的启动流程。

3. 同步消息发送过程

分析 Producer 发送消息的流程。
接口 MQProducer 中,定义了 19 个不同参数的发消息的方法。
这19个接口可以分为3类:

  • 单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
  • 同步发送(Sync):发送消息后等待响应;
  • 异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。

先看下同步发送消息的方法(异步发送消息,只是将同步发送方法提交给线程池)。
DefaultMQProducer中对同步发送方法的实现为:

@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {Validators.checkMessage(msg, this);msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);
}

实际调用的为DefaultMQProducerImpl的send()方法:

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

最终调用的方法为DefaultMQProducerImpl的sendDefaultImpl()方法,源码位置为517行:
(源码偏长,在下方进行解读)

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取Topic信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

首先是获取Topic信息,然后调用sendKernelImpl()方法发送消息,sendKernelImpl()源码如下:

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}// 同步发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

也很长,但是逻辑简单,主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage()(上述代码第150行),将消息发送给队列所在的 Broker。

至此,消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

这一篇主要看发送代码,后续序列化和网络传输等步骤的代码后面再去探究。

ps:推荐IEDA插件SequenceDiagram,能一键生成时序图

整体的时序图为:
同步发送消息时序图.png

4. 异步消息发送过程

上一节讲到异步发送消息,只是将同步发送方法提交给线程池。对于MQProducer接口中方法为:

void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException;

也就是带有回调方法,DefaultMQProducer中的实现为:

@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}

调用了DefaultMQProducerImpl的send方法:

@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {final long beginStartTime = System.currentTimeMillis();ExecutorService executor = this.getAsyncSenderExecutor();try {executor.submit(new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception e) {sendCallback.onException(e);}} else {sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}});} catch (RejectedExecutionException e) {throw new MQClientException("executor rejected ", e);}}

可以看到,实际上是将DefaultMQProducerImpl的sendDefaultImpl()方法(对应源码517行)提交到线程池执行,后面的流程都和同步发送一致,参看上一节分析。

异步发送消息时序图为:
异步发消息时序图.png
至此,异步消息发送也完成了。

5. 小结

MQProducer定义了19种发送消息的方法,其默认实现为DefaultMQProducer。DefaultMQProducer中的方法没有业务逻辑,最终会调用DefaultMQProducerImpl类中的具体逻辑。

DefaultMQProducerImpl中发消息的方法最终都会调用其sendKernelImpl()方法,其主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext,然后调用方法 MQClientAPIImpl#sendMessage(),将消息发送给队列所在的 Broker。

最终消息被发送给远程调用的封装类 MQClientAPIImpl,完成后续序列化和网络传输等步骤。

类之间的关系为:
发消息类图.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/616311.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Torch not compiled with CUDA enabled

最近接触chatglm3对话预训练模型&#xff0c;从git上下载&#xff0c;装包装半天&#xff0c;最后终于跑起来了&#xff0c;但是一对他进行对话&#xff0c;后台就开始报错了 File "E:\Python311\Lib\site-packages\torch\nn\modules\linear.py", line 114, in forw…

【c++】入门4

内联函数声明和定义不能分开 inline不建议声明和定义分离&#xff0c;分离会导致链接错误。因为inline被展开&#xff0c;就没有函数地址 了&#xff0c;链接就会找不到。 auto关键字 随着程序越来越复杂&#xff0c;程序中用到的类型也越来越复杂&#xff0c;经常体现在&…

动态pv策略和组件

pv和pvc&#xff0c;存储卷&#xff1a; 存储卷&#xff1a; emptyDir 容器内部&#xff0c;随着pod销毁&#xff0c;emptyDir也会消失 不能做数据持久化 hostPath&#xff1a;持久化存储数据 可以和节点上的目录做挂载。pod被销毁了数据还在 NFS&#xff1a;一台机器&am…

x-cmd pkg | tsx - Node.js 的直接替代品

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 tsx 代表 “TypeScript execute”&#xff0c;由 TypeScript 编写&#xff0c;内部使用由 Go 语言编写的 esbuild 核心二进制实现超快的 TypeScript 编译&#xff0c;旨在增强 Node.js 以无缝运行 TypeScript / ESM /…

Golang的API项目快速开始

开启一个简单的API服务。 golang的教程网上一大堆&#xff0c;官网也有非常详细的教程&#xff0c;这里不在赘述这些基础语法教程&#xff0c;我们意在快速进入项目开发阶段。 golang好用语法教程传送门&#xff1a; m.runoob.com/go/ 编写第一个API 前提&#xff1a;按照上一…

[Oracle][详细] Win完全卸载Oracle

前提准备 进入服务 找到Oracle开头的服务 将这些服务全部停止 Top 1 点击开始菜单找到Oracle,然后点击Oracle安装产品,再点击【Universal Installer】 Top 2 点击之后稍等一会然后会进入进入下图界面,点击卸载产品 Top 3 选中要删除的Oracle产品,然后点击【删除】 Top 4 进…

性能监控软件选择攻略

随着企业对应用程序性能的关注度不断增加&#xff0c;选择适当的性能监控软件变得至关重要。性能监控软件能够帮助企业实时追踪应用程序的性能指标&#xff0c;识别潜在问题并提高系统的稳定性。在选择性能监控软件时&#xff0c;以下攻略将有助于确保您的选择符合业务需求并能…

SpringIOC之support模块GenericApplicationContext

博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌ 博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+…

鸿蒙开发笔记(一):ArkTS概述及声明式UI的使用

ArkTS是HarmonyOS优选的主力应用开发语言。ArkTS围绕应用开发在TypeScript&#xff08;简称TS&#xff09;生态基础上做了进一步扩展&#xff0c;继承了TS的所有特性&#xff0c;是TS的超集。 ArkTS在TS的基础上主要扩展了如下能力&#xff1a; 基本语法&#xff1a;ArkTS定义…

Docker实战06|深入剖析Docker Run命令

前几篇文章中&#xff0c;重点讲解了Linux Namespace、Cgroups、AUFS的核心原理&#xff0c;同样也是Docker的底层原理实现。目录如下&#xff1a; • 《Docker实战01&#xff5c;容器与开发语言》 • 《Docker实战02&#xff5c;Namespace》 • 《Docker实战03&#xff5c;C…

软件测试公式之如何高质量的做BUG分析?

对于BUG分析&#xff0c;测试人员再熟悉不过了。但如果是面对大量的BUG&#xff0c;要如何有效的分析呢&#xff1f;有什么好的方案和行动项&#xff1f;今天聊聊这个话题。 01 BUG分析简单可以分为两类&#xff1a;宏观BUG分析 和 微观BUG分析。 宏观BUG分析&#xff1a;在…

【python】进阶--->MySQL数据库(一)

一、mysql数据库 关系型数据库 &#xff1a; 一些相关的表和其他数据库对象的集合。 表是由行和列组成。列包含一组命名的属性(也称为字段)。 行包含一组记录&#xff0c;行和列的交集称为数据项(也叫字段值)。 数据库(database) : 存储数据的仓库&#xff0c;本质上就是一个文…

《C++ Primer》第14章 重载运算与类型转换(二)

参考资料&#xff1a; 《C Primer》第5版《C Primer 习题集》第5版 14.8 函数调用运算符&#xff08;P506&#xff09; 如果类重载了函数调用运算符&#xff0c;则我们可以像使用函数一样使用该类的对象。这样的类同时也能存储状态&#xff0c;所以它们比普通函数更加灵活。…

x-cmd pkg | czg - git commit 智能生成工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 czg 源于 commitizen/cz-cli 交互插件中 cz-git 的延伸项目&#xff0c;重新使用 TypeScript 编写的零依赖独立的 Node.js 命令行工具。旨在使用交互友好的方式&#xff0c;辅助用户生成规范的 git commit message 约…

游泳耳机哪种款式好?最值得入手的游泳耳机大全

在如今注重健康和娱乐的生活方式中&#xff0c;游泳作为一项全身性的运动备受欢迎。然而&#xff0c;对于热爱水中活动的人们来说&#xff0c;选择一款出色的游泳耳机至关重要。好的游泳耳机不仅能提供清晰的音质&#xff0c;还能有效防水&#xff0c;让您在水中尽情畅游的同时…

MySQL中约束是什么?

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

记录由客户端http请求原因引起的5xx响应问题排查过程

看到 http 状态码 5xx&#xff0c;很多开发者第一感觉就是服务端的问题&#xff0c;其实并不全是。下面我遇到的问题就是一个例外。 问题描述 最近在为反向代理 nginx 配置 auth_request 后&#xff0c;出现了请求504错误。 504状态码是HTTP协议中的一种服务器错误状态码。当…

Windows压缩包的MySQL安装方式

1.下载压缩包 https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.35-winx64.zip 2.解压压缩包&#xff08;建议将解压到非C盘&#xff0c;路径不要出现特殊符号&#xff09; 3.在MySQL主目录下&#xff0c;创建my.ini空文件&#xff08;先创建一个txt文件&#xff0c;进…

JavaScript删除数组中指定元素的5种方法

文章目录 目录 文章目录 前言 一、数组是什么&#xff1f; 二、讲解数组 总结 前言 在JavaScript开发中&#xff0c;处理数组是一项非常常见的任务。有时候我们需要从数组中删除特定的元素&#xff0c;以便对数组进行进一步操作或者满足特定的需求。幸运的是&#xff0c;JavaS…

Swoft - Bean

一、Bean 在 Swoft 中&#xff0c;一个 Bean 就是一个类的一个对象实例。 它(Bean)是通过容器来存放和管理整个生命周期的。 最直观的感受就是省去了频繁new的过程&#xff0c;节省了资源的开销。 二、Bean的使用 1、创建Bean 在【gateway/app/Http/Controller】下新建一个名为…