2、RocketMQ消息的分类

一、普通消息
1 消息发送分类
Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。
同步发送消息
同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
在这里插入图片描述
异步发送消息
异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
在这里插入图片描述
单向发送消息
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。
在这里插入图片描述
2 代码举例
创建工程
创建一个Maven的Java工程rocketmq-test。
导入依赖
导入rocketmq的client依赖。

<properties>
<project.build.sourceEncoding>UTF-
8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>

定义同步消息发送生产者

public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("rocketmqOS:9876");
// 设置当发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认3s
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 生产并发送100条消息
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("someTopic", "someTag", body);
// 为消息指定key
msg.setKeys("key-" + i);
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// 关闭producer
producer.shutdown();
}
}
// 消息发送的状态
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出
现这种异常状态。异步刷盘不会出现
FLUSH_SLAVE_TIMEOUT, // Slave同步超时。当Broker集群设置的Master-Slave的复
制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
SLAVE_NOT_AVAILABLE, // 没有可用的Slave。当Broker集群设置为Master-Slave的
复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
}

定义异步消息发送生产者

public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 指定新创建的Topic的Queue数量为2,默认为4
producer.setDefaultTopicQueueNums(2);
producer.start();
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi," + i).getBytes();
try {
Message msg = new Message("myTopicA", "myTag", body);
// 异步发送。指定回调
producer.send(msg, new SendCallback() {
// 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
} // end-for
// sleep一会儿
// 由于采用的是异步发送,所以若这里不sleep,
// 则消息还未发送就会将producer给关闭,报错
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
}
}

定义单向消息发送生产者

public class OnewayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("single", "someTag", body);
// 单向发送
producer.sendOneway(msg);
}
producer.shutdown();
System.out.println("producer shutdown");
}
}

定义消息消费者

public class SomeConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个pull消费者
// DefaultLitePullConsumer consumer = new
DefaultLitePullConsumer("cg");
// 定义一个push消费者
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("cg");
// 指定nameServer
consumer.setNamesrvAddr("rocketmqOS:9876");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
// 指定消费topic与tag
consumer.subscribe("someTopic", "*");
// 指定采用“广播模式”进行消费,默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new
MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println("Consumer Started");
}
}

二、顺序消息
1 什么是顺序消息
顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从
多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个
Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。
2 为什么需要顺序消息
例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于
描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。
根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单
T0000001:发货失败
消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

在这里插入图片描述
这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方
式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不
能完全处理好这么多种随机出现组合情况。

在这里插入图片描述
基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个
Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序
性),能够保证消费的顺序性。

3 有序性分类
根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。

全局有序

在这里插入图片描述
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。
在创建Topic时指定Queue的数量。有三种指定方式:
1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
3)使用mqadmin命令手动创建Topic时指定Queue数量

分区有序
在这里插入图片描述
如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。
如何实现Queue的选择?在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们
自己实现了MessageQueueSelector接口定义的。
在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它
数据。但无论谁做选择key,都不能重复,都是唯一的。
一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果
即为选择出的Queue的QueueId。
取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的
消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问
题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需
要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被
Consumer获取到。此时使用消息key作为选择key是比较好的做法。
以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费
该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的
不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不
同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。

public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Integer orderId = i;
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TopicA", "TagA", body);
SendResult sendResult = producer.send(msg, new
MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
}

三、延时消息
1 什么是延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交
易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。
在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系
统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完
成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。
在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台
业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如
果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。
2 延时等级
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在
RocketMQ服务端的MessageStoreConfig类中的如下变量中:
在这里插入图片描述
即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。
当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1
天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

