IM系统设计之websocket消息转发

Websocket消息转发

项目地址:git@github.com:muyixiaoxi/Link.git

上周面试被面试官问到:“在分布式IM系统中,如何实现多个websocket集群之间的通信”。

我在思考了良久后回答:“不会”。

随着我的回答,我和面试官的故事也到此完结了…

为什么会出现websocket集群

在IM系统中,需要在服务端和客户端之间维持一个长连接,而这个长连接可以通过websocket实现。
但服务端能维持websocket的数量并不是无限的。

WebSocket的并发连接数受到多种因素的影响,其中最主要的瓶颈通常在于服务器资源。在传统的模型中,一台服务器上的最大WebSocket连接数受到操作系统中TCP/IP连接数的限制。在Linux系统中,每个IPv4地址允许的最大连接数为65535,这意味着如果每个连接都使用不同的IP地址,一台服务器最多只能维持65535个WebSocket连接。

当用户量很多时,一台websocket服务器远远是不够的,所以需要多台websocket服务器。

假如现在只有一台IM服务器(即 Websocket 服务器),用户A、用户B均在线,用户A向用户B发送一条消息
在这里插入图片描述
单台IM服务器发送消息大概流程如下:

  1. 客户端向IM服务端发送消息
  2. IM服务端收到消息判断用户B是否在线
    • 在线,Websocket转发
    • 离线,将消息存储到B的离线消息库
  3. 用户B立即了消息,或者在下次上线时收到了消息

因为现在只有一台IM服务器,所以直接可以判断用户B是否在线,并且转发。

假如现在我有多台IM服务器,重复上面的操作

如果恰巧A和B连接在一台IM服务器上,那么和上面的流程一样

假如现在A连接在IM1上,B连接在IM2上
在这里插入图片描述

其中红线的部分,就是我们要解决的部分

websocket集群通信

我查阅了网上的一些资料(这一块具体采用哪种技术栈实现,网上的资料很少),大概分为两种

方法一:互为客户端

分别将IM服务端与其他IM服务端连接起来,可以通过网络编程或者MQ来实现
在这里插入图片描述
优点:

  • 不需要额外的服务
  • 转发过程中,各个IM服务负载相对均匀

缺点:

  • 每增加一个IM服务端都需要其他服务端多维持一个连接或者MQ
  • 水平扩展有点繁琐

方法二:C/S

采用c/s架构,新建一个transmit服务,单独实现转发功能,可以通过网络编程或者MQ实现。
因为各个 IM 都与 transmit 连接,所以扩展只需要该配置文件的运行端口
在这里插入图片描述
优点:

  • 高可用,IM server扩展方便,只需要修改自己的运行端口

缺点:

  • 不同服务端之间的消息需要通过 transmit 转发,当海量消息时,对 transmit 压力比较大
代码实现

为了实现消息的时效性,以及高可用,我采用net包中的tcp实现了c/s架构
项目目录如下:

transmit:
│  client.go
│  main.go
│
├─common
│  └─proto
│          proto.go
│
└─typestypes.go
proto

使用 net 包的 tcp 可能会出现粘包现象,封装编码与解码方法从而避免粘包

package mainimport ("bufio""encoding/json""fmt""github.com/zeromicro/go-zero/core/logx""net""sync""transmit/common/proto""transmit/types"
)var Connects sync.Mapfunc main() {listenClient()
}// listenClient 监听
func listenClient() {lister, err := net.Listen("tcp", "127.0.0.1:8333")if err != nil {fmt.Println("net.Listen failed:", err)}for {conn, err := lister.Accept()if err != nil {continue}fmt.Println(conn.RemoteAddr().String())Connects.Swap(conn.RemoteAddr().String(), conn)go addReceiver(conn)}
}// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {fmt.Println("与客户端", conn.LocalAddr(), "断开连接")return}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,根据服务器进行转发for connect := range transmit.Users {transmitMessage(conn, connect, transmit)}}
}func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {c, ok := Connects.Load(ip)message := types.TransmitMap{Users: map[string][]uint64{},}if !ok {message = types.TransmitMap{Message: types.Message{Id:          "",From:        0,To:          0,Type:        100,ContentType: 0,Time:        "",Content:     "客户端离线",},}s, _ := json.Marshal(message)msg, _ := proto.Encode(string(s))conn.Write(msg)fmt.Println("客户端离线:", ip)logx.Error("connect ip offline:", ip)return}message.Users[ip] = transmit.Users[ip]message.Message = transmit.Messagej, _ := json.Marshal(message)msg, _ := proto.Encode(string(j))fmt.Println("ip:", ip, "msg:", string(msg))c.(net.Conn).Write(msg)
}
main

