【RabbitMQ】RabbitMQ高级:如何保证消息可靠性

目录

  • 概述
  • 异常捕获机制
  • 事务机制
  • 持久化存储机制
  • 发送端确认机制
    • 概述
    • 开启发布确认的方法
    • 单个发布确认
    • 批量发布确认
    • 异步发布确认
  • 消费端确认机制
  • 消息限流
  • 消息幂等性处理

概述

前面学习了如何简单使用RabbitMQ,在实际使用RabbitMQ时,我们还需要考虑很多,比如消息的可靠性:即保证消息发到队列并且消费者正确消费。

那为什么要保证消息的可靠性呢?

比如用支付宝给商家支付,需要考虑转账的话,会不会把我的钱扣了,商家没有收到我的钱?

一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多少钱,如果你绑定的是银行卡,可能这个时候你并没有收到支付的确认消息。往往是在一段时间之后,你会收到银行卡发来的短信,告诉你支付的信息。

支付平台如何保证这笔帐不出问题?

在这里插入图片描述

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。

支付平台通过如下几种方式保证数据一致性:

  1. 分布式锁

这个比较容易理解,就是在操作某条数据时先锁定,可以用redis或zookeeper等常用框架来实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等待上一个操作的锁释放后再依次执行。

优点:能够保证数据强一致性。 缺点:高并发场景下可能有性能问题。

  1. 消息队列

消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息。

比如我们在用户充值完成后,会发送充值消息给账户系统,账户系统再去更改账户余额。

优点:异步、高并发 缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成功完成,不可能失败。

我们可以从以下几方面来保证消息的可靠性:

  1. 客户端代码中的异常捕获,包括生产者和消费者
  2. AMQP/RabbitMQ的事务机制
  3. 发送端确认机制
  4. 消息持久化机制
  5. Broker端的高可用集群
  6. 消费者确认机制
  7. 消费端限流
  8. 消息幂等性

异常捕获机制

先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

在这里插入图片描述

另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试

事务机制

没有捕获到异常并不能代表消息就一定投递成功了。

我们可以手动开启事务,如果一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。

在这里插入图片描述

持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  1. Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失。
  2. Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。
  3. 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2即可实现消息的持久化,保证消息自身不丢失。

在这里插入图片描述

注意:将消息标记为持久化并不能完全保证不会丢失消息。尽管生产者告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完毕,消息还在缓存中的一个间隔点,此时并没有真正的将消息写入磁盘,持久性保证并不强,如果需要更强有力的持久化策略,参考后面的发布确认模式。

发送端确认机制

概述

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

在这里插入图片描述

RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号。

另外,通过设置channel.basicAck方法中的multiple参数,表示是否批量应答。如果 multiple 为 true ,则代表批量应答 channel 上未应答的消息(可能会丢失数据)。如果 multiple 为 false ,则代表只会应答 channel 上正在处理完毕的消息(推荐使用)。

confirm 模式最大的好处在于异步,一旦发布一条消息,生产者应用程序就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认的方法

默认情况下,发布确认是关闭的。如果要开启需要调用方法 confirmSelect ,所以当我们想要使用发布确认的时候,都需要在 channel 上调用该方法。

Channel channel = RabbitmqUtils.getChannel();
// 开启发布确认模式
channel.confirmSelect();

单个发布确认

这是一个简单的发布确认方式,它是一种 同步发布确认的方式,也就是发布一条消息之后只有它被确认,后续的消息才能继续发布。

