rabbitmq+springboot实现幂等性操作

文章目录

  • 1.场景描述
    • 1.1 场景1
    • 1.2 场景2
  • 2.原理
  • 3.实战开发
    • 3.1 建表
    • 3.2 集成mybatis-plus
    • 3.3 集成RabbitMq
      • 3.3.1 安装mq
      • 3.3.2 springBoot集成mq
    • 3.4 具体实现
      • 3.4.1 mq配置类
      • 3.4.2 生产者
      • 3.4.3 消费者

1.场景描述

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

1.1 场景1

什么意思呢?举个例子:一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
这种情景就会出现消息可能被多次地投递。

1.2 场景2

还有一种场景是程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。

我们利用消息id来判断消息是否已经消费过,如果该信息被消费过,那么消息表中已经 会有一条数据,由于消费时会先执行插入操作,此时会因为主键冲突无法重复插入,我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。

3.实战开发

3.1 建表

DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (`message_id` varchar(50) NOT NULL COMMENT '消息ID',`message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',`status` int DEFAULT '0' COMMENT '消费状态(0-未消费成功;1-消费成功)',`retry_times` int DEFAULT '0' COMMENT '重试次数',`type` int DEFAULT '0' COMMENT '消费类型',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 集成mybatis-plus

《springBoot集成mybatisPlus》

3.3 集成RabbitMq

3.3.1 安装mq

推荐使用docker安装rabbitmq,还未安装的可以参考以下信息:

  • docker安装

3.3.2 springBoot集成mq

  • 1.添加依赖
 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3.4 生产者具体实现

3.4.1 mq配置类

  • DirectRabbitConfig
    具体如何开启可以参考《rabbitMq实现死信队列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitmqConfig {//正常交换机的名字public final static String  EXCHANGE\_NAME = "exchange\_name";//正常队列的名字public final static String QUEUE\_NAME="queue\_name";//死信交换机的名字public final static String  EXCHANGE\_DEAD = "exchange\_dead";//死信队列的名字public final static String QUEUE\_DEAD="queue\_dead";//死信路由keypublic final static String DEAD\_KEY="dead.key";//创建正常交换机@Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重启后数据还在.durable(true).build();}//创建正常队列@Bean(QUEUE\_NAME)public Queue queue(){//正常队列和死信进行绑定 转发到 死信队列,配置参数Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射@Beanpublic Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由规则.with("app.#").noargs();}//创建死信队列@Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//创建死信交换机@Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重启后数据还在.build();}//绑定死信队列和死信交换机@Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由规则 正常路由key.with(DEAD\_KEY).noargs();}/\*\*获取死信的配置信息\*\*\*/public Map<String,Object>getMap(){//3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。//方式一Map<String,Object> map=new HashMap<>(16);//死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);//死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值map.put("x-dead-letter-routing-key", DEAD\_KEY);//方式二//消息的过期时间,单位:毫秒;达到时间 放入死信队列// map.put("x-message-ttl",5000);//方式三//队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据// map.put("x-max-length",3);return map;}}
  • 延迟队列配置
    具体如何开启可以参考《rabbitMq实现死信队列》

由于rabbitMq中不直接支持死信队列,需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启

/*** 定义延迟交换机*/
@Configuration
public class RabbitMQDelayedConfig {//队列private static final String DELAYQUEUE = "delayedqueue";//交换机private static final String DELAYEXCHANGE = "delayedExchange";@Beanpublic Queue delayqueue(){return new Queue(DELAYQUEUE);}//自定义延迟交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");/*** 1、交换机名称* 2、交换机类型* 3、是否需要持久化* 4、是否需要自动删除* 5、其他参数*/return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);}//绑定队列和延迟交换机@Beanpublic Binding delaybinding(){return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();}
}

3.4.2 生产者

  • 1.消费队列的生产者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class Sender_Direct {@Autowiredprivate AmqpTemplate rabbitTemplate;/*** 用于消费订单** @param orderId*/public void send2Direct(String orderId) {//创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)MessageProperties messageProperties = new MessageProperties();rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "内容设置",  message -> {//设置消息的id为唯一messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");message.getMessageProperties().setMessageId(orderId);return message;});}}

3.4.3 消费者

1.开启手动ack配置

spring:application:name: shoprabbitmq:host: 192.168.1.102port: 5673virtual-host: /username: guestpassword: guestlistener:simple:# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 autoacknowledge-mode: manual

消费者要配置ack重试机制,具体参考前几篇文章,使用的是mysql消息ID的唯一性,有时候可能生成一样的订单,具体的没有进行实验,内容是json生成的,可以执行业务

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class Receiver_Direct {private static final Integer delayTimes = 30;//延时消费时间,单位:秒@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = {"smsQueue"})public void receiveD(Message message, Channel channel) throws IOException {try {// 获取消息IdString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody());//获取消息//向数据库插入数据MessageIdempotent messageIdempotent = new MessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(msg);messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save = true;   //设置保存成功,消息投递失败是在确认模式那里if (!save) {//说明属于重重复请求//1、处理消息内容的业务,解析json数据//2、创建订单,并保存Boolean flag = consumeOrder(new Shop());if (flag){//投入延迟队列,如果30分钟订单还没有消费,就删除订单rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{//设置发送消息的延长时间 单位:ms,表示30分钟message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息状态,消费成功,channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else {//延迟投入死信,进行重试channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}} else {//1、处理消息内容的业务,解析json数据//2、创建订单,并保存//投入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch (Exception e){System.out.println("错误信息");}}private boolean consumeOrder(Shop shop) {return true;}@RabbitListener(queues = {" delay.queue.demo.delay.queue"})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println("死信队列:"+payload);//删除消息 将数据库状态更新为失败,更新邮件或者消息通知,有时候可以人工消费long deliveryTag=message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}@RabbitListener(queues = "delayedqueue")public void receivemsg(Message messages){//查询有没有被消费,也就是更新成功,有时候需要乐观锁}
}

至此mq的消息重复以及幂等的信息处理就很完美的解决了,当然本文以数据库为例进行实现,感兴趣的可以尝试使用redis来进行实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71094.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实现Map批量赋值,我只需24秒搞定!

函数的功能是将一组键值对批量赋值给Map中的键。在Java中&#xff0c;通常使用Map的put方法逐个将键值对赋值给Map&#xff0c;但在某些场景下&#xff0c;可能需要一次性将多个键值对赋值给Map。 函数功能&#xff1a;Map批量赋值 参数1&#xff1a;参数名称&#xff1a;targ…

C语言学习:6、C语言程序的循环结构

生活中&#xff0c;有很多循环的东西&#xff0c;比如钟表就是在1到12循环&#xff0c;太阳东升西落也是循环&#xff0c;春夏秋冬也是循环&#xff0c;人生可能也是一个循环。 while C语言中的循环可以这么描述&#xff1a;当某个条件成立&#xff0c;就一直做某件事或某些事…

煤矿虚拟仿真 | 采煤工人VR虚拟现实培训系统

随着科技的发展&#xff0c;虚拟现实(VR)技术已经逐渐渗透到各个行业&#xff0c;其中包括煤矿行业。VR技术可以为煤矿工人提供一个安全、真实的环境&#xff0c;让他们在虚拟环境中进行实际操作和培训&#xff0c;从而提高他们的技能水平和安全意识。 由广州华锐互动开发的采煤…

跳槽面试:如何转换工作场所而不失去优势

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Ubuntu 20.04 安装宋体

环境&#xff1a; ubuntu 20.04,英文环境&#xff0c;但已经安装中文包 检查ubuntu中安装的中文字体 命令&#xff1a;fc-list :langzh 检查ubuntu中安装的所有字体 命令&#xff1a;fc-list 宋体下载&#xff1a;Simsun Font - Free Fonts 网盘分享&#xff1a;链接: https…

LeetCode 82 删除排序链表中的重复元素 II

LeetCode 82 删除排序链表中的重复元素 II 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;https://leetcode.cn/problems/remove-duplicates-from-sorted-list-ii/description/ 博主Github&#xff1a;https://github.com/GDUT-Rp/LeetCode 题目&am…

vue中使用window.open打开assets文件夹下的pdf文件

需求&#xff1a;系统有个操作手册&#xff0c;点击会在浏览器新开个窗口并打开pdf文件。这个pdf文件存储在本地assets文件夹中。 文件结构&#xff1a; 注&#xff1a;直接使用window.open(文件路径)不能打开&#xff0c;需要在vue.config.js中配置所需文件 引入图中红框中的…

QTableWidget实现鼠标悬停整行高亮显示

一、最终效果 二、 重写QTableWidget类 mytablewidget.h #ifndef MYTABLEWIDGET_H #define MYTABLEWIDGET_H#include <QTableWidget>class MyTableWidget : public QTableWidget { public:explicit MyTableWidget(QWidget* parent nullptr);protected:void leaveEve…

Python 套接字编程完整指南

推荐&#xff1a;使用 NSDT场景编辑器 快速搭建3D应用场景 连接设备以交换信息是网络的全部意义所在。套接字是有效网络通信的重要组成部分&#xff0c;因为它们是用于通过本地或全球网络以及同一台计算机上的不同进程在设备之间传输消息的基本概念。它们提供了一个低级接口&am…

zipkin2.24.2源码install遇见的问题

1、idea导入项目后将Setting中的关于Maven和Java Compile相关的配置改为jdk11,同时Project Structure改为jdk11 2、将pom配置中的fork标签注释 标题未修改以上配置产生的问题 Compilation failure javac: Ч ı : --release : javac <options> <source files&g…

python3

#安装python3 brew install python3 看到下图表示安装python3成功: #将python3 加入环境变量 export PATH$PATH:/opt/homebrew/bin/#查看python 版本 python3 --version#查看pip 版本 pip3 --version#更新python源 pip3 config set global.index-url https://pypi.tuna.tsing…

Nginx中实现自签名SSL证书生成与配置

文章目录 一.相关介绍1.生成步骤2.相关名词介绍 二.Nginx中实现自签名SSL证书生成与配置1.私钥生成2.公钥生成3.生成解密的私钥key4.签名生成证书5.配置证书并验证6.登录 一.相关介绍 1.生成步骤 &#xff08;1&#xff09;生成私钥&#xff08;Private Key&#xff09;&…

WPF列表样式

WPF的数据绑定系统自动生成列表项对象&#xff0c;为单个项应用所需的样式不是很容易。解决方案是ItemContainerStyle 属性。如果设置了ItemContainerStyle 属性&#xff0c;当创建列表项时&#xff0c;列表控件会将其向下传递给每个项。对于ListBox控件&#xff0c;每个项有Li…

java8 Stream应用合集

多个数据集合如何合并为一个数组 List<Map<String,Object>> data1 this.queryReportInvestSu(param);List<Map<String,Object>> data2 this.queryReportInvestSu(param);List<Map<String,Object>> data3 this.queryReportInvestSu(para…

浅述C++模板——函数模板及类模板

前言 模板作为 C 的一大特色&#xff0c;对于泛型编程有着重要的作用。同时&#xff0c;对于大规模类似的函数或是类型不确定的类&#xff0c;模板都起了至关重要的作用。 一、模板 在开始学习模板之前&#xff0c;我们首先需要了解模板。先看下面一个例子&#xff1a; #in…

微信小程序开发前准备

文章目录 一.注册微信小程序开发账号&#xff08;一&#xff09;访问微信公众平台官网&#xff08;二&#xff09;进入注册页面&#xff0c;完成注册信息&#xff08;三&#xff09;设置微信小程序信息 二.获取微信小程序的AppID(一) 什么是小程序AppID&#xff08;二&#xff…

从零开始,无需公网IP,搭建本地电脑上的个人博客网站并发布到公网

文章目录 前言1. 安装套件软件2. 创建网页运行环境 指定网页输出的端口号3. 让WordPress在所需环境中安装并运行 生成网页4. “装修”个人网站5. 将位于本地电脑上的网页发布到公共互联网上 前言 在现代社会&#xff0c;网络已经成为我们生活离不开的必需品&#xff0c;而纷繁…

C#写一个UDP程序判断延迟并运行在Centos上

服务端 using System.Net.Sockets; using System.Net;int serverPort 50001; Socket server; EndPoint client new IPEndPoint(IPAddress.Any, 0);//用来保存发送方的ip和端口号CreateSocket();void CreateSocket() {server new Socket(AddressFamily.InterNetwork, SocketT…

Android 使用OpenCV实现实时人脸识别,并绘制到SurfaceView上

1. 前言 上篇文章 我们已经通过一个简单的例子&#xff0c;在Android Studio中接入了OpenCV。 之前我们也 在Visual Studio上&#xff0c;使用OpenCV实现人脸识别 中实现了人脸识别的效果。 接着&#xff0c;我们就可以将OpenCV的人脸识别效果移植到Android中了。 1.1 环境说…

【Java Web】统一处理异常

一个异常处理的ControllerAdvice类。它用于处理Controller注解的控制器中发生的异常。 具体代码功能如下&#xff1a; 导入相关类和方法。声明一个Logger对象&#xff0c;用于日志记录。使用ExceptionHandler注解标记handleException方法&#xff0c;用于处理所有异常。 -嘛在…