【RabbitMQ实战】邮件发送(直连交换机、手动ack)

一、实现思路

二、异常情况测试现象及解决

在这里插入图片描述

说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如:
消息发送确认机制 、消费确认机制 、消息的重新投递 、消费幂等性,

二、实现思路
1.简略介绍163邮箱授权码的获取
2.编写发送邮件工具类
3.编写RabbitMQ配置文件
4.生产者发起调用
5.消费者发送邮件
6.定时任务定时拉取投递失败的消息, 重新投递
7.各种异常情况的测试验证
8.拓展: 使用动态代理实现消费端幂等性验证和消息确认(ack)

三、 代码实现

配置版本如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu.gulimall</groupId><artifactId>provider-and-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>provider-and-consumer</name><description>Demo project for Spring Boot</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version><!--        <spring-cloud.version>2021.0.4</spring-cloud.version>--><spring-cloud.version>2021.0.1</spring-cloud.version></properties><dependencies><!--joda time  ? 这个还有些问题,这个类库是做什么的--><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.10</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>com.atguigu.gulimall</groupId><artifactId>gulimall-common</artifactId><version>0.0.1-SNAPSHOT</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><!--什么作用? --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-collections4</artifactId><version>4.2</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.4.2</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/java</directory><!--所在的目录--><includes><!--包括目录下的.properties,.xml文件都会扫描到--><include>**/*.properties</include><include>**/*.xml</include></includes><filtering>false</filtering></resource></resources></build></project>

完整代码可以参考我的GitHub, https://gitee.com/zhai_jiahao/gulimall

代码实现
1.163邮箱授权码的获取, 如图:
在这里插入图片描述
每次启用授权码的时候,就会出现一行字符串,其实就是三方发送邮件的时候,使用的密码(该授权码就是配置文件spring.mail.password需要的密码)

项目结构
在这里插入图片描述

1、rabbitmq、邮箱配置:

server:port: 8023#数据源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name:  com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1:8848#配置服务名称application:name: provider-and-consumer# 配置rabbitMq 服务器#spring.application.name=rabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /publisher-returns: true  #确认消息已发送到队列(Queue)  这个在生产者模块配置 这个后期再配置,这会还用不到publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置,这会还用不到listener:  #这个在测试消费多个消息的时候,不能有下面这些配置,否则只能消费一个消息后就不继续消费了simple:acknowledge-mode: manual  #指定MQ消费者的确认模式是手动确认模式  这个在消费者者模块配置  设置手动确认(ack)prefetch: 1 #一次只能消费一条消息   这个在消费者者模块配置#配置mailmail:host: smtp.163.comusername: 15131650119@163.comfrom: 15131650119@163.compassword: GTMCFUFBTNZERDJAdefault-encoding: UTF-8properties:mail:stmp:auth: truestarttls:enable: truerequired: true#配置日志输出级别
logging:level:com.atguigu.gulimall: debug   #level 日志等级 指定命名空间的日志输出pattern:console: "%d %-5level %logger : %msg%n"file: "%d %-5level [%thread] %logger : %msg%n"file:name: d://spring/log

说明: password即授权码, username和from要一致

2、表结构

CREATE TABLE `msg_log` (`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',`msg` text COMMENT '消息体, json格式化',`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`msg_id`),UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';select * from msg_log t order by t.create_time  desc;

说明: exchange routing_key字段是在定时任务重新投递消息时需要用到的

后面会用到的sql(设置时区使用)

#查询需要定时任务处理的数据
select msg_id, msg, exchange, routing_key, status, try_count,
next_try_time, create_time, update_time,SYSDATE(), now()  from msg_log where status = 0 and next_try_time <= now() #设置时区
SELECT @@global.time_zone;
SET GLOBAL time_zone = 'Asia/Shanghai';

3、启动类、服务接口、服务接口实现类

启动类ProviderAndConsumerApplication

package com.atguigu.gulimall.providerconsumer;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;/*** MQ消息发送邮件功能实战(博客地址:https://blog.csdn.net/onceing/article/details/126407845)*/@EnableScheduling   //设置能使用定时任务
@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.atguigu.gulimall.providerconsumer.mapper")
public class ProviderAndConsumerApplication {public static void main(String[] args) {SpringApplication.run(ProviderAndConsumerApplication.class, args);}}

4、TestController 向队列中入消息的入口

	package com.atguigu.gulimall.providerconsumer.controller;import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/**** 测试入库控制器类* @author: jd* @create: 2024-06-28*/@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Autowiredprivate TestService testService;/*** 发送邮件* @param mail 邮件对象* @param errors JSR303验证结果错误对象  ,(猜测是可以拿到验证的错误信息的用于返回校验的提示)* @return*/@PostMapping("/send")public ServerResponse sendMail(@RequestBody @Validated Mail mail, Errors errors){if(errors.hasErrors()){String defaultMessage = errors.getFieldError().getDefaultMessage();return ServerResponse.error(defaultMessage);}return testService.send(mail);}}

5、消息生产接口 TestService.java

package com.atguigu.gulimall.providerconsumer.service;import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;/*** 消息生产接口*/
public interface TestService {ServerResponse testIdempotence();ServerResponse accessLimit();ServerResponse send(Mail mail);}

TestServiceImpl.java

