四.RocketMQ的几种消息发送方式应用

RocketMQ的几种消息发送方式应用

    • 一:普通消息
      • 1)发送同步消息
      • 2)发送异步消息
      • 3)单向发送消息
      • 4)消费消息-负载均衡模式
      • 5)消费消息-广播模式
    • 二:顺序消息
      • 1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。
      • 2.为什么需要顺序消息?
      • 3.有序性分类
      • 4.代码示例
    • 三:延时消息
      • 1.延时消息概览及适用场景
      • 2.延时等级
      • 3.代码示例
    • 四:批量消息
      • 1.批量发送消息
        • 1.1 发送限制
        • 1.2 生产者发送的消息大小
      • 2. 批量消费消息
        • 2.1 批量消费配置
        • 2.2 存在的问题
      • 3.代码示例
        • 3.1 定义消息列表分割器
        • 3.2 发送消息:
        • 3.3 消费消息:
        • 3.4 结果:
    • 五:过滤消息
      • 1.Tag过滤
        • 1.1 代码实现
      • 2.SQL过滤
        • 2.1 代码实现
    • 六:事务消息
      • 1.问题引入
      • 2.解决思路
      • 3.注意
      • 4.代码实现
        • 4.1生产者:
        • 4.2 消费者:
        • 4.3事务监听实现:
        • 4.4结果:

导入MQ启动依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

yml配置:

rocketmq:name-server: 192.168.31.30:9876 # 访问地址producer:group: rocket-producer # 必须指定groupsend-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

工具类