3 延时消息实现原理
在这里插入图片描述
具体实现方案是:
修改消息
在这里插入图片描述
Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相
应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正
常分发;若有则需要经历一个复杂的过程:
修改消息的Topic为SCHEDULE_TOPIC_XXXX
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
目录与consumequeue文件(如果没有这些目录与文件的话)。
延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1
需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,
而是用到哪个延迟等级创建哪个目录
在这里插入图片描述
修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的
Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到
commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息
被发送到Broker时的时间戳。
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的?
是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue
目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等
级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时
间进行排序的。
投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消
息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog
中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消
息。然后再次将消息投递到目标Topic中。
ScheuleMessageService在Broker启动时,会创建并启动一个定时器TImer,用于执行相应的定时
任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟
等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第
一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消
息到期了,则将该消息投递到目标Topic,即消费该消息。
将消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索
引条目,分发到相应Queue。
这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类
ScheuleMessageService。

4 代码举例
定义DelayProducer类

public class DelayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TopicB", "someTag", body);
// 指定消息延迟等级为3级,即延迟10s
// msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
// 输出消息被发送的时间
System.out.print(new SimpleDateFormat("mm:ss").format(new
Date()));
System.out.println(" ," + sendResult);
}
producer.shutdown();
}
}

定义OtherConsumer类

public class OtherConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe("TopicB", "*");
consumer.registerMessageListener(new
MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 输出消息被消费的时间
SimpleDateFormat("mm:ss").format(new Date()));
System.out.println(" ," + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}

四、事务消息
1 问题引入
这里的一个需求场景是:工行用户A向建行用户B转账1万元。
我们可以使用同步消息来处理该需求场景:
在这里插入图片描述

  1. 工行系统发送一个给B增款1万元的同步消息M给Broker
  2. 消息被Broker成功接收后,向工行系统发送成功ACK
  3. 工行系统收到成功ACK后从用户A中扣款1万元
  4. 建行系统从Broker中获取到消息M
  5. 建行系统消费消息M,即向用户B中增加1万元
    这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来
    说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出
    现了数据不一致问题。

2 解决思路
解决思路是,让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要
保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布
式事务解决方案。
在这里插入图片描述
使用事务消息来处理该需求场景:

  1. 事务管理器TM向事务协调器TC发起指令,开启全局事务
  2. 工行系统发一个给B增款1万元的事务消息M给TC
  3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看
    不到Broker中的消息M的
  4. Broker会将预提交执行结果Report给TC。
  5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会
    调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作
  6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
  7. TC收到预扣款执行结果后,会将结果上报给TM。

预扣款执行结果存在三种可能性

// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}
  1. TM会根据上报结果向TC发出不同的确认指令
    若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
    若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
    若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回
    查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上
    报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
  2. TC在接收到指令后会向Broker与工行系统发出确认指令
    TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时
    Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认
    TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时
    Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚
    以上方案就是为了确保消息投递与扣款操作能够在一个事务中,要成功都成功,有一个失败,
    则全部回滚。
    以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的
    消息预提交与预扣款操作间是同步的。

3 基础
分布式事务
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在
不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事
务一样,就是为了保证操作结果的一致性。
事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA
是一种分布式事务解决方案,一种分布式事务处理模式。
半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时
该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。
本地事务状态
Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发
送来的本地事务状态来决定全局事务确认指令。

// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}

消息回查
在这里插入图片描述
消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。
注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。
引发消息回查的原因最常见的有两个:
1)回调操作返回UNKNWON
2)TC没有接收到TM的最终全局事务确认指令

RocketMQ中的消息回查设置
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默
认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。

4 XA模式三剑客
XA协议
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。
XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩
展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标
准。
XA模式中有三个重要组件:TC、TM、RM。
TC
Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。
RocketMQ中Broker充当着TC。
TM
Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它
实际是全局事务的发起者。
RocketMQ中事务消息的Producer充当着TM。
RM
Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事
务的状态,并驱动分支事务提交或回滚。
RocketMQ中事务消息的Producer及Broker均是RM。