通过监听某个端口,让 IM server与其建立间接,实现转发功能

package mainimport ("bufio""encoding/json""fmt""github.com/zeromicro/go-zero/core/logx""net""sync""transmit/common/proto""transmit/types"
)var Connects sync.Mapfunc main() {listenClient()
}// listenClient 监听
func listenClient() {lister, err := net.Listen("tcp", "127.0.0.1:8333")if err != nil {fmt.Println("net.Listen failed:", err)}for {conn, err := lister.Accept()if err != nil {continue}fmt.Println(conn.RemoteAddr().String())Connects.Swap(conn.RemoteAddr().String(), conn)go addReceiver(conn)}
}// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {fmt.Println("与客户端", conn.LocalAddr(), "断开连接")return}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,根据服务器进行转发for connect := range transmit.Users {transmitMessage(conn, connect, transmit)}}
}func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {c, ok := Connects.Load(ip)message := types.TransmitMap{Users: map[string][]uint64{},}if !ok {message = types.TransmitMap{Message: types.Message{Id:          "",From:        0,To:          0,Type:        100,ContentType: 0,Time:        "",Content:     "客户端离线",},}s, _ := json.Marshal(message)msg, _ := proto.Encode(string(s))conn.Write(msg)fmt.Println("客户端离线:", ip)logx.Error("connect ip offline:", ip)return}message.Users[ip] = transmit.Users[ip]message.Message = transmit.Messagej, _ := json.Marshal(message)msg, _ := proto.Encode(string(j))fmt.Println("ip:", ip, "msg:", string(msg))c.(net.Conn).Write(msg)
}
client

模拟客户端,根据自己的项目拆分到 IM server中

package mainimport ("bufio""encoding/json""fmt""net""time""transmit/common/proto""transmit/types"
)func client() {conn, _ := InitConnect()go Consumer(conn)var ip stringmessage := types.Message{Id:          "123",From:        1,To:          2,Type:        1,ContentType: 1,Time:        "123",Content:     "你好",}for {fmt.Scan(&ip)users := map[string][]uint64{}users[ip] = []uint64{1, 2}time.Sleep(2 * time.Second)if err := Producer(conn, users, message); err != nil {fmt.Println("Producer(conn, ip, message) failed", err)}}}func InitConnect() (conn net.Conn, err error) {conn, err = net.Dial("tcp", "127.0.0.1:8333")fmt.Println(conn.LocalAddr())return
}func Producer(conn net.Conn, user map[string][]uint64, mes types.Message) (err error) {transmit := types.TransmitMap{Users:   user,Message: mes,}message, _ := json.Marshal(transmit)m, _ := proto.Encode(string(message))_, err = conn.Write(m)if err != nil {// 重试三次,一次休眠一秒for i := 0; i < 3 && err != nil; i++ {time.Sleep(1 * time.Second)_, err = conn.Write(m)}}return
}// Consumer 消费者 读消息
func Consumer(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {continue}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,进行转发for _, uIds := range transmit.Users {for _, id := range uIds {fmt.Println(id, transmit.Message)}}}
}
types

在转发群聊消息时,需要将 m 个用户转发到 n 个IM服务端上,如果单独发送需要多次发送,所以封装成 TransmitMap 进行转发。

type Message struct {Id          string `json:"id"`From        uint64 `json:"from,optional"`To          uint64 `json:"to"`Type        uint32 `json:"type"`ContentType uint32 `json:"contentType"`Time        string `json:"time"`Content     string `json:"content"`
}type TransmitMap struct {Users map[string][]uint64 `json:"users"`  // map[主机地址]用户集合Message
}

为什么这里要封装一个?

type TransmitMap struct {Users map[string][]uint64 `json:"users"`Message
}