package com.lmy.config.rocketmq;import com.lmy.dto.rsp.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.lmy.utils.JsonUtil;/*** @author : lmy* @date : 2023/12/9 下午 12:45* 生产者*/
@Slf4j
@Component
public class MQProducerService {@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;// 建议正常规模项目统一用一个TOPICprivate static final String topic = "RLT_TEST_TOPIC";@Autowiredprivate RocketMQTemplate rocketMQTemplate;private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), newThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});/*** 普通发送(这里的参数可以随意定义,可以发送个对象,也可以是字符串等)*/public void send(String topic,String tag,Object obj) {
//        rocketMQTemplate.convertAndSend(topic + ":"+tag, obj);if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}rocketMQTemplate.send(topic + tag, MessageBuilder.withPayload(obj).build()); // 等价于上面一行}/*** 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)* (msgBody也可以是对象,sendResult为返回的发送结果)* setHeader: 在消息发送到RocketMQ时,这个键值对会被添加到消息的头部,以便在消息接收端进行识别和处理*/public SendResult sendSyncMsg(String topic,String tag,Object msgBody) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}SendResult sendResult = rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS,"myKey").build());log.info("【sendMsg】sendResult={}", JsonUtil.objectToJson(sendResult));return sendResult;}/*** 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)* (适合对响应时间敏感的业务场景)*/public void sendAsyncMsg(String topic,String tag,String msgBody) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}rocketMQTemplate.asyncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑log.info("【sendAsyncMsg】sendResult={}", JsonUtil.objectToJson(sendResult));}@Overridepublic void onException(Throwable throwable) {// 处理消息发送异常逻辑log.info("【sendAsyncMsg】发送失败:sendResult={}",throwable.getMessage());}});}/*** 发送顺序消息* @param topic* @param msgBody* 该方法的hashkey参数,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。*/public void syncSendOrderly(String topic,String tag,String msgBody,String hashKey) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic + tag, MessageBuilder.withPayload(msgBody).build(), hashKey);System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),msgBody));}/*** 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendDelayMsg(String topic,String tag,String msgBody, int delayLevel) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);}/*** 发送同步批量消息* @param topic* @param tag* @param messageList*/public void sendBatchMsg(String topic, String tag, List<String> messageList) {List<Message> messages = new ArrayList<>();for (String message : messageList) {messages.add(new Message(topic,tag, message.getBytes()));}DefaultMQProducer producer = rocketMQTemplate.getProducer();//把大的消息分裂成若干个小的消息ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);log.info("【sendMsg】sendResult={}", sendResult.getRawRespBody()+","+sendResult.getSendStatus());} catch (Exception e) {e.printStackTrace();//处理error}}}/*** 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)*/public void sendOneWayMsg(String topic,String tag,String msgBody) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}rocketMQTemplate.sendOneWay(topic+tag, MessageBuilder.withPayload(msgBody).build());}/*** 发送带tag的消息,直接在topic后面加上":tag"*/public SendResult sendTagMsg(String topic,String tag,Object msgBody) {if (StringUtils.isNotEmpty(tag)) {tag = ":"+tag;}return rocketMQTemplate.syncSend(topic + tag, MessageBuilder.withPayload(msgBody).build());}/*** 发送sql过滤的消息"*/public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {SendResult sendResult = new SendResult();try {DefaultMQProducer producer = rocketMQTemplate.getProducer();Message msg = new Message(topic, tags, msgBody.toString().getBytes());Set<Map.Entry<String, Object>> entries = propMap.entrySet();for (Map.Entry<String, Object> entry : entries) {String key = entry.getKey();Object value = entry.getValue();msg.putUserProperty(key, value + "");}sendResult = producer.send(msg);System.out.println(sendResult);}catch (Exception e) {e.printStackTrace();}return sendResult;}/*** 发送事务消息* @param topic* @param tags* @param msgBody*/public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();// 为生产者指定一个线程池producer.setExecutorService(executorService);// 设置事务监听器producer.setTransactionListener(new TransactionListenerImpl());// 设置生产者组producer.setProducerGroup("Con_Group_9");// 生成生产事务idString transactionId = UUID.randomUUID().toString().replace("-", "");// 构建消息体org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();// 第三个参数用于指定在执行本地事务时要使用的业务参数TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);return transactionSendResult;}}
package com.lmy.controller.rocketmq.consumer;import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;/*** @author : lmy* @date : 2023/12/9 下午 12:51* 消费者** consumerGroup:是消费者的逻辑分组,用于使同一个 consumerGroup 下的消费者消费同一类消息。* consumerGroup 的作用主要有以下 2 点:*  1. 实现负载均衡。同一个 consumerGroup 下的消费者会均匀地消费同一类消息,不会重复消费。*  2. 实现消息重试。当某个消费者挂掉时,其它消费者会继续消费该消费者未消费的消息。** topic:是消息的逻辑分类,用于将消息分发到不同的消费者* topic 的作用主要有以下 2 点:* 1. 实现消息分发。当消息发送到 RocketMQ 服务器时,会根据 topic 将消息分发到不同的消费者。* 2. 实现消息过滤。消费者可以通过指定 topic 来过滤消息。** selectorType:是用于指定消息选择器的类型的属性。* selectorType 的作用:* 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。* MessageSelectorType.TAG:表示消息选择器的类型是基于标签进行过滤。只有标签匹配的消息才会被消费者消费。* MessageSelectorType.SQL92:它可以使用SQL92语法作为过滤规则表达式** selectorExpression:是用于指定消息选择器的表达式的属性* selectorExpression 的作用:* 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。* 在使用@RocketMQMessageListener 注解标注消费者类时,通过 selectorExpression 属性指定消息选择器的表达式* 例:selectorExpression = "tag1 || tag2"** consumeMode:用于指定消费者消费模式的属性* consumeMode的作用:* ConsumeMode.ORDERLY:有序消费。在这种模式下,消费者会按照消息发送的顺序来消费消息。* ConsumeMode.CONCURRENTLY:并发消费。在这种模式下,消费者可以并发地消费消息** messageModel:用于指定消息模型的属性* messageModel的作用:* MessageModel.CLUSTERING:负载均衡模式。在这种模式下,消息会被发送给其中一个订阅该主题的消费者。* MessageModel.BROADCASTING:广播模式。在这种模式下,消息会被发送给所有订阅该主题的消费者。** consumeThreadNumber:用于指定消费者线程数的属性* RocketMQ中consumeThreadNumber的默认值是1,表示使用单线程消费消息。如果消息量很大,可以通过增加 consumeThreadNumber 来提高消费性能。** maxReconsumeTimes:用于指定消息最大重试次数的属性* RocketMQ中maxReconsumeTimes 的默认值是 3,表示消息最多可以重试 3 次。如果消息在重试 3 次后仍然没有被消费,那么该消息将被丢弃。* 注意:maxReconsumeTimes 的设置会影响消费者的消费性能。如果消息量很大,建议减少 maxReconsumeTimes 的值,以提高消费性能。但是,如果消息量不大,减少 maxReconsumeTimes 的值可能会导致消息丢失。** consumeTimeout:用于指定消息消费超时时间的属性* RocketMQ中consumeTimeout 的默认值是 10000 毫秒,表示消息消费超时时间为 10 秒。如果消息在 10 秒内没有被消费,那么该消息将被重新投递。* 注意:consumeTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 consumeTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 consumeTimeout 的值可能会导致消息丢失。** replyTimeout:用于指定消息回复超时时间的属性。* RocketMQ中replyTimeout 的默认值是 3000 毫秒,表示消息回复超时时间为 3 秒。如果消息在 3 秒内没有收到回复,那么该消息将被重新投递。* 注意:replyTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 replyTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 replyTimeout 的值可能会导致消息丢失。** enableMsgTrace:用于指定是否开启消息跟踪的属性* enableMsgTrace的默认值是 false,表示不开启消息跟踪。如果开启消息跟踪,那么消费者可以通过消息 ID 查询消息的消费状态。* 注意:enableMsgTrace 的设置会影响消费者的消费性能。如果开启消息跟踪,那么消费者需要额外消耗资源来记录消息的消费状态。如果消息量很大,建议关闭消息跟踪。** tlsEnable:用于指定是否开启 TLS 加密的属性* tlsEnable 的默认值是 false,表示不开启 TLS 加密。如果开启 TLS 加密,那么消费者和生产者之间的通信将使用 TLS 协议进行加密。* 注意:tlsEnable 的设置会影响消费者的消费性能。如果开启 TLS 加密,那么消费者需要额外消耗资源来进行 TLS 加密和解密。如果消息量很大,建议关闭 TLS 加密。** namespace:用于指定消息的命名空间的属性* namespace 的默认值是 default,表示使用默认的命名空间。如果需要使用自定义的命名空间,可以通过 namespace 属性指定。* 注意:namespace 的设置会影响消息的消费路由。如果 namespace 设置不正确,那么消息可能会被错误地消费。** delayLevelWhenNextConsume:用于指定消息下次消费的延迟级别的属性* delayLevelWhenNextConsume 的默认值是 0,表示消息下次消费的延迟级别为 0 级。如果需要设置消息下次消费的延迟级别,可以通过 delayLevelWhenNextConsume 属性指定* 注意:delayLevelWhenNextConsume 的设置会影响消息的消费时间。如果 delayLevelWhenNextConsume 设置过高,那么消息可能会被延迟很长时间才能被消费。** suspendCurrentQueueTimeMillis:用于指定暂停当前队列的毫秒数的属性* suspendCurrentQueueTimeMillis 的默认值是 -1,表示不暂停当前队列。如果需要暂停当前队列,可以通过 suspendCurrentQueueTimeMillis 属性指定暂停的时间。* 注意:suspendCurrentQueueTimeMillis 的设置会影响消息的消费时间。如果 suspendCurrentQueueTimeMillis 设置过高,那么消息可能会被延迟很长时间才能被消费。** awaitTerminationMillisWhenShutdown:用于指定消费者在关闭时等待终止的时间的属性。* awaitTerminationMillisWhenShutdown 的默认值是 0,表示消费者在关闭时不等待终止。如果需要消费者在关闭时等待一段时间后终止,可以通过 awaitTerminationMillisWhenShutdown 属性指定等待的时间。* 注意:awaitTerminationMillisWhenShutdown 的设置会影响消费者的关闭时间。如果设置的等待时间过长,消费者可能需要等待较长时间才能完全关闭。** instanceName:用于指定消费者实例名称的属性* instanceName 的作用是为了在一个进程中创建多个消费者实例,每个实例可以独立运行和管理。* 这对于需要同时消费多个主题或者在不同的消费者组中使用相同的消费者类来处理消息时非常有用。**/
@Slf4j
@Component
public class MQConsumerService {/**** topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息* messageModel可设置消费者模式*   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同*   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的*/@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<Object> {// 监听到消息就会执行此方法@Overridepublic void onMessage(Object obj) {log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));}}// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
//    @Service
//    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
//    public class ConsumerSend2 implements RocketMQListener<String> {
//        @Override
//        public void onMessage(String str) {
//            log.info("监听到消息:str={}", str);
//        }
//    }// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_Three")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")public class orderConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")public class delayMsgConsumer implements RocketMQListener<Object> {// 监听到消息就会执行此方法@Overridepublic void onMessage(Object obj) {log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")public class batchConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")public class tagConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")public class SqlConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());}}@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")public class TransactionConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());}}}

消息发送者步骤:

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题TopicTag和消息体
5.发送消息
6.关闭生产者producer

消息消费者步骤:

1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题TopicTag
4.设置回调函数,处理消息
5.启动消费者consumer

一:普通消息

1)发送同步消息

同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
在这里插入图片描述

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {String msg = "同步消息体";for (int i = 0; i < 100; i++) {SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);System.out.printf("%s%n", sendResult);}
}

2)发送异步消息

