# 消息中间件 RocketMQ 高级功能和源码分析(十)

消息中间件 RocketMQ 高级功能和源码分析(十)

一、消息中间件 RocketMQ 源码分析: 消息消费概述

1、集群模式和广播模式

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。

  • 集群模式,主题下的同一条消息只允许被其中一个消费者消费。
  • 广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。

2、消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。

所谓的拉模式,是消费端主动拉起拉消息请求,
而推模式是消息达到消息服务器后,推送给消息消费者。

RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

3、集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

4、RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。

二、消息中间件 RocketMQ 源码分析: 消息消费初探

1、消息推送模式

消息推送 示例图:

2、 消息消费重要方法


void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
void subscribe(final String topic, final String subExpression):基于主题订阅消息,消息过滤使用表达式
void subscribe(final String topic, final String fullClassName,final String filterClassSource):基于主题订阅消息,消息过滤使用类模式
void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅

3、 DefaultMQPushConsumer

在这里插入图片描述


//消费者组
private String consumerGroup;	
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;	
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下的消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//订阅信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
//消息业务监听器
private MessageListener messageListener;
//消息消费进度存储器
private OffsetStore offsetStore;
//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//并发消息消费时处理队列最大跨度
private int consumeConcurrentlyMaxSpan = 2000;
//每1000次流控后打印流控日志
private int pullThresholdForQueue = 1000;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
//每次传入MessageListener#consumerMessage中消息的数量
private int consumeMessageBatchMaxSize = 1;
//是否每次拉取消息都订阅消息
private boolean postSubscriptionWhenPull = false;
//消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;

三、消息中间件 RocketMQ 源码分析: 消息消费启动流程

1、消息消费启动流程 示例图:

在这里插入图片描述

2、 代码:DefaultMQPushConsumerImpl#start


public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//检查消息者是否合法this.checkConfig();//构建主题订阅信息this.copySubscription();//设置消费者客户端实例名称为进程IDif (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//创建MQClient实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);//构建rebalanceImplthis.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactorthis.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookLisif (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:	 //消息消费广播模式,将消费进度保存在本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:	//消息消费集群模式,将消费进度保存在远端Brokerthis.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load//创建顺序消息消费服务if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//创建并发消息消费服务} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//消息消费服务启动this.consumeMessageService.start();//注册消费者实例boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);//启动消费者客户端mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();
}

四、消息中间件 RocketMQ 源码分析:消息拉取介绍

1、消息拉取 介绍

  • 消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。

  • 在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。

  • 消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。

2、 PullMessageService 实现机制

从 MQClientInstance 的启动流程中可以看出,RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。

在这里插入图片描述

3、 代码:PullMessageService#run