package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 消息生产接口实现类* @author: jd* @create: 2024-06-27*/
@Service
@Slf4j
public class TestServiceImpl  implements TestService {@Autowiredprivate MsgLogMapper msgLogMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic ServerResponse testIdempotence() {return ServerResponse.success("testIdempotence: success");}@Overridepublic ServerResponse accessLimit() {return ServerResponse.success("accessLimit: success");}@Overridepublic ServerResponse send(Mail mail) {// 1. 生产唯一业务标识String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识mail.setMsgId(msgId);//2.记录日志MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志//3.真正发送消息到MQ中CorrelationData correlationData = new CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail), correlationData);// 发送消息log.info("====================>消息已发送队列");//返回公共的响应结果return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());}
}

MsgLogMapper.java

package com.atguigu.gulimall.providerconsumer.mapper;import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import org.apache.ibatis.annotations.Mapper;import java.util.List;/*** 日志操作mapper接口*/
@Mapper
public interface MsgLogMapper  extends BatchProcessMapper<MsgLog> {/*** 记录消息日志* @param msgLog*/void insertMsgLog(MsgLog msgLog);/*** 更新消息日志状态* @param msgLog*/void updateStatus(MsgLog msgLog);/*** 查询超时消息* @return*/List<MsgLog> selectTimeoutMsg();/*** 更新尝试的次数* @param msgLog*/void updateTryCount(MsgLog msgLog);/*** 通过主键筛选出消息日志对象* @param msgId* @return*/MsgLog selectByPrimaryKey(String msgId);}

MsgLogMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper" ><resultMap id="BaseResultMap" type="com.atguigu.gulimall.providerconsumer.pojo.MsgLog" ><id column="msg_id" property="msgId" jdbcType="VARCHAR" /><result column="msg" property="msg" jdbcType="VARCHAR" /><result column="exchange" property="exchange" jdbcType="VARCHAR" /><result column="routing_key" property="routingKey" jdbcType="VARCHAR" /><result column="status" property="status" jdbcType="INTEGER" /><result column="try_count" property="tryCount" jdbcType="INTEGER" /><result column="next_try_time" property="nextTryTime" jdbcType="TIMESTAMP" /><result column="create_time" property="createTime" jdbcType="TIMESTAMP" /><result column="update_time" property="updateTime" jdbcType="TIMESTAMP" /></resultMap><sql id="Base_Column_List" >msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time</sql><insert id="insertMsgLog" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">INSERT INTO msg_log(msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time)VALUES (#{msgId}, #{msg}, #{exchange}, #{routingKey}, #{status}, #{tryCount}, #{nextTryTime}, #{createTime}, #{updateTime})</insert><update id="updateStatus" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">update msg_log set status = #{status}, update_time = now()where msg_id = #{msgId}</update><select id="selectTimeoutMsg" resultMap="BaseResultMap">select <include refid="Base_Column_List"/>from msg_logwhere status = 0and next_try_time &lt;= now()</select><update id="updateTryCount">update msg_log set try_count = try_count + 1, next_try_time = #{nextTryTime}, update_time = now()where msg_id = #{msgId}</update><select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">select<include refid="Base_Column_List" />from msg_logwhere msg_id = #{msgId,jdbcType=VARCHAR}</select>
</mapper>

MsgLogService.java

package com.atguigu.gulimall.providerconsumer.service;import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;import java.util.Date;
import java.util.List;/*** 日志记录接口类*/
public interface MsgLogService {void updateStatus(String msgId, Integer status);MsgLog selectByMsgId(String msgId);List<MsgLog> selectTimeoutMsg();void updateTryCount(String msgId, Date tryTime);
}

MsgLogServiceImpl.java 消息日志操作实现类

package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JodaTimeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;
import java.util.List;/*** 消息日志操作实现类* @author: jd* @create: 2024-06-27*/
@Service
public class MsgLogServiceImpl implements MsgLogService {@Autowiredprivate MsgLogMapper msgLogMapper;@Overridepublic void updateStatus(String msgId, Integer status) {MsgLog msgLog = new MsgLog();msgLog.setMsgId(msgId);msgLog.setStatus(status);msgLog.setUpdateTime(new Date());msgLogMapper.updateStatus(msgLog);}@Overridepublic MsgLog selectByMsgId(String msgId) {return msgLogMapper.selectByPrimaryKey(msgId);}@Overridepublic List<MsgLog> selectTimeoutMsg() {return msgLogMapper.selectTimeoutMsg();}@Overridepublic void updateTryCount(String msgId, Date tryTime) {//获取下一次重发发送时间,上一次发送时间 加一分钟Date nextTryTime = JodaTimeUtil.plusMinutes(tryTime, 1);//构建消息对象MsgLog msgLog = new MsgLog();msgLog.setMsgId(msgId);msgLog.setNextTryTime(nextTryTime);  //设置下一次消息重发时间msgLogMapper.updateTryCount(msgLog);}
}

通用BatchProcessMapper.java 所有的mapper可以继承的

package com.atguigu.gulimall.providerconsumer.batch;import java.util.List;/*** 通用manpper接口* @param <T>*/
public interface BatchProcessMapper<T> {void batchInsert(List<T> list);void batchUpdate(List<T> list);
}

通用manpper接口实现类 MapperProxy

package com.atguigu.gulimall.providerconsumer.batch.mapperproxy;import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;import java.util.List;import static com.atguigu.gulimall.providerconsumer.common.Constant.MAX_SIZE_PER_TIME;/*** 通用manpper接口实现类* @author: jd* @create: 2024-06-27*/
public class MapperProxy<T> implements BatchProcessMapper<T> {private BatchProcessMapper batchProcessMapper;public MapperProxy(BatchProcessMapper batchProcessMapper) {this.batchProcessMapper = batchProcessMapper;}@Overridepublic void batchInsert(List<T> list) {if (CollectionUtils.isEmpty(list)) {return;}List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);for (List<T> batchList : partition) {batchProcessMapper.batchInsert(batchList);}}@Overridepublic void batchUpdate(List<T> list) {if (CollectionUtils.isEmpty(list)) {return;}List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);for (List<T> batchList : partition) {batchProcessMapper.batchUpdate(batchList);}}}

常量类 Constant.java

package com.atguigu.gulimall.providerconsumer.common;import java.util.Arrays;
import java.util.stream.Collectors;/*** 常量 、枚举类* @author: jd* @create: 2024-06-27*/
public class Constant {public static final int MAX_SIZE_PER_TIME = 1000;public static final int INDEX_ZERO = 0;public static final int INDEX_ONE = 1;public static final int INDEX_TWO = 2;public static final int INDEX_THREE = 3;public static final int NUMBER_ZERO = 0;public static final int NUMBER_ONE = 1;public static final String COLON = ":";public static final String COMMA = ",";public static final String DOUBLE_STRIGULA = "--";public static final String REPLACEMENT_TARGET = "-99999%";public static final String UNKNOWN_TYPE = "未知类型";public interface Redis {String OK = "OK";// 过期时间, 60s, 一分钟Integer EXPIRE_TIME_MINUTE = 60;// 过期时间, 一小时Integer EXPIRE_TIME_HOUR = 60 * 60;// 过期时间, 一天Integer EXPIRE_TIME_DAY = 60 * 60 * 24;String TOKEN_PREFIX = "token:";String MSG_CONSUMER_PREFIX = "consumer:";String ACCESS_LIMIT_PREFIX = "accessLimit:";String FUND_RANK = "fundRank";String FUND_LIST = "fundList";}public interface LogType {// 登录Integer LOGIN = 1;// 登出Integer LOGOUT = 2;}/*** 相较于生产者对消息的角度来设置的此项枚举值*/public interface MsgLogStatus {// 消息投递中Integer DELIVERING = 0;// 投递成功Integer DELIVER_SUCCESS = 1;// 投递失败Integer DELIVER_FAIL = 2;// 已消费Integer CONSUMED_SUCCESS = 3;}public enum CalculateTypeEnum {ADD(1, "加"),SUBTRACT(2, "减"),MULTIPLY(3, "乘"),DIVIDE(4, "除");Integer type;String desc;CalculateTypeEnum(Integer type, String desc) {this.type = type;this.desc = desc;}public Integer getType() {return type;}public String getDesc() {return desc;}}public enum FundSortType {ASC("asc"),DESC("desc"),;private String type;FundSortType(String type) {this.type = type;}public String getType() {return type;}}
}

公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】ServerResponse.java

package com.atguigu.gulimall.providerconsumer.common;import com.fasterxml.jackson.annotation.JsonIgnore;
import jdk.nashorn.internal.ir.annotations.Ignore;import java.io.Serializable;/*** 公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】* @author: jd* @create: 2024-06-27*/
public class ServerResponse  implements Serializable {private static final long serialVersionUID = 7498483649536881777L;private Integer status;private String msg;private Object data;public ServerResponse() {}public ServerResponse(Integer status, String msg, Object data) {this.status = status;this.msg = msg;this.data = data;}/*** @JsonIgnore注解在Java中主要用于处理JSON序列化和反序列化过程,其具体作用如下:** 忽略属性:当在Java对象的某个属性或方法上使用@JsonIgnore注解时,该属性或方法对应的属性在序列化为JSON字符串时会被忽略,同样地,在将JSON字符串反序列化为Java对象时,该属性或方法对应的属性也不会被解析。* 当用在属性上时:表示忽略该属性的序列化和反序列化。* 当用在方法上时:表示忽略该方法对应的属性的序列化和反序列化。* 保护敏感信息:在实际应用中,@JsonIgnore注解可以用于隐藏一些敏感信息,比如密码、token等,确保这些信息不会被发送到客户端或存储在不安全的地方。* 减少数据大小:通过忽略一些不必要的属性,可以减少序列化后的JSON数据大小,提高数据传输效率。* 解决循环引用问题:当对象之间存在循环引用时,使用@JsonIgnore注解可以避免在序列化过程中出现无限递归的情况。* 提高程序的可维护性和安全性:通过精确控制哪些属性参与序列化和反序列化,可以使得程序更加健壮,减少潜在的安全风险。* 需要注意的是,@JsonIgnore注解是Jackson库提供的,因此需要确保项目中引入了Jackson库的相关依赖。同时,在使用@JsonIgnore注解时要确保被标记的属性或方法确实不需要参与序列化和反序列化,否则可能会导致意外的结果。** 总之,@JsonIgnore注解在Java对象和JSON之间的转换过程中起到了非常重要的作用,能够帮助我们更灵活地控制序列化和反序列化的行为。* @return*/@JsonIgnorepublic boolean isSuccess() {return this.status == ResponseCode.SUCCESS.getCode();}public static ServerResponse success() {return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, null);}public static ServerResponse success(String msg) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, null);}public static ServerResponse success(Object data) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, data);}public static ServerResponse success(String msg, Object data) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, data);}public static ServerResponse error(String msg) {return new ServerResponse(ResponseCode.ERROR.getCode(), msg, null);}public static ServerResponse error(Object data) {return new ServerResponse(ResponseCode.ERROR.getCode(), null, data);}public static ServerResponse error(String msg, Object data) {return new ServerResponse(ResponseCode.ERROR.getCode(), msg, data);}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status = status;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}public Object getData() {return data;}public void setData(Object data) {this.data = data;}
}