异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
在这里插入图片描述

@ApiOperation("发送异步消息")
@GetMapping("/sendAsyncMsg")
public void sendAsyncMsg() {String msg = "异步消息体";for (int i = 0; i < 100; i++) {mqProducerService.sendAsyncMsg("RLT_TEST_TOPIC","tag2",msg + i);}
}

3)单向发送消息

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
在这里插入图片描述

@ApiOperation("发送单向消息")
@GetMapping("/sendOneWayMsg")
public void sendOneWayMsg() {String msg = "单向消息体";for (int i = 0; i < 100; i++) {mqProducerService.sendOneWayMsg("RLT_TEST_TOPIC","tag2",msg + i);}
}

4)消费消息-负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

生产消息:

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {String msg = "同步消息体";for (int i = 0; i < 100; i++) {SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);System.out.printf("%s%n", sendResult);}
}

消费消息:

package com.lmy.config.rocketmq;import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;/*** @author : lmy* @date : 2023/12/9 下午 12:51* 消费者*/
@Slf4j
@Component
public class MQConsumerService {/*** topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息* messageModel可设置消费者模式*   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同*   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的*/@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<Object> {// 监听到消息就会执行此方法@Overridepublic void onMessage(Object obj) {log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));}}// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_One")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());}}
}

响应结果:
在这里插入图片描述

5)消费消息-广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

