关于ASP.NET Core WebSocket实现集群的思考

前言

    提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是wswss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关,而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。

实现

咱们的重点关键字就是两个WebSocket集群,实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github[1],具体仓库地址可以转到文末自行跳转到示例源码[2]中查看。既然涉及到集群,这里咱们就用nginx作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示

980d0a5c81d7e9969efaee70e43b2b6d.png

redis在这里扮演的角色呢,是用来处理Server端的消息相互传递用的,主要是使用的redis的pub/sub功能来实现的,这里便涉及到几个核心问题

  • • 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的

  • • 其次,处在不同Server端的不同用户间的相互通信是需要一个传递媒介

  • • 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略

这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟WebSocket需要保持长链接、且消息的大小需要评估。

上面提到了redis的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。

nginx配置

通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf

//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {server 127.0.0.1:5001;server 127.0.0.1:5678;
}server {listen       5000;server_name  localhost;location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//记得转发避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升级成websocket协议的头标识proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";}
}

这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。

一对一发送

首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况

  • • 如果你需要通信的客户端和你连接在一个Server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。

  • • 如果你需要通信的客户端和你不在一个Server端里,这个时候咱们就需要借助redis的pub/sub的功能,把消息传递给另一个Server端。

咱们通过一张图大致的展示一下它的工作方式

40018a44e7c3c09ffc4ea8e21dd726e2.png

解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项,这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下

var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();RedisClient cli = new RedisClient("127.0.0.1:6379");cli.Notice += (s, e) => logger?.LogInformation(e.Log);return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions
{KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");
app.MapControllers();app.Run();

接下来我们定义一个Controller用来处理WebSocket请求

public class WebSocketController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_socketHandler = socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet("/chat/user/{id}")]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码

public class WebSocketHandler:IDisposable
{//存储当前服务用户的集合private readonly UserConnection UserConnection = new();//redis频道前缀private readonly string userPrefix = "user:";//用户对应的redis频道private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly ILogger<WebSocketHandler> _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ = UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循环接收消息while (webSocket.State == WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//接收的到消息转换成实体MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//发送到其他客户端的数据byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务,则发送给目标redis端的频道ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };//目标的redis频道_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ = UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub = _redisClient.Subscribe(channel,  async (channel, data) => {//接收过来当前频道数据,说明发送端不在当前服务ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到几个辅助相关的类,其中UserConnection类是存储注册到当前服务的连接,MsgBody类用来接受客户端发送过来的消息,ChannelMsgBody是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码

//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}
}//客户端消息
public class MsgBody
{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }
}//频道订阅消息
public class ChannelMsgBody
{//用户标识public string FromId { get; set; }//目标用户标识,也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }
}

这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去

9ea4d34f88c6d8c0e3b187baea1a9624.pngadd699c29c6c7017f63d2a3c75b6f6f6.png

Postman连接三个连接唯一标识分别是1、2、3,模拟一下消息发送,效果如下,发送效果

5179738c091932c89b25c2aaa5d07361.png

接收效果

2b915faad16a5d094c2385b3c5fb9d36.png

群组发送

上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下

3fa40aa436d8f3a05fa637a35ac54038.png

群组的实现方式相对于一对一要简单一点

  • • 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去

  • • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息

展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景

//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection UserConnection = new();private readonly GroupUser GroupUser = new();private readonly SemaphoreSlim _lock = new(1, 1);private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string groupPrefix = "group:";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ = currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count == 1){//订阅redis频道await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//组装redis频道发布的消息,目标为群组标识ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通过redis发布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}
}

这里涉及到了GroupUser类,是来存储群组和群组用户的对应关系的,定义如下

public class GroupUser
{//key为群组的唯一标识public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送

1218ff21694a66c60a835925a98fca7c.png

用户u2接收

ecf82958a6be8fcf28809ab7373ec8fb.png

发送所有人

发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示

5c07ac71c55eea1b4190b5e39753aee6.png

这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息

//把用户注册进去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

具体的实现逻辑还是在HandleGroup类里,是HandleAll方法,看一下具体实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){   //如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}
}

效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。

整合到一起

上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是

  • • 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。

  • • 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息

  • • 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息

  • • 全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息

这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示

public class WebSocketChannelController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet("/chat/channel/{id}")]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息

public class WebSocketChannelHandler : IDisposable
{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection = new();//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();private readonly SemaphoreSlim _lock = new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();//一对一redis频道前缀private readonly string userPrefix = "user:";//群组redis频道前缀private readonly string groupPrefix = "group:";//全员redis频道private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ = UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客户端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//读取客户端消息ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);//判断消息类型switch (channelData.Method){//一对一case "One":await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ = UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到了ChannelData类是用于接收客户端消息的类模板,具体定义如下

public class ChannelData
{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }
}

类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示