服务响应状态码 大部分的服务中都会用到这个公共的状态码类 ResponseCode.java

package com.atguigu.gulimall.providerconsumer.common;/*** 服务响应状态码  大部分的服务中都会用到这个公共的状态码类*/
public enum ResponseCode {// 系统模块SUCCESS(0, "操作成功"),ERROR(1, "操作失败"),SERVER_ERROR(500, "服务器异常"),// 通用模块 1xxxxILLEGAL_ARGUMENT(10000, "参数不合法"),REPETITIVE_OPERATION(10001, "请勿重复操作"),ACCESS_LIMIT(10002, "请求太频繁, 请稍后再试"),MAIL_SEND_SUCCESS(10003, "邮件发送成功"),// 用户模块 2xxxxNEED_LOGIN(20001, "登录失效"),USERNAME_OR_PASSWORD_EMPTY(20002, "用户名或密码不能为空"),USERNAME_OR_PASSWORD_WRONG(20003, "用户名或密码错误"),USER_NOT_EXISTS(20004, "用户不存在"),WRONG_PASSWORD(20005, "密码错误"),;private Integer code;private String msg;ResponseCode(Integer code, String msg) {this.code = code;this.msg = msg;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}
}

4、工具类

时间字符操作类 JodaTimeUtil.java

