关于ASP.NET Core WebSocket实现集群的思考

前言

    提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是wswss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关,而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。

实现

咱们的重点关键字就是两个WebSocket集群,实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github[1],具体仓库地址可以转到文末自行跳转到示例源码[2]中查看。既然涉及到集群,这里咱们就用nginx作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示

980d0a5c81d7e9969efaee70e43b2b6d.png

redis在这里扮演的角色呢,是用来处理Server端的消息相互传递用的,主要是使用的redis的pub/sub功能来实现的,这里便涉及到几个核心问题

  • • 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的

  • • 其次,处在不同Server端的不同用户间的相互通信是需要一个传递媒介

  • • 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略

这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟WebSocket需要保持长链接、且消息的大小需要评估。

上面提到了redis的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。

nginx配置

通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf

//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {server 127.0.0.1:5001;server 127.0.0.1:5678;
}server {listen       5000;server_name  localhost;location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//记得转发避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升级成websocket协议的头标识proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";}
}

这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。

一对一发送

首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况

  • • 如果你需要通信的客户端和你连接在一个Server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。

  • • 如果你需要通信的客户端和你不在一个Server端里,这个时候咱们就需要借助redis的pub/sub的功能,把消息传递给另一个Server端。

咱们通过一张图大致的展示一下它的工作方式

40018a44e7c3c09ffc4ea8e21dd726e2.png

解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项,这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下

var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();RedisClient cli = new RedisClient("127.0.0.1:6379");cli.Notice += (s, e) => logger?.LogInformation(e.Log);return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions
{KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");
app.MapControllers();app.Run();

接下来我们定义一个Controller用来处理WebSocket请求

public class WebSocketController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_socketHandler = socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet("/chat/user/{id}")]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码

public class WebSocketHandler:IDisposable
{//存储当前服务用户的集合private readonly UserConnection UserConnection = new();//redis频道前缀private readonly string userPrefix = "user:";//用户对应的redis频道private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly ILogger<WebSocketHandler> _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ = UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循环接收消息while (webSocket.State == WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//接收的到消息转换成实体MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//发送到其他客户端的数据byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务,则发送给目标redis端的频道ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };//目标的redis频道_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ = UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub = _redisClient.Subscribe(channel,  async (channel, data) => {//接收过来当前频道数据,说明发送端不在当前服务ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到几个辅助相关的类,其中UserConnection类是存储注册到当前服务的连接,MsgBody类用来接受客户端发送过来的消息,ChannelMsgBody是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码

//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}
}//客户端消息
public class MsgBody
{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }
}//频道订阅消息
public class ChannelMsgBody
{//用户标识public string FromId { get; set; }//目标用户标识,也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }
}

这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去

9ea4d34f88c6d8c0e3b187baea1a9624.pngadd699c29c6c7017f63d2a3c75b6f6f6.png

Postman连接三个连接唯一标识分别是1、2、3,模拟一下消息发送,效果如下,发送效果

5179738c091932c89b25c2aaa5d07361.png

接收效果

2b915faad16a5d094c2385b3c5fb9d36.png

群组发送

上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下

3fa40aa436d8f3a05fa637a35ac54038.png

群组的实现方式相对于一对一要简单一点

  • • 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去

  • • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息

展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景

//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection UserConnection = new();private readonly GroupUser GroupUser = new();private readonly SemaphoreSlim _lock = new(1, 1);private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string groupPrefix = "group:";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ = currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count == 1){//订阅redis频道await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//组装redis频道发布的消息,目标为群组标识ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通过redis发布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}
}

这里涉及到了GroupUser类,是来存储群组和群组用户的对应关系的,定义如下

public class GroupUser
{//key为群组的唯一标识public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送

1218ff21694a66c60a835925a98fca7c.png

用户u2接收

ecf82958a6be8fcf28809ab7373ec8fb.png

发送所有人

发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示

5c07ac71c55eea1b4190b5e39753aee6.png

这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息

//把用户注册进去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

具体的实现逻辑还是在HandleGroup类里,是HandleAll方法,看一下具体实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){   //如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}
}

效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。

整合到一起

上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是

  • • 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。

  • • 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息

  • • 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息

  • • 全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息

这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示

public class WebSocketChannelController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet("/chat/channel/{id}")]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息

public class WebSocketChannelHandler : IDisposable
{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection = new();//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();private readonly SemaphoreSlim _lock = new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();//一对一redis频道前缀private readonly string userPrefix = "user:";//群组redis频道前缀private readonly string groupPrefix = "group:";//全员redis频道private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ = UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客户端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//读取客户端消息ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);//判断消息类型switch (channelData.Method){//一对一case "One":await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ = UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到了ChannelData类是用于接收客户端消息的类模板,具体定义如下

public class ChannelData
{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }
}