1abede83115fd260a7d42ac4de48f4cf.png

由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。

一对一处理

首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息

private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult){MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}
}

接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息

private async Task SubMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);
}

如果给某个用户发送消息则可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method为One代表着是私聊一对一的情况,消息体内Id为要发送给的具体用户标识和消息体。

群组处理

接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 add  to group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{//判断群组是否存在var hasValue = GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑

private async Task SubGroupMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);
}

全员消息处理

全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑

private async Task HandleAll(string id, object msgBody)
{_logger.LogInformation($"user {id} send:{msgBody}");//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全员消息的发送数据格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method为All代表着全员消息类型,MsgBody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下

private async Task SubAllMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理if (user.Key == msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);
}

示例源码

由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可

  • • asp.net core源码示例 WebsocketCluster[3]

  • • golang源码示例 websocket-cluster[4]

仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。

总结

    本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下

  • • 首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系

  • • 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话

  • • 本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计

读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。

引用链接

[1] 我的github: https://github.com/softlgl
[2] 示例源码: #示例源码
[3] WebsocketCluster: https://github.com/softlgl/WebsocketCluster
[4] websocket-cluster: https://github.com/softlgl/websocket-cluster

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/281250.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

更快,更强的.NET 7 发布了

.NET Conf 2022 在昨晚(11⽉8⽇) 11 点 正式开始了&#xff0c;为期三天的会议&#xff08;11⽉8-10⽇&#xff09;&#xff0c; 围绕 .NET 7 展开。相信各位⼩伙伴都已经开始安装 .NET 7 正式版本还有以及相关的开发⼯具。这次 .NET 7 围绕传统的 C# &#xff0c;ASP.NET Core…

jvm(Java virtual machine) JVM架构解释

2019独角兽企业重金招聘Python工程师标准>>> JVM 架构解释 每个Java开发者都知道通过JRE【Java运行环境】执行字节码。 但是很多人都不知道JRE是JVM实现的事实。JVM负责执行字节码的分析 代码的解释和运行。 我们应该了解JVM的架构&#xff0c;这对开发者来说是很重…

WinForm(十五)窗体间通信

在很多WinForm的程序中&#xff0c;会有客户端之间相互通信的需求&#xff0c;或服务端与客户端通信的需求&#xff0c;这时就要用到TCP/IP的功能。在.NET中&#xff0c;主要是通过Socket来完成的&#xff0c;下面的例子是通过一个TcpListerner作为监听&#xff0c;等待TcpClie…

关于Java开发需要注意的十二点流程

1.将一些需要变动的配置写在属性文件中 比如&#xff0c;没有把一些需要并发执行时使用的线程数设置成可在属性文件中配置。那么你的程序无论在DEV环境中&#xff0c;还是TEST环境中&#xff0c;都可以顺畅无阻地运行&#xff0c;但是一旦部署在PROD上&#xff0c;把它作为多线…

Unity经典游戏教程之:雪人兄弟

版权声明&#xff1a; 本文原创发布于博客园"优梦创客"的博客空间&#xff08;网址&#xff1a;http://www.cnblogs.com/raymondking123/&#xff09;以及微信公众号"优梦创客"&#xff08;微信号&#xff1a;unitymaker&#xff09;您可以自由转载&#x…

一款自用的翻译小工具,开源了

一款自用的翻译小工具&#xff0c;开源了TranslationTool作者&#xff1a;WPFDevelopersOrg - 唐宋元明清|驚鏵原文链接&#xff1a;https://github.com/Kybs0/TranslationTool此项目使用WPF MVVM开发。框架使用大于等于.NET461。Visual Studio 2019。最初是支持以下&#xff1…

【ELK集群+MQ】通用部署方案以及快速实现MQ发布订阅服务功能

前言&#xff1a;大概一年多前写过一个部署ELK系列的博客文章&#xff0c;前不久刚好在部署一个ELK的解决方案&#xff0c;我顺便就把一些基础的部分拎出来&#xff0c;再整合成一期文章。大概内容包括&#xff1a;搭建ELK集群&#xff0c;以及写一个简单的MQ服务。如果需要看一…

多语言报表的改动方法

在定义上传RTF模板的时候&#xff0c;会有一个是否可翻译的选项&#xff0c;选择之后。就能够上传xlf文件作为翻译内容。 对于已经存在的多语言类型报表&#xff0c;稍作改动之后再上传&#xff0c;可能会出现下面现象&#xff1a; 进程出现了“未完毕”的提示 想要改动非常eas…

LightOJ - 1027 A Dangerous Maze —— 期望

题目链接&#xff1a;https://vjudge.net/problem/LightOJ-1027 1027 - A Dangerous MazePDF (English)StatisticsForumTime Limit: 2 second(s)Memory Limit: 32 MBYou are in a maze; seeing n doors in front of you in beginning. You can choose any door you like. The p…