package com.atguigu.gulimall.providerconsumer.util;import com.alibaba.cloud.commons.lang.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import java.util.Date;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/*** 时间字符操作类 JodaTimeUtil* @author: jd* @create: 2024-06-27*/
@Slf4j
public class JodaTimeUtil {private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";/*** date类型 -> string类型** @param date* @return*/public static String dateToStr(Date date) {return dateToStr(date, STANDARD_FORMAT);}/*** date类型 -> string类型** @param date* @param format 自定义日期格式* @return*/public static String dateToStr(Date date, String format) {if (date == null) {return null;}format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;DateTime dateTime = new DateTime(date);return dateTime.toString(format);}/*** string类型 -> date类型** @param timeStr* @return*/public static Date strToDate(String timeStr) {return strToDate(timeStr, STANDARD_FORMAT);}/*** string类型 -> date类型** @param timeStr* @param format  自定义日期格式* @return*/public static Date strToDate(String timeStr, String format) {if (StringUtils.isBlank(timeStr)) {return null;}format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(format);DateTime dateTime;try {dateTime = dateTimeFormatter.parseDateTime(timeStr);} catch (Exception e) {log.error("strToDate error: timeStr: {}", timeStr, e);return null;}return dateTime.toDate();}/*** 判断date日期是否过期(与当前时刻比较)** @param date* @return*/public static Boolean isTimeExpired(Date date) {String timeStr = dateToStr(date);return isBeforeNow(timeStr);}/*** 判断date日期是否过期(与当前时刻比较)** @param timeStr* @return*/public static Boolean isTimeExpired(String timeStr) {if (StringUtils.isBlank(timeStr)) {return true;}return isBeforeNow(timeStr);}/*** 判断timeStr是否在当前时刻之前** @param timeStr* @return*/private static Boolean isBeforeNow(String timeStr) {DateTimeFormatter format = DateTimeFormat.forPattern(STANDARD_FORMAT);DateTime dateTime;try {dateTime = DateTime.parse(timeStr, format);} catch (Exception e) {log.error("isBeforeNow error: timeStr: {}", timeStr, e);return null;}return dateTime.isBeforeNow();}/*** 日期加天数** @param date* @param days* @return*/public static Date plusDays(Date date, int days) {return plusOrMinusDays(date, days, 0);}/*** 日期减天数** @param date* @param days* @return*/public static Date minusDays(Date date, int days) {return plusOrMinusDays(date, days, 1);}/*** 加减天数** @param date* @param days* @param type 0:加天数 1:减天数* @return*/private static Date plusOrMinusDays(Date date, int days, Integer type) {if (null == date) {return null;}DateTime dateTime = new DateTime(date);if (type == 0) {dateTime = dateTime.plusDays(days);} else {dateTime = dateTime.minusDays(days);}return dateTime.toDate();}/*** 日期加分钟** @param date* @param minutes* @return*/public static Date plusMinutes(Date date, int minutes) {return plusOrMinusMinutes(date, minutes, 0);}/*** 日期减分钟** @param date* @param minutes* @return*/public static Date minusMinutes(Date date, int minutes) {return plusOrMinusMinutes(date, minutes, 1);}/*** 加减分钟** @param date* @param minutes* @param type    0:加分钟 1:减分钟* @return*/private static Date plusOrMinusMinutes(Date date, int minutes, Integer type) {if (null == date) {return null;}DateTime dateTime = new DateTime(date);if (type == 0) {dateTime = dateTime.plusMinutes(minutes);} else {dateTime = dateTime.minusMinutes(minutes);}return dateTime.toDate();}/*** 日期加月份** @param date* @param months* @return*/public static Date plusMonths(Date date, int months) {return plusOrMinusMonths(date, months, 0);}/*** 日期减月份** @param date* @param months* @return*/public static Date minusMonths(Date date, int months) {return plusOrMinusMonths(date, months, 1);}/*** 加减月份** @param date* @param months* @param type   0:加月份 1:减月份* @return*/private static Date plusOrMinusMonths(Date date, int months, Integer type) {if (null == date) {return null;}DateTime dateTime = new DateTime(date);if (type == 0) {dateTime = dateTime.plusMonths(months);} else {dateTime = dateTime.minusMonths(months);}return dateTime.toDate();}/*** 判断target是否在开始和结束时间之间** @param target* @param startTime* @param endTime* @return*/public static Boolean isBetweenStartAndEndTime(Date target, Date startTime, Date endTime) {if (null == target || null == startTime || null == endTime) {return false;}DateTime dateTime = new DateTime(target);return dateTime.isAfter(startTime.getTime()) && dateTime.isBefore(endTime.getTime());}
}

