IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】

0前言

消息收发模型
在这里插入图片描述

在这里插入图片描述
第一张图是一个时序图,第二张图是一个标清楚步骤的流程图,更加清晰。消息的插入环节主要在2步。save部分。主要也是对这个部分就行消息幂等的操作。

前情提要:使用Redis发布 token 以及lua脚本来共同完成消息的幂等

目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期)】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】

1.我开源项目IM重复的原因

  • IM系统中有三个常见的指标。消息可靠(不丢消息)。就是消息不能重复(不重复)。保证消息的时序性(不乱序)。这三个指标非常重要。
  • 消息可靠主要通过报文协议等操作来完成。前面视频有一期讲过报文协议。目前主要采取上述方式去保证消息的可靠性。然后再保证消息可靠性的过程中,有一些需要重试的操作。可能会导致数据库多次插入。需要我们来保证一下消息的幂等。通俗的讲就是保证消息的不重复。
1.客户端会重复的发送消息

客户端是一个timer的机制。客户端a发送给b消息的时候,在0.5秒没有收到b的ack的时候会重发消息,重发三次还没有收到ack视为重发失败
//使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数

伪代码如下。用户在线并且不是重试消息的时候,添加到队列里面。

 if (res.params.online == true && res.params.isretry == "false") {state.queue.offer(state.tempSendMsg);////使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数const result = await retry(fetchDataFn, 3, 1000, res.params.msgid);//三次之后消息还没有发送成功 提示消息发送失败if (result == false) {Toast("消息发送失败,请重新发送");}} else {console.log("【IM日志】 接受消息者没有登录或者是重试消息 ");}

进行重试的js代码

//重试的一个方法
export function retry(fn, maxRetry, timeout,msg) {return new Promise(async (resolve, reject) => {let retryCount = 0;let timer;const run = async () => {try {const result = await fn(msg);resolve(result);} catch (err) {if (retryCount < maxRetry) {retryCount++;clearTimeout(timer);timer = setTimeout(run, timeout);} else {reject(err);}}};timer = setTimeout(run, timeout);});
}
2.mq出现超时等的重试机制

参考上述逻辑图,消息落库的时候异步分发到了mq上面。rocketmq有超时重试机制,会自动重试。导致消息被多次消费。(明天补充个图片例子)

2.如何解决的幂等

为什么要解决幂等,什么情况下出现幂等(明天写);
使用redis做的幂等。redis做幂等其实有两种思路。

一种思路是我目前正在使用的防重 Token 令牌思路。另一种是下游传递唯一请求编号。主要说明防重token令牌的思路。其实差别就是一个redis里面的键被删除了。另一个没有删除。
防重token令牌
在这里插入图片描述
下游传递唯一请求编号如下
在这里插入图片描述

当客户端请求分布式id的时候将其存入redis。也就是获取一个唯一id。当进行消费消息的时候。先判断唯一id在不在。在的话删除redis中的唯一id并且进行业务操作。不再的话就不能进行业务操作来实现的幂等。
流程代码如下所示:

1.获取token以及存储token到redis中;
在loginUser 用户中心服务中

    @RequestMapping(value = "/api/segment/get/{key}")public GenericResponse getSegmentId(@PathVariable("key") String key) {String leafno = get(key, segmentService.getId(key));SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX, leafno);//往集合添加元素/*** 设置一个10分钟的有效期*/
//        stringRedisTemplate.expire(RedisPrefix.LEAF_PERFIX,600, TimeUnit.SECONDS);return GenericResponse.response(ServiceError.NORMAL,leafno );}

我们使用了美团的分布式id来生成分布式id。
2.前台发送消息的时候携带上唯一id

const sendMsg2 = async () => {const { content, toUser } = state;const no = await getLeaf();let data = {// 1代表着私聊的意思type: 1,params: {msgid: no.content,toMessageId: toUser.openid,message: content,fileType: 0,isretry: false,},};if (state.current == 2) {data = {type: 9,params: {toMessageId: state.groupId,message: content,fileType: 0,},};}console.log(data);state.tempSendMsg = data;state.socketServe.send(data);state.recesiveAllMsg.push({type: "self",content: content,});state.content = "";};

这个是发送消息的操作
const no = await getLeaf();这行代码请求后端接口。然后构造消息体。
3.聊天服务(Netty)收到前台消息后 mq异步发送消息

 public void sendMessage(String topic ,ChannelHandlerContext ctx, String message, String toUser, String state, Boolean type, String msgid,String token) {MqMessage messageMQ = new MqMessage();messageMQ.setFromId(SessionUtils.getUser(ctx.channel()).getOpenid());messageMQ.setToId(toUser);messageMQ.setType(state);messageMQ.setInfoContent(message);messageMQ.setTime(new DateTime().toString());messageMQ.setState(type);messageMQ.setMsgid(msgid);messageMQ.setToken(token);messageDispatchService.sendForSave(topic,messageMQ);}

发送给保存的主题
4.业务模块(frist)消费消息

 @Overridepublic void onMessage(String o) {String mqmsg =o;log.info("RocketMqConsumerService=====消费消息:"+mqmsg);//消息内容MqMessage message1 = JSON.parseObject(mqmsg, MqMessage.class);try {ChatDto chatDto = new ChatDto();chatDto.setContent(message1.getInfoContent());chatDto.setToOpenid(message1.getToId());chatDto.setGroup(message1.getState());//将msgid存储进去,方便后续进行updatechatDto.setMsgId(message1.getMsgid());SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();
//            Boolean member = opsForSet.isMember(RedisPrefix.LEAF_PERFIX, message1.getMsgid());if( executeOperation(message1.getMsgid())){
//                Long remove = opsForSet.remove(RedisPrefix.LEAF_PERFIX, message1.getMsgid());//删除元if (message1.getState() !=null){if(message1.getType().equals("onLine")){/*** 用户在线需要去推送一下*/yanUserChatService.saveChat(message1.getFromId(),chatDto,1);SendRequest send = buildSendRequest(message1);//设置过滤应该有的tokenRoseFeignConfig.token.set(message1.getToken());nettyMqFeign.send(send);}else {/*** 离线消息直接落库就链路就结束了*/yanUserChatService.saveChat(message1.getFromId(),chatDto,0);}}}}catch (Exception e){//失败的话需要把redis的这个消息还回去.SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX,  message1.getMsgid());//往集合添加元素log.error("consumeMsg 消费mq消息失败.",e);// 处理失败,抛出异常,消息会根据重试策略稍后重新消费throw new RuntimeException("处理消息时发生错误,消息将被重新消费。");}}

lua表达式
目前使用redis的类型是set,键是yan_leaf

    /*** 幂等的方法,判断list存不存在。存在的话直接删除,下次进来就不存在了。* @param token* @return*/public boolean executeOperation(String token) {// Lua脚本String script = "if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then return redis.call('srem', KEYS[1], ARGV[1]) else return 0 end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);// 执行Lua脚本Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(RedisPrefix.LEAF_PERFIX), token);// 根据Lua脚本执行结果判断操作是否执行return result != null && result > 0;}

通过这个lua防止并发请求进来导致幂等失败

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/680512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

119.乐理基础-五线谱-五线谱的标记

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;音值组合法&#xff08;二&#xff09; 力度记号&#xff1a;简谱里什么意思&#xff0c;五线谱也完全是什么意思&#xff0c;p越多就越弱&#xff0c;f越多就越强&#xff0c;然后这些渐强、渐弱、sf、fp这些标记…

Proteus -模拟串口被关闭后怎样打开

Proteus -模拟串口被关闭后怎样打开 点击恢复弹出窗口&#xff0c;即可重新打开

WPF中值转换器的使用

什么是值转换器 在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;值转换器&#xff08;Value Converter&#xff09;是一种机制&#xff0c;允许你在绑定时转换绑定源和绑定目标之间的值。值转换器实现了 IValueConverter 接口&#xff0c;该接口…

【华为 ICT HCIA eNSP 习题汇总】——题目集12

1、企业网络内部常常采用私有 IP 地址进行通信&#xff0c;以下哪个地址属于私有 IP 地址&#xff1f; A、0.1.1.1 B、127.5.4.3 C、128.0.0.5 D、172.24.35.36 考点&#xff1a;网络层 解析&#xff1a;&#xff08;D&#xff09; A类 IP 地址中&#xff0c;10.0.0.0 ~ 10.255…

深入学习《大学计算机》系列之第1章 1.7节——图灵机的一个例子

一.欢迎来到我的酒馆 第1章 1.7节&#xff0c;图灵机的一个例子。 目录 一.欢迎来到我的酒馆二.图灵机2.1 艾伦-图灵简介2.2 图灵机简介 三.图灵机工作原理3.1 使用图灵机打印二进制数3.2 图灵机工作原理总结 四.总结 二.图灵机 本节内容主要介绍计算机科学之父——艾伦-图灵、…

Java学习-常用API-新增时间

1.学习JDK8新增时间的原因&#xff1f; 2.JDK8新增了那些时间&#xff1f; 代替calendar的 localDate localTime localDateTime 常用APi及代码示例&#xff1a; ZoneIdZonedDateTime 常用方法 代码示例&#xff1a; 代替Date的 Instant常见方法及其代码示例&#xff1a; 注…

权限提升:利用Linux漏洞提权

目录 Linux权限基础 Linux用户权限 Linux文件权限 特殊的Linux文件权限 Linux本机信息收集 利用Linux漏洞进行提权 脏牛漏洞 pkexec Linux权限基础 Linux用户权限 在Linux中&#xff0c;根据权限的不同&#xff0c;大致可以分为三种&#xff1a;超级用户&#xff08;…

C#,卢卡斯数(Lucas Number)的算法与源代码

1 卢卡斯数&#xff08;Lucas Number&#xff09; 卢卡斯数&#xff08;Lucas Number&#xff09;是一个以数学家爱德华卢卡斯&#xff08;Edward Lucas&#xff09;命名的整数序列。爱德华卢卡斯既研究了这个数列&#xff0c;也研究了有密切关系的斐波那契数&#xff08;两个…

今日早报 每日精选15条新闻简报 每天一分钟 知晓天下事 2月13日,星期二

每天一分钟&#xff0c;知晓天下事&#xff01; 2024年2月13日 星期二 农历正月初四 1、 春节假期旅游爆火&#xff01;多地景区宣布门票售罄&#xff0c;建议错峰错区游览。 2、 中国旅游研究院&#xff1a;预计2024年全年国内旅游人数将超过60亿人次。 3、 应急管理部&#…

Solidworks:从2D走向3D

Sokidworks 的强大之处在于三维实体建模&#xff0c;这个形状看似复杂&#xff0c;实际上只需要拉伸一次&#xff0c;再做一次减法拉伸就行了。第一次做三维模型&#xff0c;费了不少时间才搞明白。 接下来做一个稍微复杂一点的模型&#xff0c;和上面这个操作差不多&#xff0…

基于Seaborn和Matplotlib的可视化案例分析

处理数据有时会有点无聊。将原始数据转换为可理解的格式是整个过程中最重要的部分之一&#xff0c;那么为什么只停留在数字上&#xff0c;当我们可以将数据可视化为令人兴奋的图表时&#xff0c;这些图表可以在python中获取。这篇文章将重点探索耐人寻味的预处理之旅。 Seabor…

SSM+SpringBoot框架

单例bean是线程安全的吗 AOP Spring事务失效 第四种&#xff0c;在方法内部使用&#xff0c;需要用代理类调用此方法 bean生命周期 bean的循环依赖 SpringMVC执行流程 、 SpringBoot自动配置原理 Spring常见注解 MyBatis执行流程 MyBatis延迟加载 MyBatis缓存

最新在线看4K高清电影网站推荐

随着互联网技术的发展&#xff0c;观看高清电影已经不再是难事。这里我为大家分享几个最新的在线看4K高清电影网站&#xff0c;让您在家就能享受到极致观影体验。 通过下面这个即可 1. 【超清影视】 【超清影视】是国内新兴的4K高清电影网站&#xff0c;拥有海量的影片资源&a…

000——对i.MAX6uLL进行开箱检查

目录 00外观检查 01有线网卡检查 02USB-host测试 03耳机测试 04按键测试 05查看CPU温度 00外观检查 看起来没什么问题&#xff0c;买了半年了刚开始要做 01有线网卡检查 开机启动正常&#xff0c;韦东山老师还写了个欢迎界面&#xff0c;这个我后面因为要用于毕业设计&am…

《Linux 简易速速上手小册》第5章: 用户与群组管理(2024 最新版)

文章目录 5.1 管理用户账户5.1.1 重点基础知识5.1.2 重点案例&#xff1a;创建一个新的开发者账户5.1.3 拓展案例 1&#xff1a;禁用用户登录5.1.4 拓展案例 2&#xff1a;设置账户到期 5.2 群组概念与管理5.2.1 重点基础知识5.2.2 重点案例&#xff1a;为项目团队设置群组5.2.…

《Linux 简易速速上手小册》第3章: 文件系统与权限(2024 最新版)

文章目录 3.1 Linux 文件系统结构3.1.1 重点基础知识3.1.2 重点案例&#xff1a;设置一个 Web 服务器3.1.3 拓展案例 1&#xff1a;日志文件分析3.1.3 拓展案例 2&#xff1a;备份用户数据 3.2 理解文件权限3.2.1 重点基础知识3.2.2 重点案例&#xff1a;共享项目文件夹3.2.3 拓…

FPGA_简单工程_数码管静态显示

一 理论 数码管是一种半导体发光器件&#xff0c;基本单位是发光二极管。 以六位八段数码管为例&#xff0c;每段需要一个端口信号&#xff0c;6814位。 74HC595芯片&#xff1a; 8位串行输入&#xff0c;并行输出的位移缓存器&#xff0c;其内部具有8位移位寄存器和一个存储…

Javaweb之SpringBootWeb案例之 登录功能的详细解析

1. 登录功能 1.1 需求 在登录界面中&#xff0c;我们可以输入用户的用户名以及密码&#xff0c;然后点击 "登录" 按钮就要请求服务器&#xff0c;服务端判断用户输入的用户名或者密码是否正确。如果正确&#xff0c;则返回成功结果&#xff0c;前端跳转至系统首页面…

算法学习——LeetCode力扣二叉树篇4

算法学习——LeetCode力扣二叉树篇4 222. 完全二叉树的节点个数 222. 完全二叉树的节点个数 - 力扣&#xff08;LeetCode&#xff09; 描述 给你一棵 完全二叉树 的根节点 root &#xff0c;求出该树的节点个数。 完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xf…

车载诊断协议DoIP系列 —— 协议中术语解释和定义

车载诊断协议DoIP系列 —— 协议中术语解释和定义 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师(Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 本就是小人物,输了就是输了,不要在意别人怎么看自己。江湖一碗茶,…