RocketMQ5.0Pop消费模式

前言

RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
RocketMQ 4.x 之前,消费模式分为两种:

  • Pull:拉模式,消费者自行拉取消息、上报消费结果
  • Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果

Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:

  • 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
  • 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
  • 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
  • 队列和消费者强绑定,消费者僵死状态下导致消息堆积

Pop 消费模式就是要解决这些痛点的,它的设计目标:

  • 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
  • 队列不再绑定消费者,消费者可以消费所有队列消息
  • 消费者可以很方便的水平扩容

要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。

设计难点

Pop 消费模式是存在一些设计难点的。
Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:

  • Broker 消息投递处理简单,根据消费者请求的拉取位点投递
  • Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费

如果改成 Pop 模式,首先面临的挑战有:
1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
image.png

2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。

你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。

设计实现

取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
image.png

invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
image.png
另外 Broker 还提供了changeInvisibleTime()接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
image.png

CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
image.png
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。

源码

先通过流程图熟悉下 Pop 消费模式的大体流程:
image.png

ProcessQueue

PushConsumer 启动后,会定时向 Proxy 查询分配的队列:

protected void startUp() throws Exception {......scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {try {// 扫描分配的队列scanAssignments();} catch (Throwable t) {log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);}}, 1, 5, TimeUnit.SECONDS);......
}

Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
消费者拿到分配的队列后,紧接着调用syncProcessQueue()方法处理队列,有两种情况:

  • 队列不再分配给自己,停止拉取消息,移除队列
  • 新分配的队列,立即拉取消息

对于新分配的队列,消费者会创建ProcessQueue对象开始拉取队列消息:

private void receiveMessageImmediately() {// Broker端点列表 gRPC负载均衡调用final Endpoints endpoints = mq.getBroker().getEndpoints();// 构建请求final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);// 发请求final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,consumer.getPushConsumerSettings().getLongPollingTimeout());Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// 处理消息onReceiveMessageResult(result);}}
}

PopMessageProcessor

Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:

  • 校验请求参数、队列写权限等等
  • 构建消息过滤器 ExpressionMessageFilter
  • 按照 1/5 的概率先尝试从重试队列里拉取消息
  • 遍历所有队列拉取消息,直到拉取最大消息数
  • 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
  • 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
  • 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)throws RemotingCommandException {......// 参数、权限校验// Topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 消费组订阅配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// 消息过滤ExpressionMessageFilter messageFilter = ......// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁int randomQ = random.nextInt(100);int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询if (requestHeader.isOrder()) {reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;} else {reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());}int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);long restNum = 0; // 剩余消息数// 平均每5次请求,读取一下重试队列boolean needRetry = randomQ % 5 == 0;if (needRetry && !requestHeader.isOrder()) {restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}// POP模式下,请求的queueId=-1,读所有队列if (requestHeader.getQueueId() < 0) {for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {int queueId = (randomQ + i) % topicConfig.getReadQueueNums();restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}}// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}......if (!getMessageResult.getMessageBufferList().isEmpty()) {// 拉取到消息,且队列里面还有消息,通知其它拉取请求if (restNum > 0) {notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());}} else {// 没有消息,进入长轮询状态 挂起int pollingResult = polling(channel, request, requestHeader);}......
}

通过队列获取消息的方法是popMsgFromQueue,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。

private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,Channel channel, long popTime,ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),requestHeader.getConsumerGroup()) : requestHeader.getTopic();// 给队列加锁 同一消费组下的队列串行读取 topic@group@queueIdString lockKey =topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;// 加锁if (!queueLockManager.tryLock(lockKey)) {restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;return restNum;}// 拉取位点offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);// 获取消息GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset,requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {// 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}......queueLockManager.unLock(lockKey);
}

image.png
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:

static class TimedLock {private final AtomicBoolean lock;private volatile long lockTime;
}

