RocketMQ快速入门

RocketMQ快速入门

RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等,我们一一学习

1.1 消息发送和监听的流程
我们先搞清楚消息发送和监听的流程,然后我们在开始敲代码

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer

1.2 消息消费者

1.创建消费者consumer,制定消费者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消费者consumer

下面是搭建的一个基础使用案例

  1. 创建一个sp工程,导入rocketmq依赖
<!-- 原生的api--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version>
<!--            docker的用下面这个版本<version>4.4.0</version>&ndash;&gt;--></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
  1. 编写生产者
/*** 发消息*/@Testvoid contextLoads() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {// 创建一个生产者(定制一个生产者组名)DefaultMQProducer producer = new DefaultMQProducer("test-producer-test");// 连接nameServerproducer.setNamesrvAddr("127.0.0.1:9876");// 启动producer.start();// 创建一个消息Message message = new Message("testTopic", "我是一个简单的消息".getBytes());// 发送消息SendResult send = producer.send(message);System.out.println(send.getSendStatus());// 关闭生产者producer.shutdown();}

编写消费者:

@Testvoid simpleConsumer() throws MQClientException, IOException {// 创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");// 连接namesrvconsumer.setNamesrvAddr("127.0.0.1:9876");// 订阅一个主题 * 表示订阅这个主题中的所有消息,后期会有消息过滤consumer.subscribe("testTopic","*");// 设置一个监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 这个就是消费的方法(业务处理)System.out.println("我是消费者");System.out.println("消息的内容是:"+new String(msgs.get(0).getBody()));System.out.println("消费上下文:"+consumeConcurrentlyContext);// 返回值 CONSUME_SUCCESS成功,消息会从mq出队// RECONSUME_LATER (报错/null) 代表失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费的return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动consumer.start();// 挂起当前jvm,就是让其不要停止,因为消费逻辑是异步执行的,这里是为了防止主线程执行完而异步线程还没执行完System.in.read();}

执行效果:

在这里插入图片描述
2. 消费模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式,rocketmq拉取消息的规则大致如下所示:

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3. RocketMQ发送同步消息
上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式

在这里插入图片描述
4. RocketMQ发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知

代码如下:

    @Testpublic void asyncProducer() throws MQClientException, RemotingException, InterruptedException, IOException {DefaultMQProducer producer = new DefaultMQProducer();producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message message = new Message("asyncTopic", "我是一个异步消费者".getBytes());producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败:"+throwable.getMessage().toString());}});System.out.println("主线程先执行");System.in.read();}

消费者端代码不变:

 @Testvoid simpleConsumer() throws MQClientException, IOException {// 创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");// 连接namesrvconsumer.setNamesrvAddr("127.0.0.1:9876");// 订阅一个主题 * 表示订阅这个主题中的所有消息,后期会有消息过滤consumer.subscribe("testTopic","*");// 设置一个监听器consumer.registerMessageListener(new MessageListenerConcurrently() {// // 注册一个消费监听 MessageListenerConcurrently是并发消费    _//_// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 这个就是消费的方法(业务处理) 这里执行消费的代码,默认是多线程执行System.out.println("我是消费者");System.out.println("消息的内容是:"+new String(msgs.get(0).getBody()));System.out.println("消费上下文:"+consumeConcurrentlyContext);// 返回值 CONSUME_SUCCESS成功,消息会从mq出队// RECONSUME_LATER (报错/null) 代表失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费的return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动consumer.start();// 挂起当前jvm,就是让其不要停止,因为消费逻辑是异步执行的,这里是为了防止主线程执行完而异步线程还没执行完System.in.read();}

5.RocketMQ发送单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送

单向生产者代码:

/*** 单向消息发送,不需要在乎消息是否发送成功,*  这种模式下生产者只需要负责发送数据就行,不需要关注*  消息有没有发送成功,例如:日志处理* @throws Exception*/@Testpublic void  OneWayProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message message = new Message("oneWayTopic", "日志xxx".getBytes());producer.sendOneway(message);System.out.println("成功");producer.shutdown();}

消费者代码和上面一样

6. RocketMQ发送延迟消息
消息放入mq后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

生产者代码:

/*** 延迟消息* @throws Exception*/@Testpublic void msproducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message message = new Message("orderTopicTest", "订单号,座位号".getBytes());// 为消息设置过期时间  1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h// 消息的不同level对应不同的时间,因此这里的3对应着10smessage.setDelayTimeLevel(3);producer.send(message);System.out.println("发送时间:"+new Date());producer.shutdown();}

消费者端代码:

    @Testpublic void  msConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("orderTopicTest","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到了消息"+new Date());System.out.println(new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}

7.RocketMQ发送批量消息

Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费

批量消息生产者:

/*** 批量发送消息*/@Testpublic void testBatchProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopic","我是一组消息的A消息".getBytes()),new Message("batchTopic","我是一组消息的B消息".getBytes()),new Message("batchTopic","我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send.getSendStatus().toString());// 关闭实例producer.shutdown();}

消费者端代码:

@Testpublic void  msConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("batchTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("收到了消息"+new Date());System.out.println(new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}

8.RocketMQ发送顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序
rocketMq的broker的机制,导致了rocketMq会有这个问题
因为一个broker中对应了四个queue

在这里插入图片描述

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。

8.1 创建一个消息模型

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {private String orderSn;private Integer userId;private String desc;
}

8.2 顺序消息生产者

private List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer",1,"下单"),new MsgModel("qwer",1,"短信"),new MsgModel("qwer",1,"物流"),new MsgModel("zxcv",1,"下单"),new MsgModel("zxcv",1,"短信"),new MsgModel("zxcv",1,"物流"));/*** 顺序发送,这里是让相同订单号的消息进入到同一个队列* @throws Exception*/@Testpublic void orderlyProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();// 发送顺序消息,发送时要确保有序,并且要发送到同一个队列里去msgModels.forEach(msgModel -> {Message message = new Message("orderlyTopic", msgModel.toString().getBytes());try {producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 在这里选择队列,让相同订单号进入到同一个队列int hashCode = o.toString().hashCode();// 2 % 4 = 2// 3 % 4 = 3   周期性函数,这里得到的结果永远比模数小,保证了不会被越界int i = hashCode % list.size();return list.get(i);}},msgModel.getOrderSn());} catch (Exception e) {throw new RuntimeException(e);}});producer.shutdown();System.out.println("发送完成");}

8.3 顺序消费消息

/*** 顺序消费* @throws Exception*/@Testpublic void  orderlyConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-producer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("orderlyTopic","*");// MessageListenerConcurrently 代表并发模式,多线程,默认20个线程  // 重试16次//consumer.setConsumeThreadMax();// 这种模式下可以设置最大线程数// MessageListenerOrderly 顺序模式,单线程 无线重试Integer.Max_Valueconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {System.out.println("线程id:"+Thread.currentThread().getId());System.out.println(new String(list.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();}

9.RocketMQ发送带标签的消息,消息过滤
Rocketmq提供消息过滤功能,通过tag或者key进行区分
我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待:

在这里插入图片描述

在这里插入图片描述
生产者代码:

@Testpublic void tagProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());producer.send(message);producer.send(message2);System.out.println("发送成功");producer.shutdown();}

消费者代码:

/*** 订阅关系一致性* @throws Exception*/@Testpublic void tagConsumer1() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("tagTopic","vip1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("我是vip1的消费者,我正在消费消息"+new String(list.get(0).getBody().toString()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}@Testpublic void tagConsumer2() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("tagTopic","vip1 || vip2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("我是vip2的消费者,我正在消费消息"+new String(list.get(0).getBody().toString()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}

9.1什么时候该用 Topic,什么时候该用 Tag?

总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分
可以从以下几个方面进行判断:
1.消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
2.业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
3.消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
4.消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

10.RocketMQ中消息的Key

在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询

在这里插入图片描述

在这里插入图片描述

带key的消息生产者代码:

/*** 业务参数,我们自身要确保唯一性* 为了查阅和去重* @throws Exception*/@Testpublic void keyProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("keyTopic","vip1",key,"我是vip1的文章".getBytes());producer.send(message);producer.shutdown();}

在这里插入图片描述

消费者端代码:

@Testpublic void keyConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("keyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println("我是vip1,我正在消费消息"+list.get(0).getBody());System.out.println("我们的业务唯一标识是:"+messageExt.getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}

11.Rocketmq重复消费问题

BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费。 即使是在CLUSTERING模式``下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。
还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。
那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgId也可以是你自定义的唯一的key,这样就可以去重了

在这里插入图片描述

生产者代码:伪造重复消息:

@Testpublic void rePeatProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();java.lang.String key = UUID.randomUUID().toString();Message message = new Message("tagTopic", null, key,"扣减库存-1".getBytes());Message message2 = new Message("tagTopic",null, key,"扣减库存-1".getBytes());// 测试发送两个一样的key的消息producer.send(message);producer.send(message2);System.out.println("发送成功");producer.shutdown();}

消费者端消费幂等性处理:

/*** 我们设计一个去重表,对消息的唯一key添加唯一索引* 每次消费消息的时候,先插入数据库,如果成功则执行业务逻辑* 如果插入失败,则说明消息来过了,直接签收了* @throws Exception*/@Testpublic void rePeatConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("keyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);java.lang.String keys = messageExt.getKeys();// 插入数据库,因为我们key做了唯一索引
//                int i = jdbcTemplate.update("insert into order_per_log(`type`,`order_dn`,`user`) values(1,keys,'123')");// 新增,要么成功,要么报错,修改要么成功,要么返回0,要么报错// 如果成功,就处理业务逻辑,如果报错,就不执行System.out.println(new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}

12.RocketMQ重试机制

一般我们的消息在投递过程中出现问题时,比如我们投递到队列失败了,这时候我们可以引入一个重试机制来进行重新投递

/*** 生产者发送消息失败进行重试* @throws Exception*/@Testpublic void retryProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();// 生产者发送消息 重试次数producer.setRetryTimesWhenSendFailed(2);
//            producer.setRetryTimesWhenSendAsyncFailed(2);String key = UUID.randomUUID().toString();System.out.println(key);Message message = new Message("retryTopic","vip1",key,"我是vip1的文章".getBytes());producer.send(message);producer.shutdown();}

在消费者端进行消费时,也可以进行重试:

/*** 重试的时间间隔* 10s 30s 1m,2m,3m,4m,5m,6m,7m 8m 9m 10m 20m 30m 1h 2h* 默认重试16次** 1. 能否自定义重试次数* 2. 如果重试16次都是失败的?*     (并发模式下可以重试16次,顺序模式性下可以重试int的最大值次),如果都失败了,就认为是一个死信消息*      则会放置在一个死信主题中,主题的名称:%DLQ%retry-consumer-group,然后在死信队列中进行处理,找人工,电话啥的* 3. 当消息失败时该如何处理** 重试次数一般五次* @throws Exception*/@Testpublic void retryConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("retryTopic","*");// 设置重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());System.out.println(messageExt.getReconsumeTimes());// 业务报错,返回 返回RECONSUME_LATER都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}/*** 监听死信队列* @throws Exception*/@Testpublic void retryDeadConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("%DLQ%retry-consumer-group","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());System.out.println("记录到特别的位置,文件 mysql 通知人工处理");// 业务报错,返回 返回RECONSUME_LATER都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}/*** 第二种方案 用法比较多* @throws Exception*/@Testpublic void retryConsumer2() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("retryTopic","*");// 设置重试次数consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);System.out.println(new Date());// 业务处理try {handDB();}catch (Exception e){// 重试int reconsumeTimes = messageExt.getReconsumeTimes();if (reconsumeTimes >= 3){// 不要重试了System.out.println("记录到特别的位置,文件,mysql,通知人工处理");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}System.out.println(messageExt.getReconsumeTimes());// 业务报错,返回 返回RECONSUME_LATER都会重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}// 加上事务控制业务,报错回滚private void handDB() {int i = 10 / 0;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/596700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自制c++题目《模板综合》

1.输出结果&#xff1a; 2.主函数不允许改变 int arr[5] { 5,2,3,1,4}; char arr1[5] { a,c,b,e,d }; good <int>a(arr,5); good <char>b(arr1,5); bad<int>(1,2); bad<float>(1.1, 1.2); 自制c题目《模板综合》 王赫辰/c语言 - Gitee.com

引导和服务

目录 一、Linux操作系统引导过程 1、引导过程总览图 2、引导过程的详细步骤 二、系统初始化进程 1、init进程&#xff08;串行启动&#xff09; 2、Systemd&#xff08;并行启动&#xff09; 3、Centos6与Centos7的区别&#xff1a; 4、Systemd单元类型 5、运行级别所…

EFCore8泛化关系在数据库中的体现

如图&#xff0c;在关系数据库中&#xff0c;数据表达为一张表&#xff0c;用一个字段“Discriminator”来做区分&#xff1a; 要达到这样的效果&#xff08;数据库中的结构&#xff09;&#xff0c;需要在XXContext中将继承关系的三个类都加上&#xff1a; public DbSet<P…

在 Mac 上轻松安装和配置 JMeter

Apache JMeter 是一个开源的负载测试工具&#xff0c;可以用于测试静态和动态资源&#xff0c;确定服务器的性能和稳定性。在本文中&#xff0c;我们将讨论如何下载和安装 JMeter。 安装 Java&#xff08;已安装 Java 的此步骤可跳过&#xff09; 要安装 Java&#xff0c;请按…

想学鸿蒙,又怕找不到工作?能有前途吗?

鸿蒙有没有前途&#xff0c;能不能找到工作&#xff1f;其实这与市场行情有关&#xff0c;这几年的互联网大家都已经感受到了&#xff0c;十分悲凉。不管是前端还是Android等开发行业&#xff0c;不是被裁员就是内卷严重&#xff0c;这几年倒下了无数家中小型互联网企业。 而作…

腾讯云2024年最新优惠活动整理汇总

腾讯云作为国内领先的云计算服务提供商&#xff0c;一直致力于为用户提供优质、高效的服务。为了更好地满足用户需求&#xff0c;腾讯云经常推出各种优惠活动&#xff0c;本文将对腾讯云最新优惠活动进行整理汇总&#xff0c;帮助用户更好地了解和利用这些福利。 一、腾讯云新用…

kali2.0安装VMware Tools 和自定义改变分辨率

kali2.0安装VMware Tools 和自定义改变分辨率 VMware Tools 简介&#xff1a;VMware Tools安装&#xff1a;自定义改变分辨率&#xff1a;xrandr命令修改分辨率&#xff1a; 前言&#xff1a; 因为kali2.0比较老 所以需要手动安装 WMware Tools 进行复制粘贴操作&#xff01; …

企业需要专业的合同档案管理系统吗

文书合同是企业与供应商、客户、员工等签订的重要文件&#xff0c;具有法律效力和约束力。合同档案管理系统可以帮助企业有效管理合同文件&#xff0c;提高合同管理的效率和准确性。 专久智能合同档案管理系统具备以下优点&#xff1a; 1. 文件安全性&#xff1a;合同文件经过专…

力扣刷题-二叉树-合并二叉树

617.合并二叉树&#xff08;经典&#xff09; 合并二叉树是操作两棵树的题目里面很经典的&#xff0c;如何对两棵树遍历以及处理&#xff1f; 给定两个二叉树&#xff0c;想象当你将它们中的一个覆盖到另一个上时&#xff0c;两个二叉树的一些节点便会重叠。 你需要将他们合并…

NE555学习笔记-2024

实物图片 NE555引脚图 内部时序图 示列1&#xff0c;红外接收电路 红外接收电路的工作原理&#xff1a;在上述电路中&#xff0c;TSOP1738构成了该电路的主要组成部分&#xff0c;旨在检测来自任何来源的红外信号。这用于检测38 KHz范围的信号&#xff0c;因此命名为“TSOP173…

探秘Spring Bean的秘境:作用域篇【beans 三】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 探秘Spring Bean的秘境&#xff1a;作用域篇【beans 三】 前言单例作用域如何声明单例Bean&#xff1a;特点&#xff1a; 原型作用域如何声明原型Bean&#xff1a;特点&#xff1a; 会话作用域如何声明…

【KingbaseES】实现MySql函数Space

CREATE OR REPLACE FUNCTION SPACE(input_length integer) RETURNS text AS $$ BEGIN RETURN REPEAT( , input_length) AS SPACES; END; $$ LANGUAGE plpgsql;

深度学习课程实验二深层神经网络搭建及优化

一、 实验目的 1、学会训练和搭建深层神经网络&#xff1b; 2、掌握超参数调试正则化及优化。 二、 实验步骤 初始化 1、导入所需要的库 2、搭建神经网络模型 3、零初始化 4、随机初始化 5、He初始化 6、总结三种不同类型的初始化 正则化 1、导入所需要的库 2、使用非正则化…

实验笔记之——基于Linux服务器复现Instant-NGP及常用的tmux指令

之前博客实现了基于windows来复现Instant-NGP&#xff0c;本博文在linux服务器上测试 实验笔记之——基于windows复现Instant-NGP-CSDN博客文章浏览阅读444次&#xff0c;点赞15次&#xff0c;收藏7次。之前博客对NeRF-SLAM进行了调研&#xff0c;本博文先复现一下Intant-NGP。…

C++基础语法——基本知识、数据类型、运算符及程序流程结构

本专栏记录C学习过程包括C基础以及数据结构和算法&#xff0c;其中第一部分计划时间一个月&#xff0c;主要跟着黑马视频教程&#xff0c;学习路线如下&#xff0c;不定时更新&#xff0c;欢迎关注。 当前章节处于&#xff1a; >第1阶段-C基础入门 ---------第2阶段实战-通讯…

Java基础进阶(学习笔记)

注&#xff1a;本篇的代码和PPT图片来源于黑马程序员&#xff0c;本篇仅为学习笔记 static static 是静态的意思&#xff0c;可以修饰成员变量&#xff0c;也可以修饰成员方法 修饰成员的特点&#xff1a; 被其修饰的成员, 被该类的所有对象所共享 多了一种调用方式, 可以通过…

OpenHarmony源码转换器—多线程特性转换

本文讨论了如何将多线程的 Java 代码转换为 OpenHarmony ArkTS 代码​ 一、简介 Java 内存共享模型 以下示例伪代码和示意图展示了如何使用内存共享模型解决生产者消费者问题。 生产者消费者与共享内存间交互示意图 为了避免不同生产者或消费者同时访问一块共享内存的容器时…

数字信号处理期末复习——计算大题(一)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

drf知识--11

补充 # 研究simple-jwt提供的Token类&#xff1a; 1、RefreshToken:生成refresh token的类 2、AccessToken:生成refresh token的类 3、Token&#xff1a;他们俩的父类 4、str(RefreshToken的对象)---得到字符串 refresh token&#xff0c;Token类写了 …

IO作业4.0

思维导图 创建出三个进程完成两个文件之间拷贝工作&#xff0c;子进程1拷贝前一半内容&#xff0c;子进程2拷贝后一半内容&#xff0c;父进程回收子进程的资源 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <myhead.h> int …