5 XA模式架构
在这里插入图片描述
XA模式是一个典型的2PC,其执行原理如下:

  1. TM向TC发起指令,开启一个全局事务。
  2. 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
  3. 各个RM在接收到指令后会在进行本地事务预执行。
  4. RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
  5. TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指
    令。
    若所有结果都是成功响应,则向TC发送Global Commit指令。
    只要有结果是失败响应,则向TC发送Global Rollback指令。
  6. TC在接收到指令后再次向RM发送确认指令。
    事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案
    中的消息预提交与预扣款操作间是同步的。

6 注意
事务消息不支持延时消息
对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的
情况)
7 代码举例

定义工行事务监听器

public class ICBCTransactionListener implements TransactionListener {
// 回调操作方法
// 消息预提交成功就会触发该方法的执行,用于完成本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
System.out.println("预提交消息成功:" + msg);
// 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败,
// TAGC表示扣款结果不清楚,需要执行消息回查
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
// 消息回查方法
// 引发消息回查的原因最常见的有两个:
// 1)回调操作返回UNKNWON
// 2)TC没有接收到TM的最终全局事务确认指令
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("执行消息回查" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}

定义事物消息生产者

public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new
TransactionMQProducer("tpg");
producer.setNamesrvAddr("rocketmqOS:9876");
/**
* 定义一个线程池
* @param corePoolSize 线程池中核心线程数量
* @param maximumPoolSize 线程池中最多线程数
* @param keepAliveTime 这是一个时间。当线程池中线程数量大于核心线程数量
是
* 多余空闲线程的存活时长
* @param unit 时间单位
* @param workQueue 临时存放任务的队列,其参数就是队列的长度
* @param threadFactory 线程工厂
*/
ExecutorService executorService = new ThreadPoolExecutor(2, 5,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new
ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());
producer.start();
String[] tags = {"TAGA","TAGB","TAGC"};
for (int i = 0; i < 3; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TTopic", tags[i], body);
// 发送事务消息
// 第二个参数用于指定在执行本地事务时要使用的业务参数
SendResult sendResult =
producer.sendMessageInTransaction(msg,null);
System.out.println("发送结果为:" +
sendResult.getSendStatus());
}
}
}

定义消费者
直接使用普通消息的SomeConsumer作为消费者即可。

public class SomeConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个pull消费者
faultLitePullConsumer("cg");
// 定义一个push消费者
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("cg");
// 指定nameServer
consumer.setNamesrvAddr("rocketmqOS:9876");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
// 指定消费topic与tag
consumer.subscribe("TTopic", "*");
// 指定采用“广播模式”进行消费,默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new
MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println("Consumer Started");
}
}

五、批量消息
1 批量发送消息
发送限制
生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以
下几点:
批量发送的消息必须具有相同的Topic
批量发送的消息必须具有相同的刷盘策略
批量发送的消息不能是延时消息与事务消息
批量发送大小
默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:
方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
方案二:在Producer端与Broker端修改属性
** Producer端需要在发送之前设置Producer的maxMessageSize属性
** Broker端需要修改其加载的配置文件中的maxMessageSize属性
生产者发送的消息大小
在这里插入图片描述
生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这
个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志
(占20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、
要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。

2 批量消费消息
修改批量属性
在这里插入图片描述
Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列
表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改
Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下消
费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的
pullBatchSize属性来指定。
存在的问题
Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?当然不
是。
pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现
问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉
取。
consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消
费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一
个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费
处理。
3 代码举例
该批量发送的需求是,不修改最大发送4M的默认值,但要防止发送的批量消息超出4M的限制。
定义消息列表分割器

// 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
// 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private final int SIZE_LIMIT = 4 *1024 * 1024;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize = message.getTopic().length() +
message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry :
properties.entrySet()) {
tmpSize += entry.getKey().length() +
entry.getValue().length();
}
tmpSize = tmpSize + 20;
// 判断当前消息本身是否大于4M
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
} // end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}

定义批量消息生产者

public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定要发送的消息的最大大小,默认是4M
// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
// maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
// 定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("someTopic", "someTag", body);
messages.add(msg);
}
// 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
MessageListSplitter splitter = new
MessageListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}