加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:

  • 首先取已提交的消费位点
  • 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,String lockKey) {// 已提交的消费位点long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic, queueId);if (offset < 0) {// 还没消费过 判断是从最旧还是最新的开始消费if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);} else {// pop last one,then commit offset.offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;// max & no consumer offsetif (offset < 0) {offset = 0;}if (init) {this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",requestHeader.getConsumerGroup(), topic,queueId, offset);}}}// CK缓冲区的位点,这些是已投递、还没提交消费的位点long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);if (bufferOffset < 0) {return offset;} else {return bufferOffset > offset ? bufferOffset : offset;}
}

有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。

PopCheckPoint

CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。

public class PopCheckPoint {// 起始偏移量@JSONField(name = "so")private long startOffset;// 消息拉取时间@JSONField(name = "pt")private long popTime;@JSONField(name = "it")private long invisibleTime;// 位图 收到ACK消息则把对应位设为1@JSONField(name = "bm")private int bitMap;// 消息数量@JSONField(name = "n")private byte num;@JSONField(name = "q")private byte queueId;@JSONField(name = "t")private String topic;// 消费组@JSONField(name = "c")private String cid;@JSONField(name = "ro")private long reviveOffset;// 消息增量偏移量@JSONField(name = "d")private List<Integer> queueOffsetDiff;@JSONField(name = "bn")String brokerName;
}

构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s

CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。

messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);

POP_CK由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。

{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}# 示例值
2 1698656741635 60000 0 0 broker-a 3 2

AckMessageProcessor

消费者消费完消息后,会调用eraseMessage()擦除消息,也就是根据消费结果判断是 ack 还是 nack。

public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :nackMessage(messageView);future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}

如果消费成功,则调用ackMessage接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......AckMsg ackMsg = new AckMsg();ackMsg.setAckOffset(requestHeader.getOffset());ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());ackMsg.setTopic(requestHeader.getTopic());ackMsg.setQueueId(requestHeader.getQueueId());ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {顺序消息处理}// 构建消息存储MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(reviveTopic);msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));msgInner.setQueueId(rqId);msgInner.setTags(PopAckConstants.ACK_TAG);msgInner.setBornTimestamp(System.currentTimeMillis());msgInner.setBornHost(this.brokerController.getStoreHost());msgInner.setStoreHost(this.brokerController.getStoreHost());// 延时消息 拉取时间+不可见时间msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);return response;
}

ACK 消息内容如下:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime

注意:CK & ACK 消息同属一个Topic

ChangeInvisibleTimeProcessor

如果消费失败,会根据重试次数调用changeInvisibleDuration接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......// add new cklong now = System.currentTimeMillis();PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));// ack old msgackOrigin(requestHeader, extraInfo);
}

所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。

PopReviveService

CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}
image.png

public void run() {......ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();// 消费Revive队列消息,构建PopCheckPoint放入mapconsumeReviveMessage(consumeReviveObj);// 处理PopCheckPointmergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {HashMap<String, PopCheckPoint> map = consumeReviveObj.map;// 消费位点long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);while (true) {// 查询REVIVE队列消息List<MessageExt> messageExts = getReviveMessage(offset, queueId);for (MessageExt messageExt : messageExts) {//ck消息if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {String raw = new String(messageExt.getBody(), DataConverter.charset);PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {// ack消息String raw = new String(messageExt.getBody(), DataConverter.charset);AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());// 设置CheckPoint ACK位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));} else {POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);}}        }}
}

