说明:重复消费的原因大致是:生产者将消息A发送到队列中,消费者监听到消息A后开始处理业务,业务处理完成后,消费者在向RabbitMQ确认消息A已被消费的途中中断。也就是说,业务已经处理完成,而队列中仍然存在消息A,导致消费者服务恢复后再次消费到消息A,出现业务被重复执行的情况。
解决思路:我只要有一个地方记录了消息A已经被消费过了【这个消息必须得设置一个唯一标记】,即使消息A再次被消费时,比对一下,如果有记录则说明消息A已经被消费,如果没有说明没有被消费。
我使用redis及设置redis过期时间来解决重复消费问题。
工程图:
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <!-- Artifact id of the inherited parent project -->
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- Group id of the inherited parent project -->
    <groupId>org.springframework.boot</groupId>
    <!-- Version of the inherited parent project -->
    <version>2.2.2.RELEASE</version>
  </parent>

  <groupId>RabbitmqDemoOne</groupId>
  <artifactId>RabbitmqDemoOne</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>RabbitmqDemoOne Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Spring Boot core -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot test support -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- Spring MVC web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Development-time tooling -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
    <!-- AMQP support -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Redis -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>
    <!-- commons-lang -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
    <!-- lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>RabbitmqDemoOne</finalName>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
2.application.yml
# HTTP port of the embedded web server.
server:
  port: 8080

spring:
  # Redis connection used to record which messages were already consumed.
  redis:
    host: 127.0.0.1
    port: 6379
  # RabbitMQ broker connection.
  rabbitmq:
    port: 5672
    host: 192.168.199.139
    username: admin
    password: admin
    virtual-host: /
3.RabbitMqConfig
package com.dev.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 李庆伟* @title: RabbitMqConfig* @date 2024/3/3 14:12*/
/**
 * Spring configuration that declares the queue used by this demo.
 */
@Configuration
public class RabbitMqConfig {

    /** Name under which the queue is declared. */
    private static final String QUEUE_NAME = "repeatQueue";

    /** Value passed as the queue's {@code durable} constructor argument. */
    private static final boolean DURABLE = true;

    /**
     * Builds the queue definition for {@code repeatQueue}.
     *
     * @return a durable queue named {@code repeatQueue}
     */
    @Bean
    public Queue makeQueue() {
        final Queue repeatQueue = new Queue(QUEUE_NAME, DURABLE);
        return repeatQueue;
    }
}
4.RedisTemplateConfig
package com.dev.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author 李庆伟* @title: RedisTemplateConfig* @date 2024/3/3 14:24*/
@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);// 设置键(key)的序列化采用StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());//redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值(value)的序列化采用jdk// 设置值(value)的序列化采用FastJsonRedisSerializer。redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}}
5.RabbitRepeatController
package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author 李庆伟* @title: RabbitRepeatContoller* @date 2024/3/3 14:05*/
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法/*** 测试* @return*/@GetMapping("/sendMessage")public String sendMessage() {for (int i = 0; i < 1000; i++) {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");map.put("phone","123..11");map.put("num",i);String str = JSONObject.toJSONString(map);Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend("", "repeatQueue", msg);}return "ok";}}
6.RabbitMqListener
package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Map;/*** @author 李庆伟* @title: RabbitMqListener* @date 2024/3/3 14:13*/
@Component
public class RabbitMqListener {@Autowiredprivate RedisUtil redisUtil;@RabbitListener(queues = "repeatQueue")@RabbitHandlerpublic void process(Message msg) throws UnsupportedEncodingException {//获取在发送消息时设置的唯一idString id = msg.getMessageProperties().getMessageId();//去redis中查看是否有记录,如果有证明已经消费过了String val = redisUtil.get(id);if(StringUtils.isNotEmpty(val)){return;}String str = new String(msg.getBody(),"utf-8");if(StringUtils.isNotEmpty(str)){Map<String,Object> map = JSON.parseObject(str,Map.class);System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));//将消费过的消息记录到redis中,失效时间为1个小时redisUtil.set(id,id,3600L);System.out.println("----------");}}}
7.RedisUtil
package com.dev.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @author 李庆伟* @title: RedisUtil* @date 2024/3/3 14:27*/@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 批量删除对应的value** @param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量删除key** @param pattern*/public void removePattern(final String pattern) {Set<Serializable> keys = redisTemplate.keys(pattern);if (keys.size() > 0)redisTemplate.delete(keys);}/*** 删除对应的value** @param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判断缓存中是否有对应的value** @param key* @return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 读取缓存** @param key* @return*/public String get(final String key) {Object result = null;redisTemplate.setValueSerializer(new StringRedisSerializer());ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();result = operations.get(key);if(result==null){return null;}return result.toString();}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value, Long expireTime) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public boolean hmset(String key, Map<String, String> value) {boolean result = false;try {redisTemplate.opsForHash().putAll(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public Map<String,String> hmget(String key) {Map<String,String> result =null;try {result= redisTemplate.opsForHash().entries(key);} catch (Exception e) {e.printStackTrace();}return 
result;}/*** 递增** @param key 键* @paramby 要增加几(大于0)* @return*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** @param key 键* @paramby 要减少几(小于0)* @return*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}/*** redis zset可已设置排序(案例,热搜)** @param key 键* @paramby* @return*/public void zadd(String key ,String name) {BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);//自增长后的数据boundZSetOperations.incrementScore(name,1);}}
8.App
package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 李庆伟* @title: App* @date 2024/3/3 14:01*/
/**
 * Spring Boot entry point of the demo application.
 */
@SpringBootApplication
public class App {

    /**
     * Starts the application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that options such as
     *             {@code --server.port=...} take effect
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}