定义批量消息消费者

public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe("someTopicA", "*");
// 指定每次可以消费10条消息,默认为1
consumer.setConsumeMessageBatchMaxSize(10);
// 指定每次可以从Broker拉取40条消息,默认为32
consumer.setPullBatchSize(40);
consumer.registerMessageListener(new
MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 消费成功的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消费异常时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python之列表

标题 列表什么是列表列表的创建列表的删除列表的访问 列表的常用方法append()、insert()、extend()pop()、remove()、clear()count()、index()list()、 filter()、 reduce()、lambda() 列表支持的运算加法运算符乘法运算符*成员测试运算符in 内置函数对列表的操作列表推导式列表…

在Idea中调试本地Docker

报错&#xff1a; Error running myApp: Unable to open debugger port (localhost:5005): java.net.SocketException "Connection reset" 原因&#xff1a; Docker配置里边没有配置环境变量JAVA_TOOL_OPTIONS. 解决&#xff1a; 在Docker下加入运行时的环境变量JAVA…

ffmpeg抠图

1.不用png&#xff0c;用AVFrame 2.合流 3.图片抠图透明 (1.)mp4扣yuv图&#xff0c;(2)用1.把一张yuv标记为透明然后av_hwframe_transfer_data到GPU (3)用抠图算法函数对yuv进行处理 (4) qsv的h264_qsv只支持nv12和qsv&#xff0c;但qsv本身并不限制像素格式&#xff0c;比如在…

Centos7原生hadoop环境,搭建Impala集群和负载均衡配置

Centos7原生hadoop环境&#xff0c;搭建Impala集群和负载均衡配置 impala介绍 Impala集群包含一个Catalog Server (Catalogd)、一个Statestore Server (Statestored) 和若干个Impala Daemon (Impalad)。Catalogd主要负责元数据的获取和DDL的执行&#xff0c;Statestored主要负…

VSCode远程连接服务器报错:Could not establish connection to

参考&#xff1a;https://blog.csdn.net/weixin_42538848/article/details/118113262 https://www.jb51.net/article/219138.htm 刚开始把ssh文件夹中的known_hosts给删除了&#xff0c;发现没啥用。 之后在扩展Remote-SSH里面&#xff0c;把config file路径设置为ssh文件夹里…

洛谷刷题入门篇:分支结构

今天又来了&#xff0c;刷题刷题&#xff0c;我爱刷题&#xff0c;题单链接如下&#xff1a; https://www.luogu.com.cn/training/101#problems 一、【深基1-2】小学数学 N 合一 题目如下&#xff1a;https://www.luogu.com.cn/problem/P2433 题目描述 问题 1 请输出 I lov…

【03】FISCOBCOS配置及使用控制台

官网文档https://fisco-bcos-documentation.readthedocs.io/zh_CN/latest/docs/installation.html# 配置及使用控制台 第一步. 准备依赖 安装java &#xff08;推荐使用java 14&#xff09; # ubuntu系统安装java sudo apt install -y default-jdk#centos系统安装java sudo yu…

uni-app实现获取未来七天时间和星期几功能

例子如下&#xff1a; HTML&#xff1a; <viewstyle"margin-top: 3%;width: 100%;height: 10vh;display: flex;justify-content: space-around;"><div v-for"(item,index) in same_week" :class"[same_dayitem.date? activ :,dis]"cl…

关于CS 4.7 Stager 逆向及 Shellcode 重写

1. 概述 一直很想有一个自己的控&#xff0c;奈何实力不允许&#xff0c;CS 仍然是目前市面上最好用的控&#xff0c;但是也被各大厂商盯得很紧&#xff0c;通过加载器的方式进行免杀效果有限&#xff0c;后来看到有人用 go 重写了 CS 的 beacon&#xff0c;感觉这个思路很好&…

LVS-NAT模式

LVS负载均衡群集 群集的定义 Cluster&#xff0c;集群&#xff08;也称群集&#xff09;由多台主机构成&#xff0c;但对外只表现为一一个整体&#xff0c;只提供一-个访问入口(域名或IP地址)&#xff0c; 相当于一台大型计算机。 群集的作用 对于企业服务的的性能提升一般…

测试网页调用本地可执行程序(续:带参数调用)

前篇文章介绍了网页调用本地可执行程序的方式&#xff0c;通过在注册表中注册命令&#xff0c;然后在网页中调用命令启动本地程序。如果需要传递参数&#xff0c;则需要在注册表命令中的command项中设置如下形式的值。 "XXXXXX\XXXXXXX.exe" "%1"&emsp…

【python】Seaborn画热力图,只显示第一行数字---seaborn与matplotlib版本问题

github上有这个讨论&#xff1a;Heatmap only has annotation text in the top row only Issue #3478 mwaskom/seaborn (github.com)翻译过来就是&#xff1a;热图仅在最上面一行有注释文本&#xff1b; 原因就是matplotlib 在2023年9月更新到了 3.8.0版本&#xff0c;改变了…

FPGA project : HCSR04

犯下的错误&#xff1a; 1&#xff0c;由于使用cnt_base 做echo回响信号高电平时间的测量&#xff0c;它的数据应该很大&#xff0c;位宽也很大。也可以采用cnt_us计数器&#xff0c;计算这个高电平时间的。我为了精确计算距离&#xff0c;所以才仍然用的cnt_base计数器。 2&…

什么算泄露公司机密的行为(什么程度算公司泄密行为)

在当今的商业环境中&#xff0c;保护公司的核心竞争力和商业机密是至关重要的。然而&#xff0c;员工可能出于各种原因泄露这些信息&#xff0c;包括对竞争对手的追求、个人利益的驱动或者对工作的不满。在这种情况下&#xff0c;企业需要依赖专业的调查工具来揭示和证明员工的…

【Cpp】位图Bitmap

code #include <iostream> #include <vector> #include <stdio.h> #include <stdint.h>class Bitmap { private:std::vector<uint8_t> data; // 存储位图数据的字节数组uint32_t size; // 位图的大小&#xff08;以位为单位&#x…

Vue路由及Node.js环境搭建

目录 一、Vue路由 1.1 定义 1.2 应用领域 1.3 代码展示 二、Node.js 2.1 定义 2.2 特点 2.3 Node.js安装与配置 2.3.1 下载安装包 2.3.2 手动新建文件夹 2.3.3 注意事项 2.3.4 配置环境变量 2.3.5 检验是否安装配置成功 2.3.6 设置淘宝源 2.3.7 查看全局路径设置…

通用返回结果类ResultVO

1. 定义通用返回结果类 ​ 定义ResultVO类&#xff0c;作返回给前端的对象结构&#xff0c;主要有4个字段 code : 错误码 data : 内容message : 消息description : 具体描述 import lombok.Data; import java.io.Serializable;/*** 通用返回结果类* param <T>*/ Data …

史上最详细的测试用例写作规范

软件测试用例得出软件测试用例的内容&#xff0c;其次&#xff0c;按照软件测试写作方法&#xff0c;落实到文档中&#xff0c;两者是形式和内容的关系&#xff0c;好的测试用例不仅方便自己和别人查看&#xff0c;而且能帮助设计的时候考虑的更周。 一个好的测试用例必须包含…

Tessy 5.0.4

Tessy 5.0.4 Linux 2692407267qq.com&#xff0c;更多内容请见http://user.qzone.qq.com/2692407267/

论文研读-数据共享-大数据流分析中的共享执行技术

Shared Execution Techniques for Business Data Analytics over Big Data Streams 大数据流分析中的共享执行技术 1、摘要 2020年的一篇共享工作的论文&#xff1a;商业数据分析需要处理大量数据流&#xff0c;并创建物化视图以便给用户实时提供分析结果。物化每个查询&#x…