比如用户A、B、C、D、E、F、G在同一个群聊里,各自连接到的 IM server 如图所示
在这里插入图片描述
如果群聊消息采用上面单聊的转发方式

  1. 用户A发送一条消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 遍历(B、C、D、E、F、G): 将在线用户消息转发消息到 transmit

如果不做任何操作的化,光过程 4 就需要转发5次消息

用户A发送一条群聊消息的过程

  1. 用户A发送一条群聊消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 通过 redis 判断哪些用户在线并获取主机地址,将通过map将相同地址的用户分类 map[主机地址]用户集合,一块转发到 transmit
  5. transmit 以主机地址为组,将消息发送给 IM server2 和 IM server3
  6. IM server2 和 IM server3 收到消息后,将消息进行转发
  7. 离线用户同步离线消息库

这样的好处是,有效的减少群聊消息转发的次数。

ps:如果存在哪些不足,欢迎大家在评论区指正~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/765797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业产品网络安全建设实录日报规划

为什么要做这个系列&#xff1f; 因为太忙了&#xff0c;忙工作&#xff0c;忙家庭。很难再像之前一样能做到电脑前码一个小时的字&#xff0c;写半个小时的博客。 但是现在的经验增长快速&#xff0c;有很多值得分享的点 想想这平行世界里的我&#xff0c;实际上并不是每一个人…

Spring Data访问Elasticsearch----创建存储库实例

Spring Data访问Elasticsearch----创建存储库实例 一、Java配置二、XML配置三、使用过滤器四、独立使用 本文介绍如何为已定义的存储库接口创建实例和bean定义。 一、Java配置 在Java配置类上使用特定于存储的EnableElasticsearchRepositories注解来定义用于存储库激活的配置。…

MySQL 中的事务和存储引擎

目录 事务的 ACID 特性 MySQL 的四种隔离机制和问题 MySQL 的四种隔离机制&#xff1a; MySQL 的存储引擎 InnoDB 存储引擎 MyISAM 存储引擎 Memory 存储引擎 通过 ALTER TABLE 语句更改存储引擎 在创建表时指定存储引擎 通过修改配置文件设置默认存储引擎 在数据库系…

element el-dialog里再调用其他组件,查找不到组件的方法

需求描述&#xff1a;点击编辑按钮&#xff0c;跳出编辑弹窗&#xff0c;回显图片组件里面的图片问题&#xff1a;element el-dialog里再调用组件&#xff0c;打开该弹窗的瞬间找不到弹窗里调用子组件的方法原因&#xff1a;弹窗显示时&#xff0c;调用的子组件还没渲染出来所以…

【机器学习入门 】支持向量机

系列文章目录 第1章 专家系统 第2章 决策树 第3章 神经元和感知机 识别手写数字——感知机 第4章 线性回归 第5章 逻辑斯蒂回归和分类 前言 支持向量机(Support Vector Machine) 于1995年发表&#xff0c;由于其优越的性能和广泛的适用性&#xff0c;成为机器学习的主流技术&…

阿里云有免费服务器吗?有的,附送免费服务器申请流程

阿里云服务器免费试用申请链接入口&#xff1a;aliyunfuwuqi.com/go/free 阿里云个人用户和企业用户均可申请免费试用&#xff0c;最高可以免费使用3个月&#xff0c;阿里云服务器网分享阿里云服务器免费试用申请入口链接及云服务器配置&#xff1a; 阿里云免费服务器领取 阿里…

Python:继承

注意&#xff1a;本文引用自专业人工智能社区Venus AI 更多AI知识请参考原站 &#xff08;[www.aideeplearning.cn]&#xff09; Python中的继承是面向对象编程&#xff08;OOP&#xff09;的一个基本特性&#xff0c;它允许一个类&#xff08;称为子类&#xff09;继承另一个…

string c++

#include<iostream> using namespace std; int main() { string a"wddw"; string b"wdwdwdwdwd"; string c b a; cout << c << endl; cout << c.length() << endl; string c_sub c.substr(2,2);//从第二个开…

day10_面向对象之封装丶构造器

封装概述 现实生活中&#xff0c;每一个个体与个体之间是有边界的&#xff0c;每一个团体与团体之间是有边界的&#xff0c;而同一个个体、团体内部的信息是互通的&#xff0c;只是对外有所隐瞒。 面向对象编程语言是对客观世界的模拟&#xff0c;客观世界里每一个事物的内部…