CK 消息到期后,会触发reviveMsgFromCk恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {for (int j = 0; j < popCheckPoint.getNum(); j++) {if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {// 已经ack了,跳过continue;}// 查询实际消息long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());if (messageExt == null) {POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",queueId, popCheckPoint.getTopic(), msgOffset);continue;}//skip ck from last epochif (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);continue;}// 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}reviveRetry(popCheckPoint, messageExt);}
}

PopBufferMergeService

为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge打开。
开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。

public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {// 启用内存匹配return false;}if (!serving) {return false;}long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);}return false;}if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());return false;}PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个return false;}// 添加到commitOffsetsputOffsetQueue(pointWrapper);// 添加到缓冲区this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);this.counter.incrementAndGet();if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);}return true;
}

同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。

 public boolean addAk(int reviveQid, AckMsg ackMsg) {if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {return false;}if (!serving) {return false;}try {PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());if (pointWrapper == null) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);}return false;}if (pointWrapper.isJustOffset()) {return false;}PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}// 直接更改内存里的位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {markBitCAS(pointWrapper.getBits(), indexOfAck);} else {POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);return true;}if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);}return true;} catch (Throwable e) {POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);}return false;}

内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:

  • 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
  • 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了

PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
image.png
内存里的 PopCheckPoint 移除前需要满足两个条件:

  • PopCheckPoint 消息已经持久化了
  • PopCheckPoint 在内存里就已经全被 ack 掉了

还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:

  • PopCheckPoint 消息持久化
  • 已经被 ack 掉的消息也要持久化
private void scan() {long startTime = System.currentTimeMillis();int count = 0, countCk = 0;// 迭代缓冲区的PopCheckPointIterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();PopCheckPointWrapper pointWrapper = entry.getValue();// CheckPoint已持久化,或已被ACK,从缓冲区删除// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}// 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();boolean removeCk = !this.serving;// ck will be timeoutif (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {removeCk = true;}// 内存停留时间超过10秒,也要移除掉if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {removeCk = true;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);}// double checkif (isCkDone(pointWrapper)) {continue;} else if (pointWrapper.isJustOffset()) {// just offset should be in store.if (pointWrapper.getReviveQueueOffset() < 0) {// reviveQueueOffset<0代表还没存储,要先存储putCkToStore(pointWrapper, false);countCk++;}continue;} else if (removeCk) {// put buffer ak to storeif (pointWrapper.getReviveQueueOffset() < 0) {putCkToStore(pointWrapper, false);countCk++;}if (!pointWrapper.isCkStored()) {continue;}for (byte i = 0; i < point.getNum(); i++) {// reput buffer ak to store// 存储ack消息if (DataConverter.getBit(pointWrapper.getBits().get(), i)&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {if (putAckToStore(pointWrapper, i)) {count++;markBitCAS(pointWrapper.getToStoreBits(), i);}}}if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}}}// 扫描commitOffsets,提交消费位点int offsetBufferSize = scanCommitOffset();long eclipse = System.currentTimeMillis() - startTime;if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);this.serving = false;} else {if (scanTimes % countOfSecond1 == 0) {POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);}}scanTimes++;if (scanTimes >= countOfMinute1) {counter.set(this.buffer.size());scanTimes = 0;}
}

内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId构建一个 QueueWithTime 队列连接起来。

ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =new ConcurrentHashMap<>();

QueueWithTime 队列的用途有两个:

  • 内存里的 PopCheckPoint 处理完毕后,提交消费位点
  • 消息拉取时,获取拉取位点,避免已经投递的消息重复投递

内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()

private boolean commitOffset(final PopCheckPointWrapper wrapper) {if (wrapper.getNextBeginOffset() < 0) {return true;}final PopCheckPoint popCheckPoint = wrapper.getCk();final String lockKey = wrapper.getLockKey();// 加锁if (!queueLockManager.tryLock(lockKey)) {return false;}try {// 旧的消费位点final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());if (wrapper.getNextBeginOffset() > offset) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);}} else {// maybe store offset is not correct.POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);}// 提交消费位点brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());} finally {queueLockManager.unLock(lockKey);}return true;
}

尾巴

RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/593493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式(8)

目录 36.什么是TCC&#xff1f; 37.分布式系统中常用的缓存方案有哪些&#xff1f; 38.分布式系统缓存的更新模式&#xff1f; 39.分布式缓存的淘汰策略&#xff1f; 40.Java中定时任务有哪些&#xff1f;如何演化的&#xff1f; 36.什么是TCC&#xff1f; TCC&#xff08…

