2.13日学习打卡----初学RocketMQ(四)

2.13日学习打卡

目录:

  • 2.13日学习打卡
  • 一.RocketMQ之Java Class
    • DefaultMQProducer类
    • DefaultMQPushConsumer类
    • Message类
    • MessageExt类
  • 二.RocketMQ 消费幂
    • 消费过程幂等
    • 消费速度慢的处理方式
  • 三.RocketMQ 集群服务
    • 集群特点
    • 单master模式
    • 多master模式
    • 多master多Slave模式-异步复制
    • 多Master多Slave模式-同步双写
  • 四.RocketMQ消息消费
    • 集群消费
    • 广播消费
    • 消息消费时的权衡

一.RocketMQ之Java Class

DefaultMQProducer类

概述
DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。

方法

属性内容
DefaultMQProducerImpl defaultMQProducerImpl;生产者内部默认实现类
String producerGroup;Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
String createTopicKey;自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable
int defaultTopicQueueNums;创建默认topic的queue数量。默认4
int sendMsgTimeout;发送消息超时时间,默认值10000,单位毫秒
int compressMsgBodyOverHowmuch;消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩)
int retryTimesWhenSendFailed;同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
int retryTimesWhenSendAsyncFailed;异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
boolean retryAnotherBrokerWhenNotStoreOK;声明发送失败时,下次是否投递给其他Broker,默认false
int maxMessageSize;最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制
TraceDispatcher traceDispatcher消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息

DefaultMQPushConsumer类

概述
DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。

方法

字段内容
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现
String consumerGroup;消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic)
MessageModel messageModel;消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息
ConsumeFromWhere consumeFromWhere;消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始)
AllocateMessageQueueStrategy allocateMessageQueueStrategy;消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely
Map<String, String> subscription;topic对应的订阅tag
MessageListener messageListener;消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。
OffsetStore offsetStore;offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。

DefaultMQPushConsumer类

重要字段int consumeThreadMin = 20		线程池自动调整int consumeThreadMax = 64		线程池自动调整long adjustThreadPoolNumsThreshold = 100000 int consumeConcurrentlyMaxSpan = 2000单队列并行消费最大跨度,用于流控 int pullThresholdForQueue = 1000一个queue最大消费的消息个数,用于流控 long pullInterval = 0			检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控,也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535]consumeMessageBatchMaxSize = 1	并发消费时,一次消费消息的数量,默认为1,假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1pullBatchSize = 32				消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置boolean postSubscriptionWhenPull = false boolean unitMode = false 重要方法subscribe(String topic, String subExpression) 订阅某个topic,subExpression传*为订阅该topic所有消息 registerMessageListener(MessageListenerConcurrently messageListener) 注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现 start 							启动消息消费	

Message类