public void run() {log.info(this.getServiceName() + " service started");//循环拉取消息while (!this.isStopped()) {try {//从请求队列中获取拉取消息请求PullRequest pullRequest = this.pullRequestQueue.take();//拉取消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}

4、 PullRequest

在这里插入图片描述


private String consumerGroup;	//消费者组
private MessageQueue messageQueue;	//待拉取消息队列
private ProcessQueue processQueue;	//消息处理队列
private long nextOffset;	//待拉取的MessageQueue偏移量
private boolean lockedFirst = false;	//是否被锁定

5、 代码:PullMessageService#pullMessage


private void pullMessage(final PullRequest pullRequest) {//获得消费者实例final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转为推送模式消费者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//推送消息impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}

6、 ProcessQueue 实现机制

ProcessQueue 是 MessageQueue 在消费端的重现、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按照消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。

在这里插入图片描述

7、 属性


//消息容器
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
//读写锁
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//ProcessQueue总消息树
private final AtomicLong msgCount = new AtomicLong();
//ProcessQueue队列最大偏移量
private volatile long queueOffsetMax = 0L;
//当前ProcessQueue是否被丢弃
private volatile boolean dropped = false;
//上一次拉取时间戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
//上一次消费时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();

8、 方法


//移除消费超时消息
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
//添加消息
public boolean putMessage(final List<MessageExt> msgs)
//获取消息最大间隔
public long getMaxSpan()
//移除消息
public long removeMessage(final List<MessageExt> msgs)
//将consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap   
public void rollback() 
//将consumingMsgOrderlyTreeMap消息清除,表示成功处理该批消息
public long commit()
//重新处理该批消息
public void makeMessageToCosumeAgain(List<MessageExt> msgs) 
//从processQueue中取出batchSize条消息
public List<MessageExt> takeMessags(final int batchSize)

五、消息中间件 RocketMQ 源码分析: 客户端发起拉取消息请求

1、消息拉取基本流程 示例图:

在这里插入图片描述

2、客户端发起拉取请求

代码:DefaultMQPushConsumerImpl#pullMessage


public void pullMessage(final PullRequest pullRequest) {//从pullRequest获得ProcessQueuefinal ProcessQueue processQueue = pullRequest.getProcessQueue();//如果处理队列被丢弃,直接返回if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}//如果处理队列未被丢弃,更新时间戳pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);return;}//如果处理队列被挂起,延迟1s后再执行if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}//获得最大待处理消息数量long cachedMessageCount = processQueue.getMsgCount().get();//获得最大待处理消息大小long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//从数量进行流控if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//从消息大小进行流控if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}//获得订阅信息final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);log.warn("find the consumer's subscription failed, {}", pullRequest);return;//与服务端交互,获取消息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);}

六、消息中间件 RocketMQ 源码分析: Broker 组装消息

1、消息服务端 Broker 组装消息 示例图:

在这里插入图片描述

2、 代码:PullMessageProcessor#processRequest


//构建消息过滤器
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());
} else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());
}
//调用MessageStore.getMessage查找消息
final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), //消费组名称								requestHeader.getTopic(),	//主题名称requestHeader.getQueueId(), //队列IDrequestHeader.getQueueOffset(), 	//待拉取偏移量requestHeader.getMaxMsgNums(), 	//最大拉取消息条数messageFilter	//消息过滤器);

3、 代码:DefaultMessageStore#getMessage


GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;	//查找下一次队列偏移量
long minOffset = 0;		//当前消息队列最小偏移量
long maxOffset = 0;		//当前消息队列最大偏移量
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();	//当前commitLog最大偏移量
//根据主题名称和队列编号获取消息消费队列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);...
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//消息偏移量异常情况校对下一次拉取偏移量
if (maxOffset == 0) {	//表示当前消息队列中没有消息status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {	//待拉取消息的偏移量小于队列的其实偏移量status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {	//待拉取偏移量为队列最大偏移量status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {	//偏移量越界status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}
}
...
//根据偏移量从CommitLog中拉取32条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

4、 代码:PullMessageProcessor#processRequest


//根据拉取结果填充responseHeader
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());//判断如果存在主从同步慢,设置下一次拉取任务的ID为主节点
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break;
}
...
//GetMessageResult与Response的Code转换
switch (getMessageResult.getStatus()) {case FOUND:			//成功response.setCode(ResponseCode.SUCCESS);break;case MESSAGE_WAS_REMOVING:	//消息存放在下一个commitLog中response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);	//消息重试break;case NO_MATCHED_LOGIC_QUEUE:	//未找到队列case NO_MESSAGE_IN_QUEUE:	//队列中未包含消息if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;case NO_MATCHED_MESSAGE:	//未找到消息response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case OFFSET_FOUND_NULL:	//消息物理偏移量为空response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_OVERFLOW_BADLY:	//offset越界response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;case OFFSET_OVERFLOW_ONE:	//offset在队列中未找到response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_TOO_SMALL:	//offset未在队列中response.setCode(ResponseCode.PULL_OFFSET_MOVED);requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break;
}
...
//如果CommitLog标记可用,并且当前Broker为主节点,则更新消息消费进度
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

七、消息中间件 RocketMQ 源码分析: 消息拉取客户端处理服务端响应

1、消息拉取客户端处理消息 示例图:

在这里插入图片描述

2、 代码:MQClientAPIImpl#processPullResponse


private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;//判断响应结果switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}//解码响应头PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);//封装PullResultExt返回return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

3、 PullResult 类


private final PullStatus pullStatus;	//拉取结果
private final long nextBeginOffset;	//下次拉取偏移量
private final long minOffset;	//消息队列最小偏移量
private final long maxOffset;	//消息队列最大偏移量
private List<MessageExt> msgFoundList;	//拉取的消息列表