【MySQL】巧解客户连续递增交易

力扣题 1、题目地址 2701. 连续递增交易 2、模拟表 表&#xff1a;Transactions 字段名类型transaction_idintcustomer_idinttransaction_datedateamountint transaction_id 是该表的主键。每行包含有关交易的信息&#xff0c;包括唯一的 (customer_id, transaction_date…

总结: HQL语句

总结: HQL语句 Part1 数据库的操作Part2 数据表的操作1. 创建普通表2. 内外部表3. 内外部表转换 Part1 数据库的操作 查看数据库: show databases; 创建数据库: create database if not exists 数据库名 使用数据库: use 数据库名; 查看数据库详细信息: desc database 数据库名…

Echarts 利用多X轴实现未来15天天气预报

Echarts 利用多X轴实现未来15天天气预报 UI 设计图 Echarts 实现效果 代码实现 代码分解 echarts 图表上下均显示数据 通过设置 grid.top 和 grid.bottom 设置白天和夜间天气展示区域 grid: {top: 36%,bottom: 36%,left: 5%,right: 5%}, 天气图标的设置 由于 axisLabel 的…

【Linux】一文了解【进程优先级相关知识点】&【PRI / NI值】背后的修正原理(13)

前言 大家好吖&#xff0c;欢迎来到 YY 滴Linux系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的《…

算法 之 排序算法

&#x1f389;欢迎大家观看AUGENSTERN_dc的文章(o゜▽゜)o☆✨✨ &#x1f389;感谢各位读者在百忙之中抽出时间来垂阅我的文章&#xff0c;我会尽我所能向的大家分享我的知识和经验&#x1f4d6; &#x1f389;希望我们在一篇篇的文章中能够共同进步&#xff01;&#xff01;&…

实验7-2-10 简易连连看(PTA)

题目&#xff1a; 本题要求实现一个简易连连看游戏模拟程序。 给定一个2N2N的方阵网格游戏盘面&#xff0c;每个格子中放置一些符号。这些符号一定是成对出现的&#xff0c;同一个符号可能不止一对。程序读入玩家给出的一对位置(x1​,y1​)、(x2​,y2​)&#xff0c;判断这两…

leetcode(Hot100)——数组篇

1、两数之和 本题使用哈希法&#xff0c;用一个哈希Map保存数组的值以及对应下标&#xff0c;代码如下&#xff1a; class Solution {public int[] twoSum(int[] nums, int target) {HashMap<Integer,Integer> map new HashMap<>();for(int i0; i<nums.length…

【网络基础】网络层基本协议介绍

目录 一、IP数据包 1.1 网络层的功能 1.2 IP数据包格式 二、ICMP协议介绍 2.1 作用 2.2 常用命令 2.2.1 Ping命令 2.2.2 tracert命令 2.3 广播域 三、ARP协议介绍 3.1 作用 3.2 原理 一、IP数据包 1.1 网络层的功能 定义了基于IP协议的逻辑地址&#xff0c;就是I…

数据结构应用——哈夫曼树

哈夫曼树 哈夫曼树的相关概念构造哈夫曼树基础算法 哈夫曼编码 哈夫曼树的相关概念 结点的权&#xff1a;有某种现实含义的数值。结点的带权路径长度&#xff1a;从树的根结点到该结点的路径长度&#xff08;经过的边数&#xff09;与该结点上权值的乘积。树的带权路径长度&am…

CAPL (Communication Access Programming Language)

CAPL (Communication Access Programming Language) 是一种专门用于模拟和测试汽车网络的脚本语言&#xff0c;特别是CAN (Controller Area Network) 和LIN (Local Interconnect Network) 网络。虽然CAPL主要用于模拟网络行为和测试网络节点&#xff0c;但它也支持一些基本的编…

XiBe希贝奶瓶好用吗?2名宝宝的宝爸深度测评分享!

几乎每个新手宝爸宝妈都会有一段时间对孩子的很多东西都是不懂的&#xff0c;一边摸索一边学习。列如关于奶瓶这个问题就困扰不少新手爸妈&#xff0c;特别是面对这么多的品牌的奶瓶完全不知道怎么选。 相信很多新手爸妈都十分担心奶瓶材质安全问题&#xff0c;所以我作为一名…