通用接口开放平台设计与实现——(30)消息服务端之消息重发优化

背景

消息服务通信机制为异步,且网络连接不是100%可靠,会因为网络闪断问题丢失消息,作为企业应用,需要保证业务消息传输的可靠性,需实现以下机制:
a)发送方重发机制:消息发送方对未收到响应的消息进行重发,重发时保证消息唯一性标识、消息内容不变
b)接收方消息去重机制:消息接收方依据消息的唯一性标识,对收到的消息进行验证,判断是否已处理过,如已处理过则不再进行处理

前面我们依托消息日志,实现了消息服务端消息重发的设计与实现,即通过消息日志表,来缓存待发送或发送失败的消息,然后通过定时器,来执行一段逻辑,从消息日志重建消息,找到要接收消息的客户端连接,然后推送消息。

问题

该方案虽然能实现消息重发,但存在以下几个问题:
1.依托消息日志来实现重发功能,消息日志的职责不再单一
2.消息日志数量大的情况下,查询待发送消息耗时长,性能低
3.消息日志清理时需注意保留待发送的消息,或已经发生尚未收到响应的消息

其本质问题还是在于消息日志的职责不单一带来,肩负着额外的消息重发功能。

解决方案

重构优化,新增活跃消息实体,承接待发送或已发送尚未收到响应的消息,当消息已发送且收到响应后,再将其转移到消息日志中。

同时,考虑到对接的相关方系统可能会因为系统异常如宕机,导致消息服务中心的消息推送次数达到预设次数上限,停止自动推送。

当相关方系统恢复正常时,需要消息服务中心重新推送发送失败的消息,因此新增手工重发功能,在活跃消息列表页面,可根据条件组合查询消息,勾选记录后执行重发操作。

系统实现

实体配置

使用平台的实体配置功能,拷贝现有的消息日志实体MessageLog,更名为ActiveMessage。
执行生成库表、生成代码、拷贝代码、编译运行、配置菜单、设置权限等基础操作。
image.png

消息收发相关调整

消息服务端,收到系统类请求消息,如登录请求,这类请求不需要消息转发,因此继续使用原消息日志服务保存请求和响应。