Object 和String互转类 JsonUtil

package com.atguigu.gulimall.providerconsumer.util;import com.alibaba.cloud.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;/*** Object 和String互转类* @author: jd* @create: 2024-06-27*/
@Slf4j
public class JsonUtil {private static ObjectMapper objectMapper = new ObjectMapper();private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";static {// 对象的所有字段全部列入objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);// 取消默认转换timestamps形式objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);// 忽略空bean转json的错误objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);// 统一日期格式objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));// 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}/*** 将Object转化为String对象* @param obj* @param <T>* @return*/public static <T> String objToStr(T obj) {if (null == obj) {return null;}try {return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);} catch (Exception e) {log.warn("objToStr error: ", e);return null;}}/*** 将字符串转化成Object对象* @param str   待转的字符串* @param clazz 类名* @param <T>* @return*/public static <T> T strToObj(String str, Class<T> clazz) {if (StringUtils.isBlank(str) || null == clazz) {return null;}try {return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);} catch (Exception e) {log.warn("strToObj error: ", e);return null;}}public static <T> T strToObj(String str, TypeReference<T> typeReference) {if (StringUtils.isBlank(str) || null == typeReference) {return null;}try {return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));} catch (Exception e) {log.error("strToObj error", e);return null;}}
}

发送邮件工具类 MailUtil.java

package com.atguigu.gulimall.providerconsumer.util;import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.MailException;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;/**** 发送邮件工具类* @author: jd* @create: 2024-06-27*/@Component
@Slf4j
public class MailUtil {@Value("${spring.mail.from}")    //这里从application.xml中拿不到配置信息,所以从这里直接写死了private String from ="15131650119@163.com";@Autowiredprivate JavaMailSender mailSender;public boolean send(Mail mail) throws AddressException {//模拟消费成功,但是业务实际没成功,此时会重新入队列,不会造成消息丢失
//        if(true){
//            return false;
//        }String to = mail.getTo();// 目标邮箱String title = mail.getTitle();// 邮件标题String content = mail.getContent();// 邮件正文SimpleMailMessage message = new SimpleMailMessage();message.setFrom(String.valueOf(new InternetAddress(from)));  //设置发送人message.setTo(to);  //设置目标账户message.setSubject(title); //设置邮件标题message.setText(content);  //设置邮件内容try {log.info("===================>开始发送邮件");mailSender.send(message);log.info("===================>邮件发送成功");return true;} catch (MailException e) {log.error("=============>邮件发送失败, to: {}, title: {}", to, title, e);return false;}}}

SpringBeanUtil.java 获取BeanSpring容器类

package com.atguigu.gulimall.providerconsumer.util;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** @author: jd* @create: 2024-06-27*/
@Component
public class SpringBeanUtil implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {SpringBeanUtil.applicationContext = applicationContext;}/*** 通过名称在spring容器中获取对象** @param beanName* @return*/public static Object getBean(String beanName) {System.out.println(applicationContext);return applicationContext.getBean(beanName);}}

5、RabbitMQ消费者、生产者配置类

A、MQ生产者:

TestController.java