MASA MAUI Plugin (六)集成个推,实现本地消息推送[Android] 篇

背景MAUI的出现&#xff0c;赋予了广大.Net开发者开发多平台应用的能力&#xff0c;MAUI 是Xamarin.Forms演变而来&#xff0c;但是相比Xamarin性能更好&#xff0c;可扩展性更强&#xff0c;结构更简单。但是MAUI对于平台相关的实现并不完整。所以MASA团队开展了一个实验性项目…

微软加更.NET7中文手册,都有哪些新亮点?

11月8号发布了.NET7&#xff0c;从底层性能改进&#xff0c;到上层API升级&#xff0c;让.NET7综合性能再度提升&#xff01;同时发布了最新的C#11&#xff0c;也带来了很多小惊喜。如何快捷学习最新的.NET7和C#11&#xff1f;答案只有一个&#xff0c;微软官方中文文档&#x…

.NET Conf China 2022 第一批讲师阵容大揭秘!整个期待了!

目光看过来2022年12月3-4日一场社区性质的国内规模最大的线上线下.NET Conf 2022技术大会即将盛大开幕目前大会正紧锣密鼓地进行中第一批大咖讲师及主题已确定小编迫不及待想和大家分享分享嘉宾很大咖分享内容很硬核戳戳小手期待ing孔令磊维宏股份 首席架构师 十多年数控领域研…

妙用SQL Server聚合函数和子查询迭代求和

先看看下面的表和其中的数据&#xff1a;t_product该表有两个字段&#xff1a;xh和price&#xff0c; 其中xh是主索引字段&#xff0c;现在要得到如下的查询结果&#xff1a;从上面的查询结果可以看出&#xff0c;totalprice字段值的规则是从第1条记录到当前记录的price之和。如…

记一次.NET某工控图片上传CPU爆高分析

一&#xff1a;背景 1.讲故事今天给大家带来一个入门级的 CPU 爆高案例&#xff0c;前段时间有位朋友找到我&#xff0c;说他的程序间歇性的 CPU 爆高&#xff0c;不知道是啥情况&#xff0c;让我帮忙看下&#xff0c;既然找到我&#xff0c;那就用 WinDbg 看一下。二&#xff…

从 WinDbg 角度理解 .NET7 的AOT玩法

一&#xff1a;背景 1.讲故事前几天 B 站上有位朋友让我从高级调试的角度来解读下 .NET7 新出来的 AOT&#xff0c;毕竟这东西是新的&#xff0c;所以这一篇我就简单摸索一下。二&#xff1a;AOT 的几个问题 1. 如何在 .NET7 中开启 AOT 功能在 .NET7 中开启 AOT 非常方便&…

【PPT】适配器模式 和 桥接模式

【PPT】适配器模式 和 桥接模式目录【PPT】适配器模式 和 桥接模式一、PPT 截图1.0、封面和目录1.1、设计模式概述1.2、结构型模式特点1.3、适配器模式1.4、桥接模式二、参考资料及 PPT 获取方法独立观察员 2022 年 11 月 15 日为之前公司准备的分享PPT&#xff0c;后来没用上。…

Flask 【第七篇】Flask中的wtforms使用

一、简单介绍flask中的wtforms WTForms是一个支持多个web框架的form组件&#xff0c;主要用于对用户请求数据进行验证。 安装&#xff1a; pip3 install wtforms 二、简单使用wtforms组件 1、用户登录 具体代码&#xff1a; from flask import Flask,render_template,request,…

为了避免内存攻击,美国国家安全局提倡Rust、C#、Go、Java、Ruby 和 Swift,但将 C 和 C++ 置于一边...

本文翻译自两篇文章&#xff0c;第一篇是对美国国家安全局在“软件内存安全”网络安全信息表的解读&#xff0c;第二篇是普及什么是内存安全&#xff0c;为什么它很重要&#xff1f;第一篇 为了避免内存攻击&#xff0c;美国国家安全局提倡Rust、C#、Go、Java、Ruby 和 Swift&a…

.NET周报【11月第2期 2022-11-15】

国内文章统一的开发平台.NET 7正式发布https://www.cnblogs.com/shanyou/archive/2022/11/09/16871945.html在 2020 年规划的.NET 5功能终于在.NET 7 完成了&#xff0c;为微软和社区一起为多年来将不同的开发产品统一起来的努力加冕&#xff0c;未来只有一个.NET, 回顾.NET 20…

chrome 悬停大图插件_Google Chrome浏览器的悬停卡:我不想要的我最喜欢的新东西

chrome 悬停大图插件If you only have a handful of open tabs in Google Chrome, it’s easy to tell what they are. But as you start to collect more tabs (or make the window smaller), it gets harder. That’s where Hover Cards come in. 如果您在Google Chrome浏览器…