生产消息:

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {String msg = "同步消息体";for (int i = 0; i < 100; i++) {SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);System.out.printf("%s%n", sendResult);}
}

消费消息:

package com.lmy.config.rocketmq;import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;/*** @author : lmy* @date : 2023/12/9 下午 12:51* 消费者*/
@Slf4j
@Component
public class MQConsumerService {/*** topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息* messageModel可设置消费者模式*   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同*   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的*/@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING,consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<Object> {// 监听到消息就会执行此方法@Overridepublic void onMessage(Object obj) {log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));}}// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING, consumerGroup = "Con_Group_Three")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());}}
}

响应结果:
在这里插入图片描述

二:顺序消息

1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

2.为什么需要顺序消息?

例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:
在这里插入图片描述
这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

在这里插入图片描述
基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。

3.有序性分类

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序全局有序
1)全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。

在创建Topic时指定Queue的数量。有三种指定方式:

  • 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
  • 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
  • 使用mqadmin命令手动创建Topic时指定Queue数量

在这里插入图片描述
2)分区有序
如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

如何实现Queue的选择:
一般我们给出唯一且不重复的key(例如订单号),让key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。以下是源码:
在这里插入图片描述

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。

但是不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的
在这里插入图片描述

4.代码示例

/*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}}
/*** 生成模拟订单数据*/
private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("未支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("未支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("未支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("已支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("已支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("已支付");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("发货中");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("发货中");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("发货中");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("发货失败");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("发货成功");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("发货成功");orderList.add(orderDemo);return orderList;
}

生产消息:

@ApiOperation("发送顺序消息")
@GetMapping("/syncSendOrderly")
public void syncSendOrderly() {String msg = "顺序消息体";for (int i = 0; i < 12; i++) {// 订单列表List<OrderStep> orderList = new RocketMQController().buildOrders();mqProducerService.syncSendOrderly("RLT_TEST_TOPIC","tag3",msg +orderList.get(i).toString() ,orderList.get(i).getOrderId()+"");}
}

消息消费:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")
public class orderConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());}
}

