支付模块-基于消息队列发送支付通知消息

消息队列发送支付通知消息

需求分析

订单服务作为通用服务,在订单支付成功后需要将支付结果异步通知给其他对接的微服务,微服务收到支付结果根据订单的类型去更新自己的业务数据

在这里插入图片描述

技术方案

使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到服务端: 消息发送到交换机 --> 由交换机发送到队列 --> 消费者监听队列,收到消息进行处理,参考文章02- 使用Docker安装RabbitMQ-CSDN博客

  • 生产者确认机制: 发送消息前使用数据库事务将消息保证到数据库表中,成功发送到交换机将消息从数据库中删除

  • 配置MQ持久化(交换机、队列、发送消息):MQ收到消息持久化,当MQ重启时即使消息没有消费完也不会丢失

  • 消费者确认机制: 消费者消费成功,自动发送ACK,负责重试消费

发布订阅模式: 订单服务接收支付成功结果通知后创建一条消息发送给Fanout广播类型的交换机,学习中心服务绑定队列到交换机接收消息,参考文章04- 基于SpringAMQP封装RabbitMQ,消息队列的Work模型和发布订阅模型-CSDN博客

环境搭建

第一步: 在订单服务和学习中心服务中添加消息队列依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在Nacos的dev环境下添加RabbitMQ的配置信息rabbitmq-dev.yaml,设置group为xuecheng-plus-common

spring:rabbitmq:host: 192.168.101.128 # 主机port: 5672 # 端口名username: root # 用户名password: root # 密码virtual-host: / # 虚拟主机publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-returns: true # 开启publish-return功能,同样是基于callback机制调用回调函数ReturnCallbacktemplate:mandatory: true # 定义消息路由失败时的策略,true表示调用ReturnCallback;false表示直接丢弃消息listener:simple:# 每次只能获取一条消息,处理完成才能获取下一个消息prefetch: 1  # auto:出现异常时返回unack且消息回滚到mq,如果没有异常直接返回ack# manual:手动控制# none:丢弃消息不回滚到mqacknowledge-mode: auto retry:enabled: false # 开启消费者失败重试initial-interval: 5000ms # 初始的失败等待时长为几秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态;如果业务中包含事务需要改为false

第三步:在订单服务和学习中心服务的接口工程中引入rabbitmq-dev.yaml配置文件

- data-id: rabbitmq-${spring.profiles.active}.yamlgroup: xuecheng-plus-commonrefresh: true

第四步: 在订单服务的service工程编写MQ配置类PayNotifyConfig创建交换机和队列

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {// 交换机public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";// 支付结果通知消息类型public static final String MESSAGE_TYPE = "payresult_notify";// 支付通知队列public static final String PAYNOTIFY_QUEUE = "paynotify_queue";// 声明交换机且持久化@Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_fanout() {// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}//支付通知队列且持久化@Bean(PAYNOTIFY_QUEUE)public Queue course_publish_queue() {return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();}// 交换机和支付通知队列绑定@Beanpublic Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}// 交换机路由消息到队列的时候如果失败执行回调函数@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 消息处理serviceMqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 消息发送失败记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 解析消息内容,将消息再添加到消息表MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);mqMessageService.addMessage(mqMessage.getMessageType(), mqMessage.getBusinessKey1(), mqMessage.getBusinessKey2(), mqMessage.getBusinessKey3());});}
}

第五步: 在学习中心服务编写MQ配置类PayNotifyConfig创建交换机和队列,避免学习中心服务启动的时候监听的队列还没有创建,如果生产端已经创建就不再创建

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {// 声明交换机,支付通知队列,交换机和支付通知队列绑定关系// 不用设置回调函数,只有生产者才需要确认 
}

重启订单服务,登录rabbitmq查看交换机自动创建成功

在这里插入图片描述

生产者发送信息

在订单服务的OrderService中定义接口接收支付宝响应的通知消息结果并发送给学习中心服务