【算法挨揍日记】day41——【模板】01背包、416. 分割等和子集

【模板】01背包_牛客题霸_牛客网你有一个背包&#xff0c;最多能容纳的体积是V。 现在有n个物品&#xff0c;第i个物品的体积为 ,。题目来自【牛客题霸】https://www.nowcoder.com/practice/fd55637d3f24484e96dad9e992d3f62e?tpId230&tqId2032484&ru/exam/oj&qru…

HarmonyOS-ArkTS基本语法及声明式UI描述

初识ArkTS语言 ArkTS是HarmonyOS优选的主力应用开发语言。ArkTS围绕应用开发在TypeScript&#xff08;简称TS&#xff09;生态基础上做了进一步扩展&#xff0c;继承了TS的所有特性&#xff0c;是TS的超集。因此&#xff0c;在学习ArkTS语言之前&#xff0c;建议开发者具备TS语…

机器学习常用算法模型总结

文章目录 1.基础篇&#xff1a;了解机器学习1.1 什么是机器学习1.2 机器学习的场景1.2.1 模式识别1.2.2 数据挖掘1.2.3 统计学习1.2.4 自然语言处理1.2.5 计算机视觉1.2.6 语音识别 1.3 机器学习与深度学习1.4 机器学习和人工智能1.5 机器学习的数学基础特征值和特征向量的定义…

软件测试/测试开发丨Python 模块与包

python 模块与包 python 模块 项目目录结构 组成 package包module模块function方法 模块定义 定义 包含python定义和语句的文件.py文件作为脚本运行 导入模块 import 模块名from <模块名> import <方法 | 变量 | 类>from <模块名> import * 注意&a…

小红书如何高效引流?

近年来&#xff0c;公域流量价格不断上涨&#xff0c;私域流量的优势逐渐凸显。企业正花费大量资源和成本来获取新流量&#xff0c;但与其如此&#xff0c;不如将精力放在留存和复购上&#xff0c;从而实现业绩的新增长。其中关键在于如何有效地将公域流量转化为私域流量。 然而…

境内深度合成服务算法备案清单(2023年12月)

截止2024年1月3日&#xff0c;第三批深度合成服务算法备案信息的公告尚未发布&#xff0c;预计将会在2024-1-10左右发布&#xff0c;我公司已知晓部分公示名单&#xff0c;如中国电信数字人生成算法&#xff0c;详情联系WX号&#xff1a;SuanfabeiandayuAI生成合成类算法应办理…

ArkTS - @Prop、@Link

一、作用 Prop 装饰器 和Link装饰器都是父组件向子组件传递参数&#xff0c;子组件接收父组件参数的时候用的&#xff0c;变量前边需要加上Prop或者Link装饰器即可。&#xff08;跟前端vue中父组件向子组件传递参数类似&#xff09; // 子组件 Component struct SonCom {Prop…

管程-第三十三天

目录 为什么要引入管程 管程的定义和基本特征 用管程解决生产者消费者问题 结论 本节思维导图 为什么要引入管程 原因&#xff1a;在解决进程的同步与互斥问题时&#xff0c;信号量机制存在编写困难和易出错的问题 能不能设计一种机制&#xff0c;让程序员写程序时不再需…

openGauss学习笔记-184 openGauss 数据库运维-升级-升级验证

文章目录 openGauss学习笔记-184 openGauss 数据库运维-升级-升级验证184.1 验证项目的检查表184.2 升级版本查询184.2.1 验证步骤 184.3 检查升级数据库状态184.3.1 验证步骤 openGauss学习笔记-184 openGauss 数据库运维-升级-升级验证 本章介绍升级完成后的验证操作。给出验…