响应结果:
在这里插入图片描述

三:延时消息

1.延时消息概览及适用场景

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,提交一个订单时可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2.延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。
延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中:

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m
1h 2h 1d

3.代码示例

延迟消息生产者:

@ApiOperation("发送延时消息")
@GetMapping("/sendDelayMsg")
public void sendDelayMsg() {String msg = "延时消息体发出时间";for (int i = 0; i < 12; i++) {String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());mqProducerService.sendDelayMsg("RLT_TEST_TOPIC","tag4",msgBody ,3);}
}

延迟消息消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")
public class delayMsgConsumer implements RocketMQListener<Object> {// 监听到消息就会执行此方法@Overridepublic void onMessage(Object obj) {log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}
}

结果:
在这里插入图片描述

四:批量消息

生产者批量发送消息能显著提高传递小消息的性能。

1.批量发送消息

1.1 发送限制
  • 批量发送的消息必须具有相同的Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息
  • 默认情况下,一批发送的消息总大小不能超过4MB字节,若想调整大小方法如下
    1)将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
    2)在Producer端与Broker端修改属性
    Producer端需要在发送之前设置Producer的maxMessageSize属性
    Broker端需要修改其加载的配置文件中的maxMessageSize属性
1.2 生产者发送的消息大小

生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。
在这里插入图片描述

2. 批量消费消息

2.1 批量消费配置

Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。但是,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。
在这里插入图片描述

2.2 存在的问题
  • pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
  • consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。

3.代码示例

3.1 定义消息列表分割器
package com.lmy.utils.rocketMq;import org.apache.rocketmq.common.message.Message;import java.util.Iterator;
import java.util.List;
import java.util.Map;/*** @author : lmy* @date : 2023/12/24 上午 11:58* 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表* 消息列表分割器:其只会处理每条消息的大小不超4M的情况。* 若存在某条消息,其本身大小大于4M,这个分割器无法处理,* 其直接将这条消息构成一个子列表返回。并没有再进行分割*/
public class ListSplitter implements Iterator<List<Message>> {// 指定极限值为4Mprivate int SIZE_LIMIT = 1024 * 1024 * 4;// 存放所有要发送的消息private final List<Message> messages;// 要进行批量发送消息的小集合起始索引private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {// 判断当前开始遍历的消息索引要小于消息总数return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;// 记录当前要发送的这一小批次消息列表的大小int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {// 获取当前遍历的消息Message message = messages.get(nextIndex);// 统计当前遍历的message的大小int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节// 判断当前消息本身是否大于4Mif (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}// 获取当前messages列表的子集合[currIndex, nextIndex)List<Message> subList = messages.subList(currIndex, nextIndex);// 下次遍历的开始索引currIndex = nextIndex;return subList;}}
3.2 发送消息:
@ApiOperation("发送批量消息")
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {String msg = "批量消息体发出时间";List<String> list = new ArrayList<>();for (int i = 0; i < 100; i++) {String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());list.add(msgBody);}mqProducerService.sendBatchMsg("RLT_TEST_TOPIC","tag5",list);
}
3.3 消费消息:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")
public class batchConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());}
}
3.4 结果:

可见,100条信息一次批量发送
在这里插入图片描述

五:过滤消息

消费者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。

1.Tag过滤

以一个标签标识信息进行过滤

1.1 代码实现

生产者:

@ApiOperation("发送过滤消息")
@GetMapping("/sendTagMsg")
public void sendTagMsg() {String msg = "过滤消息体";for (int i = 0; i < 100; i++) {mqProducerService.sendTagMsg("RLT_TEST_TOPIC","tag6",msg + i);}
}

消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")
public class tagConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());}
}

结果:
在这里插入图片描述

2.SQL过滤

相对于tag(一个消息只能有一个标签)不能应对复杂的场景,可以使用SQL表达式过滤消息。
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。但是,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符:
支持的常量类型:

  • 数值:比如:123,3.1415
  • 字符:必须用单引号包裹起来,比如:‘abc’
  • 布尔:TRUE 或 FALSE
  • NULL:特殊的常量,表示空