public interface OrderService {/*** 接收通知结果并发送给学习中心服务* @param mq	Message 消息*/void notifyPayResult(MqMessage mqMessage);
}
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {@AutowiredMqMessageService mqMessageService;@AutowiredRabbitTemplate rabbitTemplate;@Overridepublic void notifyPayResult(MqMessage mqMessage) {// 1. 将消息体转为JsonString jsonMsg = JSON.toJSONString(mqMessage);// 2. 设置消息的持久化方式为PERSISTENT,即消息会被持久化到磁盘上,确保即使在RabbitMQ服务器重启后也能够恢复消息Message msgObj = MessageBuilder.withBody(jsonMsg.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 3. 封装CorrelationData,用于跟踪指定Id消息的相关信息CorrelationData correlationData = new CorrelationData(mqMessage.getId().toString());// 3.1 使用CorrelationData添加一个Callback对象指定回调方法,该对象用于在消息确认时处理消息的结果correlationData.getFuture().addCallback(result -> {if (result.isAck()) {// 3.2 消息成功发送到交换机,删除消息表中的记录log.debug("消息发送成功:{}", jsonMsg);mqMessageService.completed(mqMessage.getId());} else {// 3.3 消息发送失败log.error("消息发送失败,id:{},原因:{}", mqMessage.getId(), result.getReason());}}, ex -> {// 3.4 消息异常可能是网络问题log.error("消息发送异常,id:{},原因:{}", mqMessage.getId(), ex.getMessage());});// 4. 发送消息rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj, correlationData);}
}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中除了保存支付宝响应的结果信息还需要向数据库消息表添加消息记录将消息封装好后发送给消费端

	/*** 保存支付结果信息,向数据库中的消息表添加消息并发送给消费端* @param payStatusDto 支付结果信息*/
@Transactional
@Override
public void saveAlipayStatus(PayStatusDto payStatusDto) {// 1. 获取支付流水号String payNo = payStatusDto.getOut_trade_no();// 2. 查询数据库订单状态XcPayRecord payRecord = getPayRecordByPayNo(payNo);if (payRecord == null) {XueChengPlusException.cast("未找到支付记录");}XcOrders order = xcOrdersMapper.selectById(payRecord.getOrderId());if (order == null) {XueChengPlusException.cast("找不到相关联的订单");}String statusFromDB = payRecord.getStatus();// 2.1 已支付,直接返回if ("600002".equals(statusFromDB)) {return;}// 3. 查询支付宝交易状态String tradeStatus = payStatusDto.getTrade_status();// 3.1 支付宝交易已成功,保存订单表和交易记录表,更新交易状态if ("TRADE_SUCCESS".equals(tradeStatus)) {// 更新支付交易表payRecord.setStatus("601002");payRecord.setOutPayNo(payStatusDto.getTrade_no());payRecord.setOutPayChannel("Alipay");payRecord.setPaySuccessTime(LocalDateTime.now());int updateRecord = xcPayRecordMapper.updateById(payRecord);if (updateRecord <= 0) {XueChengPlusException.cast("更新支付交易表失败");}// 更新订单表order.setStatus("600002");int updateOrder = xcOrdersMapper.updateById(order);if (updateOrder <= 0) {log.debug("更新订单表失败");XueChengPlusException.cast("更新订单表失败");}}// 4. 创建消息记录并保存到消息表中,参数1:支付结果类型通知;参数2:业务id;参数3:业务类型MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", order.getOutBusinessId(), order.getOrderType(), null);// 5. 封装消息记录并发送给消费端notifyPayResult(mqMessage);
}

消费者接收消息

在学习中心服务定义impl/ReceivePayNotifyService

  • 监听消息队列接收支付结果, 当接收到消息后更新选课记录表的选课状态为选课成功,同时向我的课程表中插入一条课程记录
@Slf4j
@Service
public class ReceivePayNotifyService {@AutowiredMyCourseTablesService tablesService;@RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)public void receive(Message message) {// 1. 获取消息MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);// 2. 根据消息内容,更新选课记录,向我的课程表插入记录// 2.1 消息类型,学习中心只处理支付结果的通知String messageType = mqMessage.getMessageType();// 2.2 选课idString chooseCourseId = mqMessage.getBusinessKey1();// 2.3 订单类型,60201表示购买课程String orderType = mqMessage.getBusinessKey2();// 3. 学习中心只负责处理支付结果的通知if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)){// 3.1 学习中心只负责购买课程类订单的结果if ("60201".equals(orderType)){// 3.2 保存选课记录boolean flag = tablesService.saveChooseCourseStatus(chooseCourseId);if (!flag){XueChengPlusException.cast("保存选课记录失败");}}}}
}

MyCourseTablesService接口中定义方法更新选课记录的选课状态,同时向我的课程表添加选课记录(之前添加免费课程的时候已经实现过了)

public interface MyCourseTablesService {/*** 保存选课成功状态* @param chooseCourseId* @return*/public boolean saveChooseCourseSuccess(String chooseCourseId);
}
@Slf4j
@Service
public class MyCourseTablesServiceImpl implements MyCourseTablesService {@Override@Transactionalpublic boolean saveChooseCourseStatus(String chooseCourseId) {// 1. 根据选课id,查询对应的选课记录XcChooseCourse chooseCourse = chooseCourseMapper.selectById(chooseCourseId);if (chooseCourse == null) {log.error("接收到购买课程的消息,根据选课id未查询到课程,选课id:{}", chooseCourseId);return false;}// 2. 选课状态为未支付时,更新选课状态为选课成功if ("701002".equals(chooseCourse.getStatus())) {chooseCourse.setStatus("701001");int update = chooseCourseMapper.updateById(chooseCourse);if (update <= 0) {log.error("更新选课记录失败:{}", chooseCourse);}}// 3. 向我的课程表添加记录addCourseTables(chooseCourse);return true;}
}
public XcCourseTables addCourseTabls(XcChooseCourse xcChooseCourse){//选课成功了才可以向我的课程表添加String status = xcChooseCourse.getStatus();if(!"701001".equals(status)){XueChengPlusException.cast("选课没有成功无法添加到课程表");}XcCourseTables xcCourseTables = getXcCourseTables(xcChooseCourse.getUserId(), xcChooseCourse.getCourseId());if(xcCourseTables!=null){return xcCourseTables;}xcCourseTables = new XcCourseTables();BeanUtils.copyProperties(xcChooseCourse,xcCourseTables);xcCourseTables.setChooseCourseId(xcChooseCourse.getId());//记录选课表的逐渐xcCourseTables.setCourseType(xcChooseCourse.getOrderType());//选课类型xcCourseTables.setUpdateDate(LocalDateTime.now());int insert = courseTablesMapper.insert(xcCourseTables);if(insert<=0){XueChengPlusException.cast("添加我的课程表失败");}return xcCourseTables;
}

通知支付结果测试

选择一门已发布的收费课程,如果在我的课程表存储则删除记录及其相关的选课记录及订单记录信息

  • 进入课程详细页面,点击马上学习生成二维码进行支付
  • 支付完成点击“支付完成”,观察订单服务控制台是否发送消息(使用内网穿透工具)
  • 观察学习中心服务控制台是否接收到消息
  • 观察数据库中的消息表的相应记录是否已删除,我的选课表中是否有对应的选课记录

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/742969.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入了解 大语言模型(LLM)微调方法

引言 众所周知&#xff0c;大语言模型(LLM)正在飞速发展&#xff0c;各行业都有了自己的大模型。其中&#xff0c;大模型微调技术在此过程中起到了非常关键的作用&#xff0c;它提升了模型的生成效率和适应性&#xff0c;使其能够在多样化的应用场景中发挥更大的价值。 那么&…

服务器命令

服务器命令 服务器命令top查看任务 服务器命令 top查看任务 、ps 命令 ps 命令是最基本同时也是非常强大的进程查看命令。使用该命令可以确定有哪些进程正在运行和它所运行的状态、进程是否结束、进程有没有僵死、哪些进程占用了过多的资源等。总之大部分信息都是可以通过执行…

pytorch模型转onnx格式,编写符号函数实现torch算子接口和onnx算子的映射,新建简单算子--模型部署记录整理

对于深度学习模型来说&#xff0c;模型部署指让训练好的模型在特定环境中运行的过程。相比于软件部署&#xff0c;模型部署会面临更多的难题&#xff1a; 运行模型所需的环境难以配置。深度学习模型通常是由一些框架编写&#xff0c;比如 PyTorch、TensorFlow。由于框架规模、依…

掌握Go语言:深入encoding/gob包的高效数据序列化

掌握Go语言&#xff1a;深入encoding/gob包的高效数据序列化 引言理解Gob和它的使用场景Gob的概念和设计目标Gob的适用场景和优势 开始使用Gob基本的Gob编码和解码示例代码编码&#xff08;序列化&#xff09;解码&#xff08;反序列化&#xff09; Gob编码高级应用自定义类型的…

【Java语言】遍历List元素时删除集合中的元素

目录 前言 实现方式 1.普通实现 1.1 使用【for循环】 方式 1.2 使用【迭代器】方式 2.jdk1.8新增功能实现 2.1 使用【lambda表达式】方式 2.2 使用【stream流】方式 注意事项 1. 使用【for循环】 方式 2. 不能使用增强for遍历修改元素 总结 前言 分享几种从List中移…

基于 K8s 容器集群的容灾架构与方案

作者&#xff1a;庄宇 在设计系统架构时&#xff0c;我们必须假设任何组件和任何基础设施可能会在任何时间失效&#xff0c;例如&#xff1a;自然灾害&#xff0c;电力中断&#xff0c;网络中断&#xff0c;错误的系统变更等。为了应对挑战&#xff0c;我们必须设计合适的容灾…

在centos8中部署Tomcat和Jenkins

参考链接&#xff1a;tomcat安装和部署jenkins_jenkins和tomcat-CSDN博客 1、进入centos中 /usr/local 目录文件下 [rootlocalhost webapps]# cd /usr/local2、使用通过wget命令下下载tomcat或者直接在官网下载centos版本的包后移动到centos中的local路径下 3、下载tomcat按…

VUE3内置组件Transition的学习使用

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 更多nbcio-boot功能请看演示系统RuoYi-Nbcio亿事达企业管理平台 gitee源代码地址 后端代码&#xff1a;…

详解Postman使用

简介&#xff1a; 1.简介 PostMan&#xff0c;一款接口调试工具。 特点&#xff1a; 可以保留接口请求的历史记录 可以使用测试集Collections有效管理组织接口 可以在团队之间同步接口数据 1.简介 PostMan&#xff0c;一款接口调试工具。 特点&#xff1a; 可以保留接口请求…

从0到1入门C++编程——12 演讲比赛流程管理系统

文章目录 一、创建类并显示菜单二、退出管理系统三、开始演讲比赛四、查看往届记录五、清空比赛记录六、案例源代码 演讲比赛流程管理系统 比赛规则&#xff1a;演讲比赛共有12个人参加&#xff0c;比赛分两轮进行&#xff0c;第一轮为淘汰赛&#xff0c;第二轮为决赛。每名选手…

HTML万字学习总结

html文本标签特殊符号图片音频与视频超链接表单列表表格语义标签(布局) html文本标签 标签简介根目录规定文档相关的配置信息&#xff08;元数据元素表示文档的内容表示那些不能由其它 HTML 元相关元素&#xff08;(<base>、<link>, <script>、<style>…

今日AI:GPT-4.5意外曝光可能6月发布、UP主借AI识别情绪播放量186万、全球首个AI程序员诞生

欢迎来到【今日AI】栏目!这里是你每天探索人工智能世界的指南&#xff0c;每天我们为你呈现AI领域的热点内容&#xff0c;聚焦开发者&#xff0c;助你洞悉技术趋势、了解创新AI产品应用。 新鲜AI产品点击了解:AIbase - 智能匹配最适合您的AI产品和网站 &#x1f4e2;一分钟速…

如何拆解技术瓶颈的难点

以大化小的思路 解决一个一个小问题从而解决最终问题 三段论&#xff1a; 抽象能力 职责领域划分 分层构建解决方案 案例&#xff1a;全局分布式事务的解决方案 抽象能力&#xff1a;全局分布式 是由一个个小的事务组合而成的&#xff0c;其中一个分布式事务出现问题&#xff…

探索考古文字场景,基于YOLOv8全系列【n/s/m/l/x】参数模型开发构建文本考古场景下的甲骨文字符图像检测识别系统

甲骨文是一种非常历史悠久的古老文字&#xff0c;在前面我们基本上很少有涉及这块的内容&#xff0c;最近正好在做文字相关的项目开发研究&#xff0c;就想着基于甲骨文的场景来开发对应的检测识别系统&#xff0c;在前文中我们基于YOLOv5、YOLOv7和YOLOv9开发构建了在仿真数据…

激活函数Mish

paper&#xff1a;Mish: A Self Regularized Non-Monotonic Activation Function official implementation&#xff1a;https://github.com/digantamisra98/Mish 背景 在早期文献中&#xff0c;Sigmoid和TanH激活函数被广泛使用&#xff0c;随后在深度神经网络中失效。相比于…

Springboot的配置文件及其优先级

配置文件 内置配置文件 配置文件的作用&#xff1a;修改SpringBoot自动配置的默认值&#xff1b;SpringBoot在底层都给我们自动配置好&#xff1b;SpringBoot使用一个全局的配置文件&#xff0c;配置文件名是固定的&#xff1a; application.propertiesapplication.yml 以上…

网络建设与运维培训介绍和能力介绍

1.开过的发票 3.培训获奖的证书 4合同签署 5.实训设备

[ThinkPHP]Arr返回1

$detailId (int)Arr::get($detail, null); var_dump($detailId); 打印结果&#xff1a;int(1) 原因&#xff1a; vendor/topthink/think-helper/src/helper/Arr.php

干洗店管理系统洗鞋店预约上门小程序洗护流程;

干洗店洗鞋店收银管理系统&#xfe63;智能线上预约洗衣店小程序软件; 闪站侠洗衣洗鞋店收银管理系统&#xff0c;一款集进销存、收衣、收银、会员管理等实用功能于一体的洗护管理软件&#xff0c;适用于各大中小型企业个体工商户&#xff0c;功能强大&#xff0c;操作简单&…

瑞_23种设计模式_命令模式

文章目录 1 命令模式&#xff08;Command Pattern&#xff09;1.1 介绍1.2 概述1.3 命令模式的结构1.4 命令模式的优缺点1.5 命令模式的使用场景 2 案例一2.1 需求2.2 代码实现 3 案例二3.1 需求3.2 代码实现 4 JDK源码解析&#xff08;Runable&#xff09; &#x1f64a; 前言…