SpringBoot整合rabbitmq-重复消费问题

说明:重复消费的原因大致是生产者将信息A发送到队列中,消费者监听到消息A后开始处理业务,业务处理完成后,监听在告知rabbitmq消息A已经被消费完成途中中断,也就时说我已经处理完业务,而队列中还存在当前消息A,导致消费者服务恢复后又消费到消息A,出现重复操作的业务。

解决思路:我只要有一个地方记录了消息A已经被消费过了【这个消息必须得设置一个唯一标记】,即使消息A再次被消费时,比对一下,如果有记录则说明消息A已经被消费,如果没有说明没有被消费。

我使用redis及设置redis过期时间来解决重复消费问题。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 --></parent><groupId>RabbitmqDemoOne</groupId><artifactId>RabbitmqDemoOne</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>RabbitmqDemoOne Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><!-- commons-lang --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>RabbitmqDemoOne</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:redis:host: 127.0.0.1port: 6379rabbitmq:port: 5672host: 192.168.199.139username: adminpassword: adminvirtual-host: /

3.RabbitMqConfig

package com.dev.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 李庆伟* @title: RabbitMqConfig* @date 2024/3/3 14:12*/
@Configuration
public class RabbitMqConfig {/*** 队列* @return repeatQueue队列名称 true 持久化*/@Beanpublic Queue makeQueue(){return new Queue("repeatQueue",true);}}

4.RedisTemplateConfig

package com.dev.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author 李庆伟* @title: RedisTemplateConfig* @date 2024/3/3 14:24*/
@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);// 设置键(key)的序列化采用StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());//redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值(value)的序列化采用jdk// 设置值(value)的序列化采用FastJsonRedisSerializer。redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}}