支持的运算符有:

  • 数值比较:>,>=,<,<=,BETWEEN,=
  • 字符比较:=,<>,IN
  • 逻辑运算 :AND,OR,NOT
  • NULL判断:IS NULL 或者 IS NOT NULL

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

#默认情况下Broker没有开启消息的SQL过滤功能,以下设置开启
enablePropertyFilter=true

然后重启生效即可!
在这里插入图片描述

2.1 代码实现

定义SQL过滤生产者:

/*** 发送sql过滤的消息"*/
public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {SendResult sendResult = new SendResult();try {DefaultMQProducer producer = rocketMQTemplate.getProducer();Message msg = new Message(topic, tags, msgBody.toString().getBytes());Set<Map.Entry<String, Object>> entries = propMap.entrySet();for (Map.Entry<String, Object> entry : entries) {String key = entry.getKey();Object value = entry.getValue();msg.putUserProperty(key, value + "");}sendResult = producer.send(msg);System.out.println(sendResult);}catch (Exception e) {e.printStackTrace();}return sendResult;
}
@ApiOperation("发送sql过滤消息")
@GetMapping("/sendSqlMsg")
public void sendSqlMsg() {String msg = "sql过滤消息体";for (int i = 0; i < 100; i++) {Map<String, Object> pop = new HashMap<>();pop.put("age",i);mqProducerService.sendSqlMsg("RLT_TEST_TOPIC","tag7",msg + i,pop);}
}

消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")
public class SqlConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());}
}

结果:
在这里插入图片描述

六:事务消息

1.问题引入

需求场景:工行用户A向建行用户B转账1万元
可以使用同步消息来处理该需求场景:
在这里插入图片描述
存在的问题:
若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。

2.解决思路

让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。
在这里插入图片描述
预扣款执行结果存在三种可能性:

// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}

消息回查,即重新查询本地事务的执行状态。

关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

  • transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
  • transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
  • transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。

rocketMQ中使用的分布式事务解决方案是XA处理模式
XA模式中有三个重要组件:TC、TM、RM。
TC:事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。(RocketMQ中Broker充当着TC。)
TM:事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。(RocketMQ中事务消息的Producer充当着TM)
RM:资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚(RocketMQ中事务消息的Producer及Broker均是RM)

3.注意

  • 事务消息不支持延时消息和批量消息。
  • 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

4.代码实现

4.1生产者:
@ApiOperation("发送事务消息")
@GetMapping("/sendTransactionMsg")
public void TransactionMsg() {for (int i = 8; i < 11; i++) {mqProducerService.TransactionMsg("RLT_TEST_TOPIC", "tag" + i, "事务消息" + i);}}
/*** 发送事务消息* @param topic* @param tags* @param msgBody*/
public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();// 为生产者指定一个线程池producer.setExecutorService(executorService);// 设置事务监听器producer.setTransactionListener(new TransactionListenerImpl());// 设置生产者组producer.setProducerGroup("Con_Group_9");// 生成生产事务idString transactionId = UUID.randomUUID().toString().replace("-", "");// 构建消息体org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();// 第三个参数用于指定在执行本地事务时要使用的业务参数TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);return transactionSendResult;
}
4.2 消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")
public class TransactionConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());}
}
4.3事务监听实现:
package com.lmy.config.rocketmq;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;/*** @author : liu ming yong* @date : 2024/4/21 下午 6:30* @description : 事务监听器*/
@Slf4j
public class TransactionListenerImpl implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 回调操作方法,消息预提交成功就会触发该方法的执行,用于完成本地事务log.info("预提交消息成功:" + msg+";业务参数:"+arg);// 假设接收到tag8的消息就表示操作成功,tag9的消息表示操作失败,tag10表示操作结果不清楚,需要执行消息回查if (StringUtils.equals("tag8", msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("tag9", msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("tag10", msg.getTags())) {return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 消息回查方法// 引发消息回查的原因最常见的有两个:1)回调操作返回UNKNWON  2)TC没有接收到TM的最终全局事务确认指令log.info("执行消息回查" + msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}
}
4.4结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/2859.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务器数据恢复—ESXi无法识别数据存储和VMFS文件系统如何恢复数据?