类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示

1abede83115fd260a7d42ac4de48f4cf.png

由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。

一对一处理

首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息

private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult){MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}
}

接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息

private async Task SubMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);
}

如果给某个用户发送消息则可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method为One代表着是私聊一对一的情况,消息体内Id为要发送给的具体用户标识和消息体。

群组处理

接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 add  to group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{//判断群组是否存在var hasValue = GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑

private async Task SubGroupMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);
}

全员消息处理

全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑

private async Task HandleAll(string id, object msgBody)
{_logger.LogInformation($"user {id} send:{msgBody}");//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全员消息的发送数据格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method为All代表着全员消息类型,MsgBody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下

private async Task SubAllMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理if (user.Key == msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);
}

示例源码

由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可

  • • asp.net core源码示例 WebsocketCluster[3]

  • • golang源码示例 websocket-cluster[4]

仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。

总结

    本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下

  • • 首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系

  • • 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话

  • • 本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计

读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。

引用链接

[1] 我的github: https://github.com/softlgl
[2] 示例源码: #示例源码
[3] WebsocketCluster: https://github.com/softlgl/WebsocketCluster
[4] websocket-cluster: https://github.com/softlgl/websocket-cluster

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/281250.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows环境下Apache+PHP+MySQL搭建服务器

相关文件下载 下载地址Apachehttps://www.apachehaus.com/cgi-bin/download.plxPHPhttps://windows.php.net/downloadMySQLhttps://dev.mysql.com/downloads/mysql/MySQL MySQL配置 当前使用的MySQL版本是8.0.18&#xff0c;在MySQL根目录下新建my.ini文件&#xff0c;下面是…

angular.js国际化模块

最近需要将一个项目转化成英文的&#xff0c; 于是发现一个angular模块angular-translate&#xff0c;实现如下&#xff1a; 1.安装包 bower install angular-translate bower install angular-translate-loader-static-files //然后在页面引用进去 <script src"/angul…

触屏网站如何实现返回并刷新

目的 在会员中心等页面常常会遇到进入内页修改信息&#xff0c;返回前一个页面需要更新信息的场景。 思路 用COOKIE记录当前页面是否需要刷新&#xff0c;返回之后再刷新一次页面。 方案 下载js.cookie.js然后引入到项目中 https://github.com/js-cookie/js-cookie 先来一个最简…

更快,更强的.NET 7 发布了

.NET Conf 2022 在昨晚(11⽉8⽇) 11 点 正式开始了&#xff0c;为期三天的会议&#xff08;11⽉8-10⽇&#xff09;&#xff0c; 围绕 .NET 7 展开。相信各位⼩伙伴都已经开始安装 .NET 7 正式版本还有以及相关的开发⼯具。这次 .NET 7 围绕传统的 C# &#xff0c;ASP.NET Core…

Web服务器 - Nginx配置介绍

nginx的配置相对简单&#xff0c;总体来说分为5种模块 全局块&#xff1a;配置影响nginx全局的指令。一般有运行nginx服务器的用户组&#xff0c;nginx进程pid存放路径&#xff0c;日志存放路径&#xff0c;配置文件引入&#xff0c;允许生成worker process数等。events块&…

jvm(Java virtual machine) JVM架构解释

2019独角兽企业重金招聘Python工程师标准>>> JVM 架构解释 每个Java开发者都知道通过JRE【Java运行环境】执行字节码。 但是很多人都不知道JRE是JVM实现的事实。JVM负责执行字节码的分析 代码的解释和运行。 我们应该了解JVM的架构&#xff0c;这对开发者来说是很重…

Hyper-V 嵌套虚拟化

先决条件运行 Windows Server 2016 或Windows 10 周年更新的 Hyper-V 主机。运行 Windows Server 2016 或Windows 10 周年更新的 Hyper-V VM。配置版本为 8.0 或更高的 Hyper-V VM。采用 VT-x 和 EPT 技术的 Intel 处理器&#xff08;AMD-V技术的暂时不支持&#xff09;>Set…

简单的面试题简解思路(搜集)

1. 统计字符串中单词出现次数 "hi how are you i am fine thank you youtube am am "&#xff0c;统计"you"出现的次数。 方法一 : split() function wordCount(str,word){var str str || "";var word word || "";var strArr s…