VINS-MONO拓展1----手写后端求解器,LM3种阻尼因子策略,DogLeg,构建Hessian矩阵

文章目录 0. 目标及思路1. 非线性优化求解器2. 基于VINS-MONO的Marginalization框架构建Hessian矩阵2.1 estimator.cpp移植2.2 solve.cpp/preMakeHessian()2.3 solve.cpp/makeHessian() 3. solve.cpp/solveLinearSystem()求解正规方程4. 更新状态5. 迭代求解6. EVO评估结果7. 待…

虹科方案丨从困境到突破:TigoLeap方案引领数据采集与优化变革

来源&#xff1a;虹科工业智能互联 虹科方案丨从困境到突破&#xff1a;TigoLeap方案引领数据采集与优化变革 原文链接&#xff1a;https://mp.weixin.qq.com/s/H3pd5G8coBvyTwASNS_CFA 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 导读 在数字化工厂和智能制造时…

connection refused

nohup /home/bavon/miniconda3/envs/SLFCD/bin/python -m visdom.server -port 8098 >/home/bavon/logs/visdom.log 2>&1 &

8086CPU的寻址方式(7种)

基本概念 立即操作数&#xff1a;操作数包含在指令中寄存器操作数&#xff1a;操作数包含在CPU的某个内部寄存器中存储器操作数&#xff1a;约定操作数事先存放在存储器中存放数据的某个单元基本格式 MOV xx,yy xx&#xff1a;目的操作数字段 yy&#xff1a;源操作数字段 EA&a…

whl is not a supported wheel on this platform.解决办法

1.问题&#xff1a; 安装torch产生 2.解决办法&#xff1a; 使用pip debug --verbose查看 对应的torch版本号 Compatible tags字样&#xff0c;这些就是当前Python版本可以适配的标签。例如&#xff0c;我的Python版本是3.11&#xff0c;可以匹配下面这些文件名&#xff1a;…

Nginx多域名部署多站点

目录 1.修改配置文件nginx.conf 2. 修改hosts文件 1.修改配置文件nginx.conf 在配置文件的 server_name 处修改成自己需要的域名&#xff0c;然后保存退出 j 查看语法是否错误&#xff0c;然后重启nginx nginx -t # 查看语法是否正确 systemctl restart nginx # 重启nginx …

【面试】面向对象编程的三大概念(实例辅助记忆)

【面试】面向对象编程的三大概念&#xff08;实例辅助记忆&#xff09; 虑面向对象编程的三大特性&#xff0c;它们是&#xff1a; 封装&#xff08;Encapsulation&#xff09;&#xff1a; 将对象的状态和行为封装在一起&#xff0c;对外部隐藏对象的内部实现细节。这样可以防…

Power Automate删除SharePoint Online或OneDrive for Business文件版本历史

SharePoint Online和OneDrive for Business支持版本控制&#xff0c;可以保留文件的版本历史&#xff0c;方便用户随时查看和恢复以前的版本。但该功能也会占用大量SharePoint Online或OneDrive for Business存储空间。官方删除版本历史的方法无法批量操作&#xff0c;故今天提…

音效出众设计时尚,内置AI功能,sanag塞那Z50上手

现在蓝牙耳机已经成为人们生活中不可或缺的一部分了&#xff0c;像是在上班、坐车的时候&#xff0c;既可以享受自己的音乐空间&#xff0c;又不会吵到别人&#xff0c;看书、做题还是运动的时候&#xff0c;也可以保证长时间使用耳朵卫生、舒适度。正因为庞大的市场需求&#…

高压继电器,未来几年市场将保持稳定增长

高压继电器是一种用于控制大功率电气设备的开关装置&#xff0c;广泛应用于电力系统、轨道交通、工业自动化等领域。随着各行业对电气控制需求的不断增加&#xff0c;高压继电器市场也在不断扩大。全球高压继电器市场分析&#xff1a; 在全球市场中&#xff0c;目前主要的高压继…