含义Producer发送的消息定义为Message类位置org.apache.rocketmq.common.message字段定义如图字段详解topicMessage都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。flag网络通信层标记。bodyProducer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。transactionIdRocketMQ 4.3.0引入的事务消息相关的事务编号。properties该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。系统内置属性定义于org.apache.rocketmq.common.message.MessageConst	(如图)对于一些关键属性,Message类提供了一组set接口来进行设置,class Message {public void setTags(String tags) {...}public void setKeys(Collection<String> keys) {...}public void setDelayTimeLevel(int level) {...}public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}public void setBuyerId(String buyerId) {...}}这几个set接口对应的作用分别为为,属性								接口				用途MessageConst.PROPERTY_TAGS		setTags			在消费消息时可以通过tag进行消息过滤判定MessageConst.PROPERTY_KEYS		setKeys			可以设置业务相关标识,用于消费处理判定,或消息追踪查询MessageConst.PROPERTY_DELAY_TIME_LEVEL	setDelayTimeLevel	消息延迟处理级别,不同级别对应不同延迟时间MessageConst.PROPERTY_WAIT_STORE_MSG_OK	setWaitStoreMsgOK	在同步刷盘情况下是否需要等待数据落地才认为消息发送成功`MessageConst.PROPERTY_BUYER_ID	setBuyerId		没有在代码中找到使用的地方,所以暂不明白其用处	这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个动态字段更为方便,自然兼容。

MessageExt类

含义对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,字段字段							用途queueId						记录MessageQueue编号,消息会被发送到Topic下的MessageQueuestoreSize					记录消息在Broker存盘大小queueOffset					记录在ConsumeQueue中的偏移sysFlag						记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识bornTimestamp				消息创建时间,在Producer发送消息时设置storeHost					记录存储该消息的Broker地址msgId						消息IdcommitLogOffset				记录在Broker中存储便宜bodyCRC						消息内容CRC校验值reconsumeTimes				消息重试消费次数preparedTransactionOffset	事务详细相关字段	注意Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,在消息发送时由Producer生成创建。上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的,其构成为(如图)这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能获取到该值。RocketMQ也提供了相关命令,命令				实现类					描述queryMsgById	QueryMsgByIdSubCommand	根据MsgId查询消息		

二.RocketMQ 消费幂

在这里插入图片描述

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消费速度慢的处理方式

提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。

提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

批量方式消费

批量方式消费可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 参数值,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//队列偏移量long offset = msgs.get(0).getQueueOffset();//最大偏移量String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);long diff = Long.parseLong(maxOffset) - offset;if (diff > 100000) {// TODO 消息堆积情况的特殊处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} 

优化每条消息消费过程
举例如下,原来某条消息的消费过程如图中左侧的流程,优化后变成右侧
在这里插入图片描述
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

消费打印日志

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());// TODO 正常消费过程return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} 

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

三.RocketMQ 集群服务

在这里插入图片描述

集群特点

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有 NameServer。

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

单master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
启动 NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &### 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log 
The broker[broker-a, 192.169.1.2:10911] boot success...

多master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

 ### 首先启动Name Server$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...

启动Broker集群

 ### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &...

多master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

启动NameServer

### 首先启动Name Server$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量

四.RocketMQ消息消费

集群消费

在这里插入图片描述
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 集群消费消息(默认)* ClusterConsumer 类,用于从 RocketMQ 集群消费消息*/
public class ClusterConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// 指定 NameServer 地址。consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);consumer.setMessageModel(MessageModel.CLUSTERING); // 设置消息模式为集群消费模式(默认)。// 订阅一个或多个要消费的主题。consumer.subscribe("TopicTest", "*");// 注册回调函数,以便在从代理获取的消息到达时执行。consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。});// 启动消费者实例。consumer.start();System.out.printf("Consumer Started.%n");}
}
  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.CLUSTERING 设置消费模式为集群消费模式,默认情况下 RocketMQ 使用集群消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

广播消费

在这里插入图片描述
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;/*** 广播消费消息(默认)* BroadcastingConsumer 类,用于从 RocketMQ 广播消费消息*/
public class BroadcastingConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// 指定 NameServer 地址。consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);consumer.setMessageModel(MessageModel.BROADCASTING); // 设置消息模式为广播消费模式(默认)。// 订阅一个或多个要消费的主题。consumer.subscribe("TopicTest", "*");// 注册回调函数,以便在从代理获取的消息到达时执行。consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。});// 启动消费者实例。consumer.start();System.out.printf("Consumer Started.%n");}
}
  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.BROADCASTING 设置消费模式为广播消费模式,默认情况下 RocketMQ 使用广播消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

这种模式下消费者只能收到启动后发送MQ中的消息。

消息消费时的权衡

集群模式

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播模式

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。

广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/685991.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言希尔排序详解!!!速过

目录 希尔排序是什么&#xff1f; 关于时间复杂度 希尔排序的源代码 希尔排序源代码的详解 希尔排序是什么&#xff1f; 之前我们说了三个排序&#xff08;插入排序&#xff0c;选择排序&#xff0c;冒泡排序&#xff09;有需要的铁铁可以去看看之前的讲解。 但因为之前的…

基于Python的信息加密解密网站设计与实现【源码+论文+演示视频+包运行成功】

博主介绍&#xff1a;✌csdn特邀作者、博客专家、java领域优质创作者、博客之星&#xff0c;擅长Java、微信小程序、Python、Android等技术&#xff0c;专注于Java、Python等技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; …

CSS 圆形的时钟秒针状的手柄绕中心点旋转的效果

<template><!-- 创建一个装载自定义加载动画的容器 --><view class="cloader"><!-- 定义加载动画主体部分 --><view class="clface"><!-- 定义类似秒针形状的小圆盘 --><view class="clsface"><!-…

docker (五)-docker存储-数据持久化

将数据存储在容器中&#xff0c;一旦容器被删除&#xff0c;数据也会被删除。同时也会使容器变得越来越大&#xff0c;不方便恢复和迁移。 将数据存储到容器之外&#xff0c;这样删除容器也不会丢失数据。一旦容器故障&#xff0c;我们可以重新创建一个容器&#xff0c;将数据挂…

Linux常见指令(一)

一、基本指令 1.1ls指令 语法 &#xff1a; ls [ 选项 ][ 目录或文件 ] 功能&#xff1a;对于目录&#xff0c;该命令列出该目录下的所有子目录与文件。对于文件&#xff0c;将列出文件名以及其他信息。 常用选项&#xff1a; -a 列出目录下的所有文件&#xff0c;包括以 .…

github Two-factor authentication (2FA)is required for your GitHub account

问题 github 2FA认证 详细问题 笔者使用GitKraken&#xff0c;使用github登录&#xff0c;github要去 Two-factor authentication (2FA)is required for your GitHub account&#xff0c;即进行2FA认证 解决方案 解决方案一、 微信 → \rightarrow →搜索腾讯身份验证器…

css篇---移动端适配的方案有哪几种

移动端适配 移动端适配是指同一个页面可以在不同的移动端设备上都有合理的布局。主流实现的方案有 响应式布局通过rem或者vw,vh 等实现不同设备有相同的比例而实现适配 首先需要了解viewport 【视口】 视口代表了一个可看见的多边形区域&#xff08;通常来说是矩形&#xff0…

【c++】vector的增删查改

1.先定义一个类对象vector 为了防止和库里面发生冲突&#xff0c;定义一个命名空间&#xff0c;将类对象放在命名空间 里面 #include<iostream> using namespace std; namespace zjw {class vector {public:private:}; }2.定义变量&#xff0c;需要一个迭代器&#xff…

Android---DslTabLayout实现底部导航栏

1. 在 Android 项目中引用 JitPack 库 AGP 8. 根目录的 settings.gradle dependencyResolutionManagement {...repositories {...maven { url https://jitpack.io }} } AGP 8. 根目录如果是 settings.gradle.kts 文件 dependencyResolutionManagement {...repositories {...…

单片机学习笔记---LED呼吸灯直流电机调速

目录 LED呼吸灯 直流电机调速 模型结构 波形 定时器初始化函数 中断函数 主程序 上一节讲了电机的工作原理&#xff0c;这一节开始代码演示&#xff01; 我们上一篇说Ton的时间长Toff时间短电机会快&#xff0c;Ton的时间短Toff时间长电机会慢 并且我们还要保证无论Ton和…

element-UI 组件 dialog 中 ref 获取不到元素

项目场景&#xff1a; vue3集成bpmn.js 渲染过程中&#xff0c;进行流程图查看 问题描述 dialog弹窗加载获取canvas中 加载不到&#xff0c;导致偶尔流程展示加载失败 原因分析&#xff1a; 提示&#xff1a;官方解释如下&#xff0c;主要就是获取的时候&#xff0c;组件没有…

qml之Control类型布局讲解,padding属性和Inset属性细讲

1、Control布局图 2、如何理解&#xff1f; *padding和*Inset参数如何理解呢&#xff1f; //main.qml import QtQuick 2.0 import QtQuick.Controls 2.12 import QtQuick.Layouts 1.12 import QtQuick.Controls 1.4 import QtQml 2.12ApplicationWindow {id: windowvisible: …

Leetcode - 周赛384

目录 一&#xff0c;3033. 修改矩阵 二&#xff0c;3035. 回文字符串的最大数量 三&#xff0c;3036. 匹配模式数组的子数组数目 II 一&#xff0c;3033. 修改矩阵 这道题直接暴力求解&#xff0c;先算出每一列的最大值&#xff0c;再将所有为-1的区域替换成该列的最大值&am…

算法刷题:无重复字符的最长字串

无重复字符的最长字串 .题目链接题目详情算法原理题目解析滑动窗口定义指针进窗口判断出窗口更新结果 我的答案 . 题目链接 无重复字符的最长字串 题目详情 算法原理 题目解析 首先,为了使字符串遍历的更加方便,我们选择将字符串转换为数组 题目要求子串中不能有重复的字符…

知识图谱:py2neo将csv文件导入neo4j

文章目录 安装py2neo创建节点-连线关系图导入csv文件删除重复节点并连接边 安装py2neo 安装python中的neo4j操作库&#xff1a;pip install py2neo 安装py2neo后我们可以使用其中的函数对neo4j进行操作。 图数据库Neo4j中最重要的就是结点和边&#xff08;关系&#xff09;&a…

隐函数的求导【高数笔记】

1. 什么是隐函数&#xff1f; 2. 隐函数的做题步骤&#xff1f; 3. 隐函数中的复合函数求解法&#xff0c;与求导中复合函数求解法有什么不同&#xff1f; 4. 隐函数求导的过程中需要注意什么&#xff1f;

【Linux网络编程五】Tcp套接字编程(四个版本服务器编写)

【Linux网络编程五】Tcp套接字编程(四个版本服务器编写&#xff09; [Tcp套接字编程]一.服务器端进程&#xff1a;1.创建套接字2.绑定网络信息3.设置监听状态4.获取新连接5.根据新连接进行通信 二.客户端进程&#xff1a;1.创建套接字2.连接服务器套接字3.连接成功后进行通信 三…

爱上JVM——常见问题(一):JVM组成

1 JVM组成 1.1 JVM由那些部分组成&#xff0c;运行流程是什么&#xff1f; 难易程度&#xff1a;☆☆☆ 出现频率&#xff1a;☆☆☆☆ JVM是什么 Java Virtual Machine Java程序的运行环境&#xff08;java二进制字节码的运行环境&#xff09; 好处&#xff1a; 一次编写&…

第三十三回 镇三山大闹青州道 霹雳火夜走瓦砾场-python分割字符串

黄信和刘知寨押解宋江和花荣向青州走&#xff0c;碰到了燕顺等三人来劫囚车&#xff0c;黄信逃走了&#xff0c;刘知寨被抓住&#xff0c;被花荣一刀杀了。 黄信把情况报给青州知府&#xff0c;派来了青州兵马秦统制&#xff0c;人称霹雳火的秦明。秦明与花荣打&#xff0c;花…

计算机二级之sql语言的学习(数据模型—概念模型)

概念模型 含义: 概念模型用于信息世界&#xff08;作用对象&#xff09;的建模&#xff0c;是实现现实世界到信息世界&#xff08;所以万丈高楼平地起&#xff0c;不断地学习相关的基础知识&#xff0c;保持不断地重复才能掌握最为基础的基础知识&#xff09;的概念抽象&#…