服务器数据恢复环境&#xff1a; 一台某品牌服务器&#xff0c;通过FreeNAS来做iSCSI&#xff0c;然后使用两台同品牌服务器做ESXi虚拟化系统。 FreeNAS层为UFS2文件系统&#xff0c;使用整个存储建一个稀疏模式的文件&#xff0c;挂载到ESXi虚拟化系统。ESXi虚拟化系统中有3台…

react实现时钟翻牌效果

需求&#xff1a;随着数字的变动要求有时钟翻动动效 问题&#xff1a;只在加载时有动效 解决方案&#xff1a;通过判断数字改变&#xff08;这里通过新旧数值变动来判断&#xff0c;不贴代码啦&#xff09;&#xff0c;每次变动的时候手动把animationIterationCount设置为inf…

【blog项目】layui与jquery冲突导致鼠标悬停事件失效、如何调用layui.use()作用域里的方法

blog项目前台展示——查询数据库中的文章类型并展示时出现的bug 1 正常演示 2 用jquery查询数据库并添加到页面后 3 相关代码 <script src"/static/jquery-2.1.4.js"></script> <script src"/static/layui/layui.js"></script> …

分布式与一致性协议之CAP(一)

CAP理论 概述。 在开发分布式系统的时候&#xff0c;会遇到一个非常棘手的问题&#xff0c;那就是如何根据业务特点&#xff0c;为系统设计合适的分区容错一致性模型&#xff0c;以实现集群能力。这个问题棘手在当发生分区错误时&#xff0c;应该如何保障系统稳定运行而不影响…

基于STM32和阿里云的智能台灯(STM32+ESP8266+MQTT+阿里云+语音模块)

一、主要完成功能 1、冷光模式和暖光模式两种灯光 主要支持冷光和暖光模式两种&#xff0c;可以通过语音模块或手机app远程切换冷暖光 2、自动模式和手动模式 主要支持手动模式和自动两种模式&#xff08;app或语音助手切换&#xff09; (1)自动模式&#xff1a;根据环境光照…

第七天 dfs剪枝优化

第七天 dfs剪枝&优化 1可行性剪枝 2最优性剪枝 3重复性剪枝 题 1 输入 5 5 6 …S. XX.X. …X… …D.X …X… 输出 YES —————————————— 题解 #include<iostream> #include<cstdio> using namespace std; const int N 10; int n,m,T; char …

绿色便携方式安装apache+mysql+tomcat+php集成环境并提供控制面板

绿色便携方式安装带控制面板的ApacheMariaDBTomcatPHP集成环境 目录 绿色便携方式安装带控制面板的ApacheMariaDBTomcatPHP集成环境[TOC](目录) 前言一、XAMPP二、安装和使用1.安装2.使用 三、可能的错误1、检查端口占用2、修改端口 前言 安装集成环境往往配置复杂&#xff0c…

自动化立体库安全使用管理制度

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;老K。专注分享智能仓储物流技术、智能制造等内容。 新书《智能物流系统构成与技术实践》 完整版文件和更多学习资料&#xff0c;请球友到知识星球 【智能仓储物流技术研习社】自行下载 关于自动化立体库安…

四、Flask进阶

Flask-Cache pip install flask-caching安装flask_cache初始化 # ext.py from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask_caching import Cachedb SQLAlchemy() migrate Migrate() cache Cache(config{CACHE_TYPE: simple # 缓存…

MybatisPlus(简单CURD,MP的实体类注解,MP条件查询,MP分页查询,MP批量操作,乐观锁,代码生成器)

目录 一、MP入门 1. MP是什么 2. MP使用入门 1 说明 2 准备MP项目环境 1) 添加依赖 2) 创建配置文件 3) 创建引导类 3 MP使用入门 1 创建实体类 2 创建Mapper 3 使用测试 3. 小结 二、MP简单CURD【重点】 1. 说明 2. 示例 3. 小结 三、MP的实体类注解[重点] …

字符串漏洞注入深入学习