WinForm(十五)窗体间通信

在很多WinForm的程序中&#xff0c;会有客户端之间相互通信的需求&#xff0c;或服务端与客户端通信的需求&#xff0c;这时就要用到TCP/IP的功能。在.NET中&#xff0c;主要是通过Socket来完成的&#xff0c;下面的例子是通过一个TcpListerner作为监听&#xff0c;等待TcpClie…

905. 按奇偶排序数组

1// 905. 按奇偶排序数组 2/** 3 * param {number[]} A 4 * return {number[]} 5 */ 6var sortArrayByParity function(A) { 7 return A.filter(value > value % 2 0).concat( 8 A.filter(value > value % 2 1) 9 )10}; 转载于:https://www.cnblogs.com/…

关于Java开发需要注意的十二点流程

1.将一些需要变动的配置写在属性文件中 比如&#xff0c;没有把一些需要并发执行时使用的线程数设置成可在属性文件中配置。那么你的程序无论在DEV环境中&#xff0c;还是TEST环境中&#xff0c;都可以顺畅无阻地运行&#xff0c;但是一旦部署在PROD上&#xff0c;把它作为多线…

Unity经典游戏教程之:雪人兄弟

版权声明&#xff1a; 本文原创发布于博客园"优梦创客"的博客空间&#xff08;网址&#xff1a;http://www.cnblogs.com/raymondking123/&#xff09;以及微信公众号"优梦创客"&#xff08;微信号&#xff1a;unitymaker&#xff09;您可以自由转载&#x…

使用webpack搭建个性化项目

安装主包 yarn add webpack webpack-cli webpack-dev-server -D根据项目实际需求安装loaders&#xff0c;webpack-loaders列表 根据项目实际需求安装插件&#xff0c; webpack-plugins列表 常用包列表 包名说明webpackwebpack主程序&#xff0c;配置列表webpack-cliwebpack…

.NET周报【11月第1期 2022-11-07】

国内文章开源安全赋能 - .NET Conf China 2022https://mp.weixin.qq.com/s/_tYpfPeQgyEGsnR4vVLzHg.NET Conf China 2022 是面向开发人员的社区峰会&#xff0c;延续 .NET Conf 2022 的活动&#xff0c;庆祝 .NET 7 的发布和回顾过去一年来 .NET 在中国的发展成果&#xff0c;它…

React - 状态提升

从入门的角度来聊一下React 的状态提升。我们先来看一下React官网是怎么介绍这一概念的&#xff1a;使用 react 经常会遇到几个组件需要共用状态数据的情况。这种情况下&#xff0c;我们最好将这部分共享的状态提升至他们最近的父组件当中进行管理。很简单的一句介绍&#xff0…

saltstack(三) --- salt-httpapi

以下操作均在master上操作 1. 安装api netapi modules&#xff08;httpapi&#xff09;有三种&#xff0c;分别是rest_cherrypy、rest_tornado、rest_wsig&#xff0c;接下来要讲的是rest_cherrypydoc&#xff1a;https://docs.saltstack.com/en/latest/ref/netapi/all/salt.ne…

c++实现二叉搜索树

自己实现了一下二叉搜索树的数据结构。记录一下&#xff1a; #include <iostream>using namespace std;struct TreeNode{int val;TreeNode *left;TreeNode *right;TreeNode(int value) { valvalue; leftNULL; rightNULL; } };class SearchTree{public:SearchTree();~Sear…

一款自用的翻译小工具,开源了

一款自用的翻译小工具&#xff0c;开源了TranslationTool作者&#xff1a;WPFDevelopersOrg - 唐宋元明清|驚鏵原文链接&#xff1a;https://github.com/Kybs0/TranslationTool此项目使用WPF MVVM开发。框架使用大于等于.NET461。Visual Studio 2019。最初是支持以下&#xff1…

JS使用按位异或方式加密字符串

按位异或加密字符串&#xff0c;字符串加解密都是该函数 缺陷是加密密钥使用的字符最好不要出现需要加密的字符串中的字符&#xff0c;一旦出现原字符与加密字符一样额情况&#xff0c;异或结果为0&#xff0c;导致不能还原字符串&#xff0c;可以考虑更改算法避免这种情况 im…

SCSS 实用知识汇总

1、变量声明 $nav-color: #F90; nav {//$width 变量的作用域仅限于{}内$width: 100px;width: $width;color: $nav-color; }.a {//报错&#xff0c;$width未定义width: $width; } 2、父选择器& scss代码&#xff1a; article a {color: blue;&:hover { color: red } } 编…