5.RabbitRepeatController

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author 李庆伟* @title: RabbitRepeatContoller* @date 2024/3/3 14:05*/
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法/*** 测试* @return*/@GetMapping("/sendMessage")public String sendMessage() {for (int i = 0; i < 1000; i++) {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");map.put("phone","123..11");map.put("num",i);String str = JSONObject.toJSONString(map);Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend("", "repeatQueue", msg);}return "ok";}}

6.RabbitMqListener

package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Map;/*** @author 李庆伟* @title: RabbitMqListener* @date 2024/3/3 14:13*/
@Component
public class RabbitMqListener {@Autowiredprivate RedisUtil redisUtil;@RabbitListener(queues = "repeatQueue")@RabbitHandlerpublic void process(Message msg) throws UnsupportedEncodingException {//获取在发送消息时设置的唯一idString id = msg.getMessageProperties().getMessageId();//去redis中查看是否有记录,如果有证明已经消费过了String val = redisUtil.get(id);if(StringUtils.isNotEmpty(val)){return;}String str = new String(msg.getBody(),"utf-8");if(StringUtils.isNotEmpty(str)){Map<String,Object> map = JSON.parseObject(str,Map.class);System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));//将消费过的消息记录到redis中,失效时间为1个小时redisUtil.set(id,id,3600L);System.out.println("----------");}}}

7.RedisUtil

package com.dev.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @author 李庆伟* @title: RedisUtil* @date 2024/3/3 14:27*/@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 批量删除对应的value** @param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量删除key** @param pattern*/public void removePattern(final String pattern) {Set<Serializable> keys = redisTemplate.keys(pattern);if (keys.size() > 0)redisTemplate.delete(keys);}/*** 删除对应的value** @param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判断缓存中是否有对应的value** @param key* @return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 读取缓存** @param key* @return*/public String get(final String key) {Object result = null;redisTemplate.setValueSerializer(new StringRedisSerializer());ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();result = operations.get(key);if(result==null){return null;}return result.toString();}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value, Long expireTime) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  boolean hmset(String key, Map<String, String> value) {boolean result = false;try {redisTemplate.opsForHash().putAll(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  Map<String,String> hmget(String key) {Map<String,String> result =null;try {result=  redisTemplate.opsForHash().entries(key);} catch (Exception e) {e.printStackTrace();}return result;}/*** 递增** @param key 键* @paramby 要增加几(大于0)* @return*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** @param key 键* @paramby 要减少几(小于0)* @return*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}/*** redis zset可已设置排序(案例,热搜)** @param key 键* @paramby* @return*/public void zadd(String key ,String name) {BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);//自增长后的数据boundZSetOperations.incrementScore(name,1);}}

8.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 李庆伟* @title: App* @date 2024/3/3 14:01*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716730.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt|QTreewidget类下函数qt助手详解说明示例(上)

该系列持续更新&#xff0c;喜欢请一键三连&#xff0c;感谢各位大佬。 QT5.14.2 参考官方QT助手 文章目录 QTreeWidget ClasspropertiesPublic Functions默认构造函数默认析构函数添加根节点void addTopLevelItem(QTreeWidgetItem *item)添加多个根节点void addTopLevelItems…

Linux下的权限

1. 操作系统的外壳 在理解Linux权限之前&#xff0c;我们先来吃点小菜。 1.大部分指令都是文件&#xff0c;如果把指令对应的文件删除了&#xff0c;那么这条指令就使用不了了。 2.用户执行某种功能的时候&#xff0c;不是直接让操作系统执行对应的指令的&#xff0c;而是先交…

Python开源项目月排行 2024年2月

Python 趋势月报&#xff0c;按月浏览往期 GitHub,Gitee 等最热门的Python开源项目&#xff0c;入选的项目主要参考GitHub Trending,部分参考了Gitee和其他。排名不分先后&#xff0c;都是当前月份内相对热门的项目。 入选公式&#xff1d;70%GitHub Trending20%Gitee10%其他 …

jvm面试题-背诵版

按照思维导图抽查和记忆&#xff0c;答案见&#xff1a;四、面试-多线程/并发_scheduledfuture释放-CSDN博客

Jmeter系列(4) 线程属性详解

线程属性 线程组是配置压测策略的一个重要环节线程组决定了测试执行的请求数量 线程数 在这里线程数相当于一个虚拟用户每个线程数大约占内存1M特别注意⚠️ 单台机器最大线程数不要超过1000&#xff0c;不然可能会造成内存溢出 Ramp-Up时间 所有线程在多长时间内全部启动…

计算机网络-第2章 物理层

本章内容&#xff1a;物理层和数据通信的概念、传输媒体特点&#xff08;不属于物理层&#xff09;、信道复用、数字传输系统、宽带接入 2.1-2.2 物理层和数据通信的概念 物理层解决的问题&#xff1a;如何在传输媒体上传输数据比特流&#xff0c;屏蔽掉传输媒体和通信手段的差…

文献阅读笔记《Spatial-temporal Forecasting for Regions without Observations》13页

目录 目录 目录 发行刊物 ABSTRACT 1 INTRODUCTION 2 RELATED WORK&#xff08;相关工作 2.1 Spatial-temporal Forecasting&#xff08;时空预测 2.2 Spatial-temporal Forecasting withIncomplete Data&#xff08;不完全数据的时空预测 2.3 Graph Contrastive Lear…

蓝桥杯集训·每日一题2024 (前缀和)

笔记&#xff1a; 例题&#xff1a; #include<bits/stdc.h> using namespace std; const int N 5000010; char str[N]; int s[N]; int main(){int t;cin>>t;for(int a1;a<t;a){int n;cin>>n;scanf("%s",str1);for(int i1;i<n;i){s[i]s[i-1]…

【MySQL】:约束全解析

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. 约束概述二. 约束演示三. 外键约束3.1 介绍3.2 语法3.3 删除/更新行为 &…

Netty的InboundHandler 和OutboundHandler

一、InboundHandler 和OutboundHandler的区别 在Netty中&#xff0c;"inbound"表示来自外部来源&#xff08;如网络连接&#xff09;的数据&#xff0c;而"outbound"则表示从应用程序发送到外部目标&#xff08;如网络连接或其他服务&#xff09;的数据。…

Git——Upload your open store

0.default config ssh-keygen -t rsa #之后一路回车,当前目录.ssh/下产生公私钥 cat ~/.ssh/id_rsa.pub #复制公钥到账号 git config --global user.email account_email git config --global user.name account_name1. 上传一个公开仓库 查看当前分支&#xff1a; git branc…

MATLAB基于隐马尔可夫模型-高斯混合模型-期望最大化的MR图像分割

隐马尔可夫模型是一种统计模型&#xff0c;它描述了马尔可夫过程&#xff0c;隐马尔可夫过程中包含隐变量&#xff0c;语音识别和词性自动标注等一些领域常常使用隐马尔可夫模型方法来处理。马尔可夫过程是一类随机过程&#xff0c;马尔可夫链是它的原始模型&#xff0c;马尔可…

【C++那些事儿】深入理解C++类与对象:从概念到实践(中)| 默认构造函数 | 拷贝构造函数 | 析构函数 | 运算符重载 | const成员函数

&#x1f4f7; 江池俊&#xff1a; 个人主页 &#x1f525;个人专栏&#xff1a; ✅数据结构冒险记 ✅C那些事儿 &#x1f305; 有航道的人&#xff0c;再渺小也不会迷途。 文章目录 1. 类的6个默认成员函数2. 构造函数2.1 概念2.2 特性 3. 析构函数3.1 概念3.2 特性 4. 拷贝…

QT多语言切换功能

一.目的 在做项目时&#xff0c;有时希望我们的程序可以在不同的国家使用&#xff0c;这样最好的方式是一套程序能适应于多国语言。 Qt提供了这样的功能&#xff0c;使得一套程序可以呈现出不同的语言界面。本文将介绍QT如何实现多语言&#xff0c;以中文和英文为例。 QT开发…

过于老旧的pytorch_ssim包 请从github下载源码

有些冷门算法真的不要随便pip&#xff0c;有可能下载到史前版本…最好还是找源代码 汗 今天要用到SSIM损失函数&#xff0c;从网上简单看了一下原理就想测试一下&#xff0c;偷了一下懒就直接在命令行输入pip install pytorch_ssim了&#xff0c;结果报了一堆错误&#xff08;汗…

Mysql实战(1)之环境安装

1&#xff0c;进入&#xff1a;MySQL :: MySQL Downloads 2&#xff0c; 3&#xff0c; 4&#xff0c;

Python算法题集_单词搜索

Python算法题集_单词搜索 题22&#xff1a;单词搜索1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【原始矩阵状态回溯】2) 改进版一【字典检测原始矩阵状态回溯】3) 改进版二【矩阵状态回溯】 4. 最优算法5. 相关资源 本文为Python算法题集之一…

DM数据库学习之路(十九)DM8数据库sysbench部署及压力测试

sysbench部署 安装依赖 yum -y install make automake libtool pkgconfig libaio-devel vim-common 上传sysbench源代码 sysbench_tool.tar 测试是否安装成功 $ /opt/sysbench/sysbench-master-dpi/src/lua $ ./sysbench --version sysbench 1.1.0 sysbench测试DM 测试…

jupyter调用envs环境——jupyter内核配置虚拟环境

1.jupyter无法使用envs环境 pycharm的终端打开jupyter notebook&#xff1a; 在kernel下找不到上面的Pytorch_GPU环境&#xff1a; 2.解决方法 在对应的envs环境中安装ipykernel&#xff1a; 将该环境写入jupyter&#xff1a; python -m ipykernel install --user --name Py…

基于分位数回归的长短期记忆神经网络(QRLSTM)的MATLAB实现(源代码)

分位数回归的长短期神经记忆网络介绍&#xff1a; QRLSTM&#xff08;Quantile Regression Long Short-Term Memory&#xff09;分位数回归神经网络是一种结合了长短期记忆&#xff08;LSTM&#xff09;神经网络和分位数回归的模型。这种神经网络结构旨在对数据的不同分位数进行…