字符串型漏洞注入&#xff0c;特别是针对Web应用程序的SQL注入&#xff0c;是一种常见的网络安全威胁。它涉及攻击者在不受控制的情况下&#xff0c;通过构造特定的字符串输入&#xff0c;干扰或改变应用程序中原有的SQL查询语句&#xff0c;从而执行恶意的SQL代码。 要深入学…

微软Phi-3,3.8亿参数能与Mixtral 8x7B和GPT-3.5相媲美,量化后还可直接在IPhone中运行

Phi-3系列 Phi-3是一系列先进的语言模型&#xff0c;专注于在保持足够紧凑以便在移动设备上部署的同时&#xff0c;实现高性能。Phi-3系列包括不同大小的模型&#xff1a; Phi-3-mini&#xff08;38亿参数&#xff09; - 该模型在3.3万亿个令牌上进行训练&#xff0c;设计得足…

【Stable Diffusion系列】(一):AI绘画本地部署教程

目录 一、总览 二、本地部署 1、安装cuda 2、安装python 3、安装git 4、方法一 1&#xff09;获取安装包 2&#xff09;update 3&#xff09;run 5、方法二 1&#xff09;git clone 2&#xff09;双击webui-user.bat 3&#xff09;更新 6、设置启动参数 7、…

指针(5)

前言 本节是有关指针内容的最后一节&#xff0c;本节的内容以讲解指针习题为主&#xff0c;那么就让我们一起来开启本节的学习吧&#xff01; sizeof和strlen的对比 1.sizeof 我们在学习操作符的时候&#xff0c;学习了sizeof。sizeof存在的意义是用来计算变量所占用的内存空…

AI大模型日报#0424:全球首个AI基因编辑器、出门问问上市、微软开源Phi-3 Mini、昆仑万维年收49亿

导读&#xff1a; 欢迎阅读《AI大模型日报》&#xff0c;内容基于Python爬虫和LLM自动生成。目前采用“文心一言”生成了每条资讯的摘要。标题: 爱诗科技完成A2轮超亿元融资&#xff0c;蚂蚁集团领投摘要: 爱诗科技完成A2轮超亿元融资&#xff0c;成为视频大模型领域融资规模最…

STM32学习和实践笔记(20):定时器

1.定时器介绍 STM32F1的定时器一共有8个&#xff0c;由2个基本定时器&#xff08;TIM6、TIM7&#xff09;、4个通用定时器&#xff08;TIM2-TIM5&#xff09;和2个高级定时器&#xff08;TIM1、TIM8&#xff09;组成。 基本定时器的功能最为简单&#xff0c;类似于51单片机内定…

【行为型模式】中介者模式

一、中介者模式概述 中介者模式定义&#xff1a;用一个中介对象来封装一系列的对象交互&#xff0c;中介者使各对象不需要显式地相互引用&#xff0c;从而使其耦合松散&#xff0c;而且可以独立地改变它们之间的交互。中介者模式又称为调停者模式。(对象行为型模式) 中介者模式…

python+django校园社交高校交友网站2x7r5.

本课题使用Python语言进行开发。代码层面的操作主要在PyCharm中进行&#xff0c;将系统所使用到的表以及数据存储到MySQL数据库中&#xff0c;方便对数据进行操作本课题基于WEB的开发平台&#xff0c;设计的基本思路是&#xff1a; 前端&#xff1a;vue.jselementui 框架&#…

Node.JS安装及配置教程(Windows)【安装】

文章目录 一、 Node.JS 下载1. 官网下载&#xff08;1&#xff09;国内地址&#xff08;2&#xff09;国外地址 2. 其它渠道 二、 Node.JS 安装三、 Node.JS验证四、 Node.JS 配置&#xff08;可选&#xff09;1. 配置全局模块安装路径方法一方法二2. 配置国内镜像 五、 yarn 安…

企业数智化:为什么选择梧桐数据库?

个人介绍&#xff1a;艺名司镜233&#xff0c;是中国移动梧桐数据库研发团队成员&#xff0c;从事相关的技术开发近5年了。最让我觉得自豪的不是在研发这款数据库&#xff0c;而是我们用代码&#xff0c;切实地帮助企业解决数据的困扰&#xff0c;切实地解决社会的问题。 本篇文…