在这里插入图片描述

4、 代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess


//将拉取到的消息存入processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//将processQueue提交到consumeMessageService中供消费者消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
//如果pullInterval大于0,则等待pullInterval毫秒后将pullRequest对象放入到PullMessageService中的pullRequestQueue队列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

八、消息中间件 RocketMQ 源码分析: 拉取消息的流程小结

### 1、消息拉取流程总结

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(九)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/32071.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PointCloudLib 点云边缘点提取 C++版本

0.实现效果 1.算法原理 PCL(Point Cloud Library)中获取点云边界的算法主要基于点云数据的几何特征和法向量信息。以下是对该算法的详细解释,按照清晰的格式进行归纳: 算法概述 PCL中的点云边界提取算法主要用于从3D点云数据中识别并提取出位于物体边界上的点。这些边界…

邀请函 | 人大金仓邀您相聚第十三届中国国际国防电子展览会

盛夏六月 备受瞩目的 第十三届中国国际国防电子展览会 将于6月26日至28日 在北京国家会议中心盛大举办 作为数据库领域国家队 人大金仓 将携系列行业解决方案 和创新实践成果亮相 期待您莅临指导 ↓↓↓↓↓↓ CIDEX 2024 中国国际国防电子展览会&#xff08;简称CIDEX&#xf…

前端核心框架Vue指令详解

目录 ▐ 关于Vue指令的介绍 ▐ v-text与v-html ▐ v-on ▐ v-model ▐ v-show与v-if ▐ v-bind ▐ v-for ▐ 前言&#xff1a;在学习Vue框架过程中&#xff0c;大家一定要多参考官方API &#xff01; Vue2官方网址https://v2.cn.vuejs.org/v2/guide/ ▐ 关于Vue指令的…

multiprocessing多进程计算及与rabbitmq消息通讯实践

1. 需求与设计 我所设计的计算服务旨在满足多个客户对复杂计算任务的需求。由于这些计算任务通常耗时较长且资源消耗较大&#xff0c;为了优化客户体验并减少等待时间&#xff0c;我采取了并行计算的策略来显著提升计算效率。 为实现这一目标&#xff0c;我计划利用Python的m…

基于Java实训中心管理系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;…

碳化硅陶瓷膜的生产工艺和应用

一、生产工艺 碳化硅陶瓷膜的生产工艺多样&#xff0c;其中浸渍提拉法和喷涂法为两大主流技术。 浸渍提拉法 浸渍提拉法是一种广泛应用的制备方法。其过程主要包括&#xff1a;先将陶瓷颗粒或者聚合物前体分散在水或有机溶剂中&#xff0c;形成均质稳定的制膜液。随后&#xff…

Jenkins macos 下 failed to create dmg 操作不被允许hdiutil: create failed - 操作不被允许?

解决方案&#xff1a; 打开设置&#xff0c;选择“隐私与安全”&#xff0c;选择“完全磁盘访问权限”&#xff0c;点击“”&#xff0c;选择jenkins的路径并添加。 同理&#xff0c;添加java的访问权限。

Python14 面向对象编程

1.什么是面向对象编程OOP Python的面向对象编程&#xff08;Object-Oriented Programming&#xff0c;简称OOP&#xff09;是一种编程范式&#xff0c;它使用“对象”来设计应用程序和计算机程序。这些对象由数据和能够操作这些数据的方法组成。面向对象编程的主要目标是提高软…

Webpack4从入门到精通以及和webpack5对比_webpack现在用的是哪个版本