这种发布确认的方式有一个最大的缺点:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布。

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;/*** 生产者*/
public class Producer {/*** 队列的名称*/public static final String QUEUE_NAME = "hello";/*** 发送消息的个数*/public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {// 单个发布确认 耗时:341singleReleaseConfirmed();}/*** 单个发布确认** @throws Exception*/public static void singleReleaseConfirmed() throws Exception {Channel channel = RabbitmqUtils.getChannel();// 开启发布确认模式channel.confirmSelect();// 队列持久化boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());long startTime = System.currentTimeMillis();// 批量发送消息,单个发布确认for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = String.valueOf(i);// MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));boolean b = channel.waitForConfirms();if (b) {System.out.println("消息发送成功");}}long endTime = System.currentTimeMillis();// 耗时:341System.out.println("耗时:" + (endTime - startTime));System.out.println("消息发送完毕");}
}

批量发布确认

和单个发布确认相比,批量发布确认是先发布一批消息,然后一起确认,可以极大的提高吞吐量,但是这种方式的缺点是:当发生故障的时候会导致发布出现问题时,不知道是哪个消息出现问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

批量发布确认也是同步的,一样会阻塞后续消息的发布。

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;/*** 生产者*/
public class Producer {/*** 队列的名称*/public static final String QUEUE_NAME = "hello";/*** 发送消息的个数*/public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {// 批量发布确认 耗时:39batchReleaseConfirmed();}/*** 批量发布确认** @throws Exception*/public static void batchReleaseConfirmed() throws Exception {Channel channel = RabbitmqUtils.getChannel();// 开启发布确认模式channel.confirmSelect();// 队列持久化boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());long startTime = System.currentTimeMillis();// 批量确认消息的大小int batchSize = 100;// 批量发送消息,批量发布确认for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = String.valueOf(i);// MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));if (i % batchSize == 0) {boolean b = channel.waitForConfirms();if (b) {System.out.println("消息发送成功");}}}long endTime = System.currentTimeMillis();// 耗时:341System.out.println("耗时:" + (endTime - startTime));System.out.println("消息发送完毕");}}

异步发布确认

异步发布确认虽然编程逻辑比上面两个要复杂,但是性价比是最高的(无论是可靠性还是效率)。它是利用回调函数来达到消息可靠性传递的。

在这里插入图片描述

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;/*** 生产者*/
public class Producer {/*** 队列的名称*/public static final String QUEUE_NAME = "hello";/*** 发送消息的个数*/public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {// 异步发布确认 耗时:33asynchronousReleaseConfirmed();}/*** 异步发布确认*/public static void asynchronousReleaseConfirmed() throws Exception {Channel channel = RabbitmqUtils.getChannel();// 开启发布确认模式channel.confirmSelect();// 队列持久化boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());/*** 线程安全有序的哈希表,适用于高并发的情况* ① 可以将序号和消息进行关联。* ② 可以批量删除条目。* ③ 支持高并发。*/ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();// 准备消息的监听器,用来监听那些消息成功了,那些消息失败了。// 消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag, multiple) -> {System.out.println("确认的消息的 ID = " + deliveryTag);// 删除掉已经确认的消息,剩下的就是未确认的消息if (multiple) { // 如果是批量// 删除已经确认的消息ConcurrentNavigableMap<Long, String> headMap = map.headMap(deliveryTag);headMap.clear();} else {// 只清除当前序列号的消息map.remove(deliveryTag);}};// 消息确认失败的回调函数ConfirmCallback nackCallback = (deliveryTag, multiple) -> {System.out.println("未确认的消息的 ID = " + deliveryTag);// 输出未确认的消息String msg = map.get(deliveryTag);System.out.println("未确认的消息 = " + msg);};// 异步channel.addConfirmListener(ackCallback, nackCallback);long startTime = System.currentTimeMillis();// 批量发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = String.valueOf(i);// MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));// 记录所有要发送的消息// channel.getNextPublishSeqNo()获取下一个消息的序列号map.put(channel.getNextPublishSeqNo(), msg);}long endTime = System.currentTimeMillis();// 耗时:341System.out.println("耗时:" + (endTime - startTime));System.out.println("消息发送完毕");}}

消费端确认机制

前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。

Consumer ACK,就是用来确认消息被消费者成功消费。,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。

一般而言,我们有如下处理手段:

  1. 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。则只要收到消息后就立即确认(消息出列,标记已消费),如果业务不自行处理则有丢失数据的风险。
  2. 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期。
  3. 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法basicAck方法返回Ack。

SpringBoot项目中支持如下的一些配置:

#最大重试次数 
spring.rabbitmq.listener.simple.retry.max-attempts=5 
#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true 
#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000 
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual

ack模式如下:

在这里插入图片描述

Springboot中使用如下:

  1. pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
  1. application.properties
spring.application.name=consumer_ack 
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/ 
spring.rabbitmq.username=root 
spring.rabbitmq.password=123456 
spring.rabbitmq.port=5672
#最大重试次数 
spring.rabbitmq.listener.simple.retry.max-attempts=5 
#是否开启消费者重试(为false时关闭消费者重试, # 意思不是“不重试”,而是一直收到消息直到jack确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true 
#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000 
# 重试超过最大次数后是否拒绝 
spring.rabbitmq.listener.simple.default-requeue-rejected=false 
#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 主入口类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class RabbitmqDemo {@Autowiredprivate RabbitTemplate rabbitTemplate;public static void main(String[] args) {SpringApplication.run(RabbitmqDemo.class, args);}@Beanpublic ApplicationRunner runner() {return args -> {Thread.sleep(5000);for (int i = 0; i < 10; i++) {MessageProperties props = new MessageProperties();props.setDeliveryTag(i);Message message = new Message(("消息:" + i).getBytes("utf-8"), props);// this.rabbitTemplate.convertAndSend("ex.biz", "biz", "消息:" + i);this.rabbitTemplate.convertAndSend("ex.biz", "biz", message);}};}
}
  1. RabbitConfig
package com.lagou.rabbitmq.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue() {return new Queue("q.biz", false, false, false, null);}@Beanpublic Exchange exchange() {return new DirectExchange("ex.biz", false, false, null);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with("biz").noargs();}
}
  1. MessageListener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Random;// @Component
public class MessageListener {private Random random = new Random();/*** NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据 的风险* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中* MANUAL模式,需要显式的调用当前channel的basicAck方法** @param channel* @param deliveryTag* @param message*/// @RabbitListener(queues = "q.biz", ackMode = "NONE")// @RabbitListener(queues = "q.biz", ackMode = "AUTO")@RabbitListener(queues = "q.biz", ackMode = "MANUAL")public void handleMessageTopic(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload String message) {System.out.println("RabbitListener消费消息,消息内容:" + message);try {if (random.nextInt(10) % 3 != 0) {// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列// channel.basicNack(deliveryTag, false, true)// 手动拒绝消息。第二个参数表示是否重新入列channel.basicReject(deliveryTag, true);} else {// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是 否是批量确认channel.basicAck(deliveryTag, false);System.err.println("已确认消息:" + message);}} catch (IOException e) {e.printStackTrace();}}
}
  1. BizController
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Random;@RestController
public class BizController {@Autowiredprivate RabbitTemplate rabbitTemplate;private Random random = new Random();@RequestMapping("/biz")public String getBizMessage() {String message = rabbitTemplate.execute(new ChannelCallback<String>() {@Overridepublic String doInRabbit(Channel channel) throws Exception {final GetResponse getResponse = channel.basicGet("q.biz", false);if (getResponse == null) return "你已消费完所有的消息";String message = new String(getResponse.getBody(), "utf-8");if (random.nextInt(10) % 3 == 0) {channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);return "已确认的消息:" + message;} else {// 拒收一条消息channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true);// 可以拒收多条消息// channel.basicNack(getResponse.getEnvelope().getDeliveryTag(), false, true); return "拒绝的消息:" + message;}}});return message;}
}

消息限流

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧…

下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。

  1. 限制生产者发送

RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。

全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

在这里插入图片描述

  1. 基于连接流控

RabbitMQ 还默认提供了一种基于credit flow流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。

总的状态:

在这里插入图片描述

connection、channel、queue的状态:

在这里插入图片描述

  1. 消费端限流

RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。

比较值得注意的是QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。

消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

总结:

生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。

再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。

提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间(需要时间)
  2. 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
  3. 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)

示例:

@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){// SimpleRabbitListenerContainerFactory发现消息中有content_type有text 就会默认将其转换为String类型的,没有content_type都按byte[]类型 SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发线程数 factory.setConcurrentConsumers(10);// 设置最大并发线程数 factory.setMaxConcurrentConsumers(20);return factory;
}

消息幂等性处理

在我们追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费。

RabbitMQ层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息中间件都没有实现。

事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛。

一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。对于幂等的方法,不用担心重复执行会对系统造成任何改变

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615562.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL-外键等信息

38. 基础-多表查询-概述_哔哩哔哩_bilibili 1、流程函数 2、约束字段 删除外键 &#xff1a; alter table emp2 drop foreign key 外键名 //外键可以保持数据的一致性和完整性&#xff0c;外键的话&#xff0c;就是类似一个主表&#xff0c;一个从表&#xff0c;从表的其中一…

ROS2入门之节点与指令

文章目录 前言一、初识ROS21.ROS简介2.ROS系统框架 二、ROS2创建节点(CPP)1.创建工作空间2.创建功能包3.创建节点4.配置CMakeLists5.编译运行节点&#x1f353;编译节点&#x1f34a;source环境&#x1f34e; 运行节点 报错解决 三、ROS2常用指令1.ros2 pkg create2.ros2 pkg l…

mac 使用brew卸载node

1.查看当前的node版本 node -v 2.查看使用brew 安装的版本&#xff0c;可以看到本机装了14、16、18版本的node brew search node 3.卸载node brew uninstall node版本号 --force 如分别删除14、16、18版本的node命令如下 brew uninstall node14 --force brew uninstall no…

使用AutoDL云计算平台训练并测试Pytorch版本NeRF代码

文章目录 前言一、数据集及代码获取二、租用并设置服务器三、Pycharm远程开发四、训练并测试代码 前言 因为第一次在云服务器上跑代码&#xff0c;所以在这里记录一下。 一、数据集及代码获取 nerf-pytorch项目是 NeRF 的忠实 PyTorch 实现&#xff0c;它在运行速度提高 1.3 倍…

PLM系统功能、彩虹PLM系统功能、产品数据管理系统

彩虹PLM系统的功能 产品数据管理 产品数据管理是 PLM 系统的核心功能之一&#xff0c;它主要包括以下几个方面&#xff1a; &#xff08;1&#xff09;数据存储&#xff1a;将产品的设计数据&#xff08;如 CAD 模型、图纸、BOM 等&#xff09;存储在统一的数据库中&#xf…

springmvc内嵌tomcat、tomcat整合springmvc、自研国产web中间件

springmvc内嵌tomcat、tomcat整合springmvc、自研国产web中间件 这是由于公司老项目转化springboot存在太多坑&#xff0c;特别是hibernate事务一条就坑到跑路&#xff0c;你又不想搞没听说过的国产中间件兼容&#xff0c;又不想搞weblogic、WebSphere等中间件的适配&#xff…

Gitlab中的CICD的使用方法

一、CI/CD执行机制 二、离线安装gitlab-runner 下载相应版本的gitlab-runner &#xff08;下载地址&#xff1a;https://packages.gitlab.com/runner/gitlab-runner&#xff09; dpkg -i gitlab-runner_12.8.0_amd64.debgitlab-runner register第3步中需要的信息可从下图所示…

AMEYA360报导:瑞萨宣布收购Transphorm,大举进军GaN

全球半导体解决方案供应商瑞萨电子与全球氮化镓(GaN)功率半导体供应商Transphorm, Inc.(以下“Transphorm”)于今天宣布双方已达成最终协议&#xff0c;根据该协议&#xff0c;瑞萨子公司将以每股5.10美元现金收购Transphorm所有已发行普通股&#xff0c;较Transphorm在2024年1…

Next.js 学习笔记(六)——缓存

缓存 Next.js 可通过缓存渲染工作和数据请求来提高应用程序的性能并降低成本。本页将深入介绍 Next.js 缓存机制、可用于配置这些机制的 API 以及它们之间的交互方式。 需要知道&#xff1a;本页将帮助你了解 Next.js 的工作原理&#xff0c;但这并不是使用 Next.js 提高工作效…

【VMware】Windows部署单机OA项目---图文并茂详细讲解

目录 一 准备工作 二 安装JDK 三 tomcat安装 ​四 MySQL安装 ①解压MySQL压缩包 ②my文件拷贝mysql安装根目录下 ③ 修改my文件 ④ 安装MySQL 4.1 注册mysql服务 4.2 初始化 4.3 启动MySQL 4.4 登入MySQL 4.5 修改默认的MySQL密码 五 连接MySQL 5.1 虚拟机连接MyS…

便携式灯具的UL测试标准UL153介绍

UL153标准&#xff1a;UL153标准主要是描述有关使用电源线及插头作为连接工具,使用120伏电压,15或20安培的电源,并符合美国国家电器规范的便携灯.此标准也适用于那些不用插头,而用一些兼容的接线端作为连接工具的便携灯&#xff0c;同时对于使用非120伏电压&#xff0c;15or20安…

计算机毕业设计——SpringBoot仓库管理系统(附源码)

1&#xff0c;绪论 1.2&#xff0c;项目背景 随着电子计算机技术和信息网络技术的发明和应用&#xff0c;使着人类社会从工业经济时代向知识经济时代发展。在这个知识经济时代里&#xff0c;仓库管理系统将会成为企业生产以及运作不可缺少的管理工具。这个仓库管理系统是由&a…

六西格玛绿带培训——实现完美操作的关键工具

当我们谈论六西格玛&#xff0c;我们不仅仅谈论一个管理工具或是企业流程改进的方法。我们谈的是一种愿景——实现几乎完美的操作。在SpaceX、在Tesla&#xff0c;我们每天努力实现这种精确度&#xff0c;因为即使是一丝一毫的疏漏&#xff0c;都可能成为我们星际野望无情的噩梦…

滑动条QSlider和进度条QProgressBar

1. 滑动条&#xff1a;QSlider 实例化 //实例化1 // QSlider* slider new QSlider(this);QSlider* slider new QSlider(Qt::Horizontal,this);//第一个参数使摆放方式2.1滑动条的基本函数 滑动条刻度的 位置 参数 QSlider::NoTicks //不要画任何标记 QSlider::TicksBoth…

高工微报告|智驾前视一体机趋势

传统智驾前视一体机赛道上&#xff0c;1V3R、1V5R产品如何升级备受关注。 根据日前调研获取的信息&#xff0c;1V3R、1V5R向轻量级行泊一体&#xff08;典型为5V5R12U方案&#xff0c;算力平台5-20TOPS&#xff09;迈进的具体市场空间&#xff0c;仍在验证阶段。 其中&#x…

c#图片作为鼠标光标

图片转换为鼠标光标代码如下&#xff1a; private void Form1_Load(object sender, EventArgs e) {//button1.Cursor System.Windows.Forms.Cursors.Hand;Bitmap bmp new Bitmap("780.jpg");Cursor cursor new Cursor(bmp.GetHicon());button1.Cursor cursor;} …

CSS进阶方法——复合选择器、元素显示、背景设置

1、复合选择器 复合选择器是建立在基础选择器之上&#xff0c;对基础选择器进行组合形成的。 复合选择器可以更准确、更高效的选择目标元素&#xff08;标签&#xff09;复合选择器是由两个或多个基础选择器&#xff0c;通过不同的方式组合而成的常用的复合选择器包括&#xf…

20240112-【UNITY 学习】实现第一人称移动教程

1、创建一个空物体&#xff0c;挂载Rigidbody组件&#xff0c;并设置相应参数 2、在上述空物体下创建一个胶囊体&#xff0c;两个空物体&#xff0c;一个用来控制朝向&#xff0c;另一个用来控制摄像机 3、给摄像机创建一个父物体&#xff0c;并挂载脚本MoveCamera_01.cs using…

“数据要素×”行动计划发布,粮食安全监管如何应变?

近日&#xff0c;国家数据局发布“数据要素”三年行动计划&#xff08;2024-2026年&#xff09;&#xff0c;在“数据要素现代农业“部分提到&#xff1a;提升农业综合生产能力&#xff0c;支持农业生产经营主体和相关服务企业融合利用气象、土壤、农事作业、病虫害、市场等数据…

wav2lip中文语音驱动人脸训练

1 Wav2Lip介绍 1.1 Wav2Lip概述 2020年&#xff0c;来自印度海德拉巴大学和英国巴斯大学的团队&#xff0c;在ACM MM2020发表了的一篇论文《A Lip Sync Expert Is All You Need for Speech to Lip Generation In The Wild 》&#xff0c;在文章中&#xff0c;他们提出一个叫做…