收到业务请求消息,如委托单创建,需要回复一条消息确认,继续使用原消息日志服务保存请求和响应;
同时判断是否需要消息转发,如需要,则使用新建的活跃消息来处理发送请求。

 /*** 发送消息** @param appCode 应用标识* @param content 消息内容* @param id      消息标识*/public void sendMessage(String appCode, String content, String id) {// 生成请求消息RequestMessage message = new RequestMessage();// 使用已有ID重置默认生成的ID,用于关联消息if (StringUtils.isNotBlank(id)) {message.setId(id);}// 设置相关属性message.setTopic(super.getTopic());// 参数中消息内容优先,如为空,取对象属性的值if (StringUtils.isNotBlank(content)) {message.setContent(content);} else {message.setContent(this.getContent());}message.setPublishAppCode(appConfig.getMessage().getMessageServerAppCode());App app = appService.getByCode(appCode);if (app.getIntegrationModel().equals(IntegrationModelEnum.CLIENT.name())) {// 客户端对接模式// 获取发送通道Channel channel = MessageServerHolder.getChannel(appCode);if (channel != null && channel.isActive()) {ChannelFuture channelFuture = channel.writeAndFlush(message);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage = activeMessageService.createRequestPart(message, appCode);// 设置状态为已请求activeMessageService.updateStatus(MessageStatusEnum.REQUESTED.name(), activeMessage.getRequestId());// 发送次数加1activeMessageService.incrementSendCount(activeMessage.getRequestId());} else {// 更新发送次数activeMessageService.incrementSendCount(id);}}});} else {// 创建日志activeMessageService.createRequestPart(message, appCode);}} else {// api接口对接模式if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage = activeMessageService.createRequestPart(message, appCode);// 设置状态为待处理activeMessageService.updateStatus(ApiMessageStatusEnum.WAIT_HANDLE.name(), activeMessage.getRequestId());}}}

收到响应消息,如消息确认,使用新建的活跃消息来处理响应,查找相应的发送记录,补全信息,然后从活跃消息移动到消息日志中。

  /*** 消息处理** @param message 消息* @param channel 通道*/@Transactional(rollbackFor = Exception.class)public void handleMessage(ResponseMessage responseMessage, Channel channel) {// 验证框架String appCode = MessageServerHolder.getAppCode(channel);validateFramework(responseMessage, appCode);// 更新活跃消息activeMessageService.updateResponsePart(responseMessage);// 拷贝至消息日志ActiveMessage activeMessage = activeMessageService.getByRequestMessageId(responseMessage.getRequestMessageId());MessageLog messageLog = new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息activeMessageService.remove(activeMessage.getId());// 特殊处理messageOperation(responseMessage, channel);}

消息重发相关调整

消息重发原由消息日志服务来负责,现变更为由活跃消息服务来处理。

 public void resend() {// 需要进行异常处理,否则某次异常会导致定时器停止运行try {// 先获取需要重发的应用列表List<String> resendAppList = activeMessageService.getResendAppList(appConfig.getMessage().getMaxSendCount());if (CollectionUtils.isNotEmpty(resendAppList)) {log.info("读取到需要重发的应用数量:{}", resendAppList.size());// 遍历应用列表,获取要重发的消息for (String appCode : resendAppList) {// 查找待重发的消息List<ActiveMessage> list =activeMessageService.getResendMessage(appConfig.getMessage().getSendMessageCount(),appConfig.getMessage().getMaxSendCount(), appCode);log.info("读取到需要重发至应用{}的消息数量:{}", appCode, list.size());for (int i = 0; i < list.size(); i++) {ActiveMessage activeMessage = list.get(i);// 根据消息主题构建发送器RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(activeMessage.getRequestTopicCode());// 传入原请求的消息标识和消息内容sender.sendMessage(activeMessage.getResponseAppCode(), activeMessage.getRequestData(), activeMessage.getRequestId());}}}} catch (Exception e) {log.error("消息重发处理异常", e);}

消息查询和消息确认API调整

由活跃消息服务替换掉原消息日志服务,消息确认时,补全信息,从活跃消息移动到消息日志。

/*** 消息查询处理器** @author wqliu* @date 2021-8-20**/
@Slf4j
public class MessageQueryHandler extends BaseServiceHandler<MessageQueryParameter> {@Overrideprotected String handleBusiness(MessageQueryParameter parameter, String appCode) {ActiveMessageService service = SpringUtil.getBean(ActiveMessageService.class);List<ActiveMessage> list = service.queryWaitHandleMessages(parameter.getCount(), appCode);String data = JSON.toJSONString(list);log.info("查询到的待处理消息为:{}", data);return data;}
}
    @Overridepublic void confirm(String requestMessageId, String appCode) {// 获取消息日志对象ActiveMessage activeMessage = getByRequestMessageId(requestMessageId);// 判断是否有权限对本消息确认if (!appCode.equals(activeMessage.getResponseAppCode())) {throw new CustomException(ActiveMessageExceptionEnum.MESSAGE_CONFIRM_PERMISSION_ERROR);}// 更新消息日志activeMessage.setStatus(ApiMessageStatusEnum.HANDLED.name());activeMessage.setResponseResult(MessageResponseResultEnum.SUCCESS.name());activeMessage.setResponseTime(LocalDateTime.now());// 更新日志modify(activeMessage);// 拷贝至消息日志MessageLog messageLog = new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息remove(activeMessage.getId());}

开源平台资料

平台名称:一二三开发平台
简介: 企业级通用低代码开发平台
设计资料:CSDN专栏
开源地址:Gitee
开源协议:MIT
欢迎收藏、点赞、评论,你的支持是我前行的动力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/799618.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

酱油行业市场需求及广阔前景分析

随着居民消费升级&#xff0c;对高品质生活的向往使得酱油市场需求持续增长。消费者对酱油的功能性需求日益细分&#xff0c;追求健康、天然与个性化的产品特性&#xff0c;从而推动了市场的多元化进步。 同时&#xff0c;餐饮业的蓬勃发展以及外卖市场的扩大&#xff0c;均为酱…

【Django开发】前后端分离美多商城项目第6篇:用户部分,1. 业务说明【附代码文档】

美多商城项目4.0文档完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;美多商城&#xff0c;项目准备1.B2B--企业对企业,2.C2C--个人对个人,3.B2C--企业对个人,4.C2B--个人对企业,5.O2O--线上到线下,6.F2C--工厂到个人。项目准备&#xff0c;配置1. 修改set…

通用分布式锁组件

通用分布式锁组件 1 Redisson1.1介绍1.2 为什么要使用Redisson实现分布式锁1.2.1 锁续期的问题1.2.2 获取锁尝试的问题1.2.3 可重入问题 1.3 Wath Dog的自动延期机制1.4 快速了解1.5 项目集成 2 定义通用分布式锁组件2.1 实现思路分析2.2 定义注解2.3 定义切面2.4 使用锁2.5.工…

STL之string模拟实现

面试题&#xff1a;简易版string(深拷贝与浅拷贝的问题) 如果要实现简易版的string 无需涉及增容问题&#xff0c;成员变量可以不用存储容量和元素个数 构造函数 错误示范 class string {string(): _str(nullptr){}string(const char* str): _str(str){}char& operator[](s…

Redis数据库:高可用(主从复制、哨兵模式、cluster集群)

目录 前言 一、Redis数据库高可用 二、Redis 主从复制 1、Redis主从复制概述 1.1 Redis主从复制概念 1.2 Redis主从复制的作用 1.3 Redis主从复制的流程 2、搭建Redis主从复制 2.1 环境部署 2.2 主服务器修改配置文件 2.3 从服务器修改配置文件 2.4 测试主从复制效…

负氧离子监测站解析

TH-FZ4防腐木负氧离子监测站&#xff0c;作为一种独特的空气质量监测设备&#xff0c;以其独特的优势在生态环保领域发挥着日益重要的作用。这种监测站不仅具备防腐木材质带来的天然美感与耐久性&#xff0c;更结合了先进的负氧离子监测技术&#xff0c;为环境保护和生态旅游等…

[开源]基于SVM的时间序列预测python代码

整理了SVM的时间序列预测python代码分享给大家。记得点赞哦 #!/usr/bin/env python # coding: utf-8import numpy as np import matplotlib.pyplot as plt import pandas as pd from sklearn import preprocessing from sklearn.metrics import mean_squared_error from math i…

短剧小程序系统开发,让短剧观看与创作更加便捷。短剧系统源码搭建

一、目前短剧发展趋势 1. 市场规模&#xff1a;根据数据来看&#xff0c;2023年中国微短剧市场规模达到了373.9亿元&#xff0c;同比上升了267.65%。预计2024年市场规模将超过500亿元。这一市场规模的增长速度非常显著&#xff0c;显示出短剧行业的巨大潜力和发展前景。 2. 投…

蓝桥杯考前复习三

1.约数个数 由乘法原理可以得出&#xff1a; import java.util.*; public class Main{static int mod (int)1e9 7;public static void main(String[] args){Map<Integer,Integer> map new HashMap<>(); //创建一个哈希表Scanner scan new Scanner(System.in);i…

【会议】Oracle自动化运维峰会

2023年7月21日&#xff0c;杭州。我组织了Oracle自动化运维峰会&#xff0c;大约有20人左右参加会议。以下是会议主题&#xff1a; Oracle自动化运维能力是Oracle 19c自动化运维体系中非常重要的一环&#xff0c;自动化索引、自动化SQL优化、资源隔离等技术能够非常好的提升运维…

Java基于微信小程序的校园外卖平台系统,附源码

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

C++ vector顺序表模拟实现

目录 前言&#xff1a; 模拟实现&#xff1a; 构造函数&#xff1a; 析构函数&#xff1a; 容量调整&#xff08;reserve&#xff09;&#xff1a; resize函数&#xff1a; 尾插&#xff08;push_back&#xff09;: 尾删&#xff08;pop_back&#xff09;: 插入&#xff…

C++ | Leetcode C++题解之第8题字符串转换整数atoi

题目&#xff1a; 题解&#xff1a; class Automaton {string state "start";unordered_map<string, vector<string>> table {{"start", {"start", "signed", "in_number", "end"}},{"signed…

如何判断超级充电测试的性能

超级充电测试是电动汽车充电设备性能评估的重要环节&#xff0c;其性能的好坏直接影响到电动汽车的充电效率和使用寿命。以下是判断超级充电测试性能的几个关键因素&#xff1a;这是衡量超级充电测试性能的最直接指标&#xff0c;充电速度快意味着电动汽车可以在更短的时间内完…

商业分析思维与实践:用数据分析解决商业问题

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

日期时间相关的类

分界线jdk8 jdk8之前和之后分别提供了一些日期和时间的类&#xff0c;推荐使用jdk8之后的日期和时间类 Date类型 这是一个jdk8之前的类型&#xff0c;其中有很多方法已经过时了&#xff0c;选取了一些没有过时的API //jdk1.8之前的日期 Date Date date new Date(); // 从1970年…

区块链的网络架构有哪些?

区块链技术的兴起正在深刻地改变着互联网的格局。它不仅提供了去中心化、数据透明、难以篡改等优势&#xff0c;还为各种应用场景提供了新的可能性。为了更好地理解区块链&#xff0c;我们需要深入探讨其网络架构。 区块链网络架构主要由以下几个部分组成&#xff1a; 1. 区块…

Web 前端性能优化之五:构建优化

4、构建优化 资源的合并与压缩所涉及的优化点包括两方面&#xff1a;一方面是减少HTTP的请求数量&#xff0c;另一方面是减少HTTP请求资源的大小。 1、HTML 压缩 1、什么是 HTML 压缩 百度首页部分 HTML 源代码 谷歌首页部分 HTML 源代码 虽然这些格式化的字符能带来很好的代…

SpringBoot及其特性

0.前言 Spring 框架提供了很多现成的功能。那么什么是 Spring Boot&#xff1f;使用 Spring 框架&#xff0c;我们可以避免编写基础框架并快速开发应用程序。为了让 Spring 框架提供基础框架&#xff0c;我们需要向 Spring 框架描述有关我们的应用程序及其组件的信息。 不只是…

OpenAI Sora:浅析文生视频模型Sora以及技术原理简介

一、Sora是什么&#xff1f; Sora官方链接&#xff1a;https://openai.com/sora 视频模型领头羊Runway Gen 2、Pika等AI视频工具&#xff0c;都还在突破几秒内的连贯性&#xff0c;而OpenAI&#xff0c;已经达到了史诗级的纪录。 OpenAI&#xff0c;永远快别人一步&#xff0…