3.1 打包样式资源css-loader、style-loader… {// 匹配哪些文件test: /\.less$/,// 使用哪些loader进行处理use: [// use数组中loader执行顺序&#xff1a;从右到左&#xff0c;从下到上&#xff0c;依次执行(先执行css-loader)// style-loader&#xff1a;创建style标签&#…

【C++】一个极简但完整的C++程序

一、一个极简但完整的C程序 我们编写程序是为了解决问题和任务的。 1、任务&#xff1a; 某个书店将每本售出的图书的书名和出版社&#xff0c;输入到一个文件中&#xff0c;这些信息以书售出的时间顺序输入&#xff0c;每两周店主会手工计算每本书的销售量、以及每个出版社的…

Vue74-路由传参2

一、$route中的params参数 二、在配置路由的index.js文件中&#xff0c;声明传参 占位符用的什么名字&#xff0c;params里面的key就是什么。 三、<router-link>标签中传参 3-1、to字符串写法 3-2、to的对象写法 注意&#xff1a;若是用params携带参数&#xff0c;不…

mysql的安装以及分享navicat for MySQL

前言 根据网上分享的安装方法以及自己遇到的问题解决方法 一、mysql是什么&#xff1f; mysql 是一个开放源码的小型关联式数据库管理系统 二、安装过程 1.下载安装包 下载地址&#xff1a;MySQL :: Download MySQL Community Server 跳过直接下载&#xff0c;解压即可 …

DPDK的Cache预取和Cache一致性

1.什么是Cache预取 众所周知&#xff0c;CPU访问Cache中的数据是比访问内存中的数据是要快的&#xff0c;而因为程序都有时间局部性和空间局部性&#xff0c;时间局部性简单来说就是某一条或几条指令在一段时间内会被CPU多次执行&#xff1b;空间局部性简单来说就是某一段数据块…

五十五、openlayers官网示例Loading Spinner解析——给地图添加loading效果,瓦片图层加载时等待效果

官网demo地址&#xff1a; Loading Spinner 这篇介绍了一个非常简单的loading效果 利用地图的loadstart和loadend事件&#xff0c;动态的添加和删除class名。 map.on("loadstart", function () {map.getTargetElement().classList.add("spinner");});map…

Vue72-路由传参1

一、需求 点击哪个消息&#xff0c;就展示哪个消息的详情 这是一个三级路由&#xff01; 给路由组件&#xff1a;detail.vue传递消息数据。 二、代码步骤 2-1、编写路由组件 从$route.query属性里面获取传参 2-2、编写路由规则 2-3、编写路由标签&#xff0c;传参 1、to的字…

Ncorr使用过程的问题解答

问题系列 文章目录 问题系列前言一、如何更改单位&#xff1f;情景&#xff1a;DIC Analysis 二、拉格兰日和欧拉绘图的区别直观 三、控制图像中的显示条上下界限问题展示&#xff1a;解决方案&#xff1a; 更新动态 前言 主要用于记录使用过程中出现的相关问题。 一、如何更改…

数据结构:为什么说链表是顺序表的升级版(c语言实现)

前言&#xff1a; 我们在之前的几篇文章中详细的讲解了顺序表的特点&#xff0c;增删改查操作和动态顺序表的优点&#xff0c;并使用顺序表的底层结构实现了通讯录项目&#xff0c;似乎顺序表是一个非常完美的数据结构&#xff0c;它可以实现按照需求实现增删查改&#xff0c;对…

做好海外ASO优化的7大核心要素你了解几个?

海外App进行ASO优化时&#xff0c;需要综合考虑多个方面以确保应用在应用商店中获得更高的曝光率和下载量。以下是一些关键的ASO优化步骤&#xff0c;结合参考文章中的相关信息进行详细阐述&#xff1a; 1.关键词优化 调研目标市场的用户行为和检索习惯&#xff0c;挖掘与应用…

锂磷硫(LPS)属于硫化物固态电解质 Li7P3S11是代表性产品

锂磷硫&#xff08;LPS&#xff09;属于硫化物固态电解质 Li7P3S11是代表性产品 锂磷硫&#xff08;LPS&#xff09;&#xff0c;为非晶态材料&#xff0c;是硫化物固态电解质代表性产品之一&#xff0c;具有热稳定性好、成本较低等优点&#xff0c;在固态电解质中离子电导率较…

【Deep Learning】Meta-Learning:训练训练神经网络的神经网络

元学习&#xff1a;训练训练神经网络的神经网络 本文基于清华大学《深度学习》第12节《Beyond Supervised Learning》的内容撰写&#xff0c;既是课堂笔记&#xff0c;亦是作者的一些理解。 1 Meta-Learning 在经典监督学习中&#xff0c;给定训练数据 { ( x i , y i ) } i \{…