package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 消息生产接口实现类* @author: jd* @create: 2024-06-27*/
@Service
@Slf4j
public class TestServiceImpl  implements TestService {@Autowiredprivate MsgLogMapper msgLogMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic ServerResponse testIdempotence() {return ServerResponse.success("testIdempotence: success");}@Overridepublic ServerResponse accessLimit() {return ServerResponse.success("accessLimit: success");}@Overridepublic ServerResponse send(Mail mail) {// 1. 生产唯一业务标识String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识mail.setMsgId(msgId);//2.记录日志MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志//3.真正发送消息到MQ中CorrelationData correlationData = new CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail), correlationData);// 发送消息log.info("====================>消息已发送队列");//返回公共的响应结果return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());}
}

队列 交换机配置,用于消息生产者:RabbitConfig.java

package com.atguigu.gulimall.providerconsumer.config;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;/**** 队列 交换机配置,用于消息生产者* @author: jd* @create: 2024-06-27*/@Slf4j
@Component
@Configuration
public class RabbitConfig {@Autowiredprivate MsgLogService msgLogService;// 发送邮件public static final String MAIL_QUEUE_NAME = "mail.queue";public static final String MAIL_EXCHANGE_NAME = "mail.exchange";public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";@Beanpublic Queue mailQueue() {return new Queue(MAIL_QUEUE_NAME, true);}@Beanpublic DirectExchange mailExchange() {return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);}@Beanpublic Binding mailBinding() {return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);}//    @Autowired
//    private CachingConnectionFactory connectionFactory;//    ConnectionFactory connectionFactory = (ConnectionFactory) SpringBeanUtil.getBean("connectionFactory");/*** 设置生产者消息确认回调函数**/@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory  connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setMessageConverter(converter());rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息成功发送到Exchange");String msgId = correlationData.getId();msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);} else {log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);}System.out.println("ConfirmCallback回调:     "+"相关数据:"+correlationData);System.out.println("ConfirmCallback回调:     "+"确认情况:"+ack);System.out.println("ConfirmCallback回调:     "+"原因:"+cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("ReturnCallback回调:     "+"消息:"+returnedMessage.getMessage());System.out.println("ReturnCallback回调:     "+"回应码:"+returnedMessage.getReplyCode());System.out.println("ReturnCallback回调:     "+"回应信息:"+returnedMessage.getReplyText());System.out.println("ReturnCallback回调:     "+"交换机:"+returnedMessage.getExchange());System.out.println("ReturnCallback回调:     "+"路由键:"+returnedMessage.getRoutingKey());log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage());}});return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();}}
B、MQ 消费者 其实就完成了3件事: 1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
package com.atguigu.gulimall.providerconsumer.mq.consumer;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JsonUtil;
import com.atguigu.gulimall.providerconsumer.util.MailUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.mail.internet.AddressException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;/*** MQ 监听者,操作业务(发送邮件)* 其实就完成了3件事:*      1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack* @author: jd* @create: 2024-06-27*/
@Component
@Slf4j
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)  //指定监听队列
public class MailConsumer {@Autowiredprivate MsgLogService msgLogService;@Autowiredprivate MailUtil mailUtil;@RabbitHandler(isDefault = true)   //指定监听后的处理动作public void consume(Message message, Channel channel) throws IOException, AddressException {//将Message中的业务数据转化成Mail对象Mail mail = MessageHelper.msgToObj(message, Mail.class);log.info("================>消费者收到消息: {}", mail.toString());log.debug("=========测试debug和info有什么区别======");//根据ID查询Msg对象String msgId = mail.getMsgId();MsgLog msgLog = msgLogService.selectByMsgId(msgId);// 消费幂等性if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {log.info("===========>消费者重复消费,此时不进行消费 ,msgId: {}", msgId);//直接终止程序运行,程序返回return;}//拿到MQ中的每一条消息的唯一标识TagMessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();//业务操作:发送邮件log.info("================>准备发送邮件");boolean send = mailUtil.send(mail);
//try {//如果发送邮件成功,则修改消息状态为 已消费if(send){//发送成功后更新消息日志表的消息记录状态msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);//取得进程IDThread t = Thread.currentThread();log.info("【消息队列】current request consumer success, request info: {}; thread info: {};", JsonUtil.objToStr(mail), t);// 消费确认,设置反馈给MQchannel.basicAck(tag, false);}else {log.error("【消息队列】consumer failed,, msg info: {}", JsonUtil.objToStr(mail));channel.basicNack(tag, false, true);  //这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失}} catch (Exception e) {//产生异常之后,则不消费,直接拒绝此消息,不进行消费;这样会导致这条失败的消息会一直存在队列里面,然后定时任务过一会在数据库中扫到这个信息之后,会再去MQ中拿这个消息进行消费e.printStackTrace();ByteArrayOutputStream bass = new ByteArrayOutputStream();e.printStackTrace(new PrintStream(bass));log.error("【消息队列】consumer error, error info: {}, msg info: {}", bass, JsonUtil.objToStr(mail));channel.basicNack(tag, false, true);}}}

6、定时任务重发: ResendMsg.java (说明: 每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可)

package com.atguigu.gulimall.providerconsumer.task;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;/*** 消息重发定时任务* @author: jd* @create: 2024-06-28*/
@Component
@Slf4j
public class ResendMsg {@Autowiredprivate RabbitTemplate rabbitTemplate;// 最大投递次数。第四次投递失败private static final int MAX_TRY_COUNT = 3;@Autowiredprivate MsgLogService msgLogService;/*** 每30s拉取投递失败的消息, 重新投递*/@Scheduled(cron = "0/30 * * * * ?")public void reSend(){log.info("开始执行定时任务(重新投递消息)");List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();  //查询还在投递中的消息msgLogs.forEach(msgLog->{String msgId = msgLog.getMsgId();//超过投递次数则不会重新投递中的消息是否需要投递if(msgLog.getTryCount()>=MAX_TRY_COUNT){//不需要重新投递msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);log.info("消息ID {}超过最大的投递次数 {} 次,投递失败,需要人工查看!",msgId,MAX_TRY_COUNT);}else {//拿到消息在表中的本次重试时间,去获取下一次重试时间  同时 投递次数+1msgLogService.updateTryCount(msgId,msgLog.getNextTryTime());CorrelationData correlationData = new CorrelationData(msgId);//携带业务信息,作为业务的唯一标识//重新发送消息到MQ,让MQ去重新尝试消费这一条之前没有发送到MQ的消息(因为我们现在查的消息的状态是status =0 的代表是消息还是投递中的,没有变成投递成功的消息,肯定是投递有问题)rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,  //每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可MessageHelper.objToMsg(msgLog),correlationData);log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");}});log.info("定时任务执行结束(重新投递消息)");  //}}

四、基本测试

OK, 目前为止, 代码准备就绪, 现在进行正常流程的测试 1.发送请求:
在这里插入图片描述
后台日志:
在这里插入图片描述
3.库消息记录:
在这里插入图片描述
状态为3, 表明已消费, 消息重试次数为0, 表明一次投递就成功了,此时就可以到目标邮箱中去查看是否接收到了这个邮件

五、异常情况测试

1.验证消息发送到Exchange失败情况下的回调, 对应上图P -> X

如何验证? 可以随便指定一个不存在的交换机名称, 请求接口, 看是否会触发回调
在这里插入图片描述
发送失败, 原因: reply-code=404, reply-text=NOT_FOUND - no exchange ‘mail.exchangeabcd’ in vhost ‘/’, 该回调能够保证消息正确发送到Exchange, 测试完成

2.验证消息从Exchange路由到Queue失败情况下的回调, 对应上图X -> Q 同理, 修改一下路由键为不存在的即可, 路由失败, 触发回调
在这里插入图片描述
发送失败, 原因: route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE

3.验证在手动ack模式下, 消费端必须进行手动确认(ack), 否则消息会一直保存在队列中, 直到被消费, 对应上图Q -> C 将消费端代码channel.basicAck(tag, false);// 消费确认注释掉, 查看控制台和rabbitmq管控台
在这里插入图片描述
在这里插入图片描述
可以看到, 虽然消息确实被消费了, 但是由于是手动确认模式, 而最后又没手动确认, 所以, 消息仍被rabbitmq保存, 所以, 手动ack能够保证消息一定被消费, 但一定要记得basicAck

4.验证消费端幂等性 接着上一步, 去掉注释, 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status=3, 即已消费, 所以, 直接return, 这样就保证了消费端的幂等性, 即使由于网络等原因投递成功而未触发回调, 从而多次投递, 也不会重复消费进而发生业务异常
在这里插入图片描述

5.验证消费端发生异常消息也不会丢失 很显然, 消费端代码可能发生异常, 如果不做处理, 业务没正确执行, 消息却不见了, 给我们感觉就是消息丢失了, 由于我们消费端代码做了异常捕获, 业务异常时, 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失 测试: send方法直接返回false即可(这里跟抛出异常一个意思),因为我们向MQ插入了消息,但是实际业务消费了,但是发送邮件返回了false,这样会从新投递到MQ队列中,再进行消费,一直重复。
代码修改:
在这里插入图片描述
结果:
在这里插入图片描述

可以看到, 由于channel.basicNack(tag, false, true), 未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢

6.验证定时任务的消息重投 实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行, 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了 定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证 我们可以将回调和消费成功后更新消息状态的代码注释掉, 开启定时任务, 查看是否重投

这是没有异常信息的情况下,定时任务每次都不会做实际的业务:
在这里插入图片描述
当我们对一条消息,进行了实际的业务处理,而且也业务处理成功了,只是没有把状态修改成成功,这样定时任务会扫,重新入队列,但是有幂等性校验,所以一直发送到队列将这条信息,直到3次后,消息会被更新为发送失败
在这里插入图片描述

在这里插入图片描述

发送邮件其实很简单, 但深究起来其实有很多需要注意和完善的点, 一个看似很小的知识点, 也可以引申出很多问题, 甚至涉及到方方面面, 这些都需要自己踩坑, 当然我这代码肯定还有很多不完善和需要优化的点, 希望小伙伴多多提意见和建议 我的代码都是经过自测验证过的, 图也都是一点一点自己画的或认真截的, 希望小伙伴能学到一点东西, 路过的点个赞或点个关注呗, 谢谢

部分参考:springboot + rabbitmq发送邮件实战(保证消息100%投递成功并被消费)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/39236.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高考失利咨询复读,银河补习班客服开挂回复

补习班的客服在高考成绩出来后&#xff0c;需要用专业的知识和足够的耐心来回复各种咨询&#xff0c;聊天宝快捷回复软件&#xff0c;帮助客服开挂回复。 ​ 前言 高考成绩出来&#xff0c;几家欢喜几家愁&#xff0c;对于高考失利的学生和家长&#xff0c;找一个靠谱的复读补…

全面了解机器学习

目录 一、基本认识 1. 介绍 2. 机器学习位置 二、机器学习的类型 1. 监督学习 2. 无监督学习 3. 强化学习 三、机器学习术语 1. 训练样本 2. 训练 3. 特征 4. 目标 5. 损失函数 四、机器学习流程 五、机器学习算法 1. 分类算法 2. 聚类算法 3. 关联分析 4. …

Qt入门教程(一):Qt使用的基本知识

目录 Qt简介 新建项目 构建目录和工作目录 构建目录 工作目录 项目结构 项目配置文件 .pro 用户文件 .user 主文件 main.cpp 头文件 dialog.h 源文件 dialog.cpp 帮助文档 三种查询文档的方式&#xff1a; 文档的重点位置&#xff1a;​编辑 调试信息 Qt简介 Qt…

java 代码块

Java中的代码块主要有三种类型&#xff1a;普通代码块、静态代码块、构造代码块。它们的用途和执行时机各不相同。 普通代码块&#xff1a;在方法内部定义&#xff0c;使用一对大括号{}包围的代码片段。它的作用域限定在大括号内&#xff0c;每当程序执行到该代码块时就会执行其…

全平台7合一自定义小程序源码系统功能强大 前后端分离 带完整的安装代码包以及搭建教程

系统概述 这款全平台 7 合一自定义小程序源码系统是专为满足各种业务需求而设计的。它整合了多种功能&#xff0c;能够在不同平台上运行&#xff0c;为用户提供了全方位的体验。无论你是企业主、开发者还是创业者&#xff0c;这款系统都能为你提供强大的支持。 代码示例 系统…

crewAI实践(包含memory的启用)--AiRusumeGenerator

crewAI实践--AiRusumeGenerator 什么是crewAIAiRusumeGenerator功能效果展示开发背景开发步骤1. 首先得学习下这款框架原理大概用法能够用来做什么&#xff1f; 2. 安装crewAI以及使用概述3. 写代码Agents.pyTasks.pymian.py关于task中引入的自定义工具这里不再赘述 什么是crew…

V Rising夜族崛起的管理员指令大全

使用方法&#xff1a; 如果没有启用控制台需要先启用控制台 打开游戏点击选项&#xff08;如果在游戏内点击ESC即可&#xff09;&#xff0c;在通用页面找到启用控制台&#xff0c;勾选右边的方框启用 在游戏内点击键盘ESC下方的波浪键&#xff08;~&#xff09;使用控制台 指…

构建LangChain应用程序的示例代码:49、如何使用 OpenAI 的 GPT-4 和 LangChain 库实现多模态问答系统

! pip install "openai>1" "langchain>0.0.331rc2" matplotlib pillow加载图像 我们将图像编码为 base64 字符串&#xff0c;如 OpenAI GPT-4V 文档中所述。 import base64 import io import osimport numpy as np from IPython.display import HT…

PDF一键转PPT文件!这2个AI工具值得推荐,办公必备!

PDF转换为PPT文件&#xff0c;是职场上非常常见的需求&#xff0c;过去想要把PDF文件转换为PPT&#xff0c;得借助各种文件转换工具&#xff0c;但在如今AI技术主导的大背景下&#xff0c;我们在选用工具时有了更多的选择&#xff0c;最明显的就是基于AI技术打造的AI格式转换工…

《昇思25天学习打卡营第21天 | 昇思MindSporePix2Pix实现图像转换》

21天 本节学习了通过Pix2Pix实现图像转换。 Pix2Pix是基于条件生成对抗网络&#xff08;cGAN&#xff09;实现的一种深度学习图像转换模型。可以实现语义/标签到真实图片、灰度图到彩色图、航空图到地图、白天到黑夜、线稿图到实物图的转换。Pix2Pix是将cGAN应用于有监督的图…

gin框架 gin.Context中的Abort方法使用注意事项 - gin框架中立刻中断当前请求的方法

gin框架上下文中的Abort序列方法&#xff08;Abort&#xff0c;AbortWithStatus&#xff0c; AbortWithStatusJSON&#xff0c;AbortWithError&#xff09;他们都不会立刻终止当前的请求&#xff0c;在中间件中调用Abort方法后中间件中的后续的代码会被继续执行&#xff0c;但是…

【Unity 人性动画的复用性】

Unity的动画系统&#xff0c;通常称为Mecanim&#xff0c;提供了强大的动画复用功能&#xff0c;特别是针对人型动画的重定向技术。这种技术允许开发者将一组动画应用到不同的角色模型上&#xff0c;而不需要为每个模型单独制作动画。这通过在模型的骨骼结构之间建立对应关系来…

系统安全与应用

目录 1. 系统账户清理 2. 密码安全性控制 2.1 密码复杂性 2.2 密码时限 3 命令历史查看限制 4. 终端自动注销 5. su权限以及sudo提权 5.1 su权限 5.2 sudo提权 6. 限制更改GRUB引导 7. 网络端口扫描 那天不知道为什么&#xff0c;心血来潮看了一下passwd配置文件&am…

三维家:SaaS的IT规模化降本之道|OceanBase 《DB大咖说》(十一)

OceanBase《DB大咖说》第 11 期&#xff0c;我们邀请到了三维家的技术总监庄建超&#xff0c;来分享他对数据库技术的理解&#xff0c;以及典型 SaaS 场景在数据库如何实现规模化降本的经验与体会。 庄建超&#xff0c;身为三维家的技术总监&#xff0c;独挑大梁&#xff0c;负…

grpc学习golang版( 八、双向流示例 )

系列文章目录 第一章 grpc基本概念与安装 第二章 grpc入门示例 第三章 proto文件数据类型 第四章 多服务示例 第五章 多proto文件示例 第六章 服务器流式传输 第七章 客户端流式传输 第八章 双向流示例 文章目录 一、前言二、定义proto文件三、编写server服务端四、编写client客…

中霖教育:环评工程师好考吗?

【中霖教育好吗】【中霖教育怎么样】 在专业领域&#xff0c;环评工程师资格认证考试是一项具有挑战性的考试&#xff0c;考试科目为&#xff1a;《环境影响评价相关法律法规》 《环境影响评价技术导则与标准》《环境影响评价案例分析》《环境影响评价技术方法》。 四个科目…

【Linux】—VMware安装Centos7步骤

文章目录 前言一、虚拟机准备二、CentOS7操作系统安装 前言 本文介绍VMware安装Centos7步骤。 软件准备 软件&#xff1a;VMware Workstation Pro&#xff0c;直接官网安装。镜像&#xff1a;CentOS7&#xff0c;镜像官网下载链接&#xff1a;https://vault.centos.org/&#x…

[C++]——同步异步日志系统(1)

同步异步日志系统 一、项⽬介绍二、开发环境三、核心技术四、环境搭建五、日志系统介绍5.1 为什么需要日志系统5.2 日志系统技术实现5.2.1 同步写日志5.2.2 异步写日志 日志系统&#xff1a; 日志&#xff1a;程序在运行过程中&#xff0c;用来记录程序运行状态信息。 作用&…

【面试系列】机器学习工程师高频面试题及详细解答

欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;欢迎订阅相关专栏&#xff1a; ⭐️ 全网最全IT互联网公司面试宝典&#xff1a;收集整理全网各大IT互联网公司技术、项目、HR面试真题. ⭐️ AIGC时代的创新与未来&#xff1a;详细讲解AIGC的概念、核心技术、…

JSONpath语法怎么用?

JSONPath 可以看作定位目标对象位置的语言&#xff0c;适用于 JSON 文档。 JSONPath 与 JSON 的 关系相当于 XPath 与 XML 的关系&#xff0c; JSONPath 参照 XPath 的路径表达式&#xff0c;提供了描述 JSON 文档层次结构的表达式&#xff0c;通过表达式对目标…