前言
PasteCluster是由.NET6.0编写的集群中间件,先已开源:
PasteCluster.Gitee
在实际开发中,如果一个服务(比如api)是否支持集群部署,其实是由开发决定的!
举个栗子
我们知道缓存,可以分几种方式,最简单的就是应用缓存,也就是我们开发中使用的变量,比如全局变量,或者使用外部程序缓存比如redis,也可以使用数据库作为缓存等!
图形验证码很熟悉吧,第一步是缓存下这个客户的code是多少,然后把Image显示给客户,让客户输入,客户输入后调用注册或者其他Action进行提交,这个时候就要校验用户提交的code是否和自己存储的一致!如果你的程序使用的是程序缓存,那么这个程序就不能集群部署了!(有人会说我使用Nginx的cookie锁定,这里不严格讨论这个事情哈!)
使用案例一则
拿PasteSpider来说,PasteSpider支持集群部署,支持一拖多服务器,那么问题就来了,如果集群部署,哪些服务器由哪个PasteSpider进行管理,这就需要有一个集群管理模式
1.Master是谁,是否要遵循他们说得单数模式!(我觉得单数模式是很奇怪的设定,类似无限套娃的假设模式了)
2.哪些服务器由哪个PasteSpider(这里作为节点)管理
3.一些公共任务的执行,比如小时报表的统计(如果每个节点都执行这个任务,要么有并发的问题,要么有数据切点的问题)
引入PasteCluster后,这些问题将被综合处理,PasteSpider只要处理自己的业务代码,集群里面的选举啥的由PasteCluster进行处理!
使用案例二则
由这么一个项目需求,需要统计在线人数
使用node作为websocket的服务端,支持集群部署
一个node中可能有多个站点的用户
需求是实时知道某一个站点多少客户
1.客服客户端每秒去遍历所有的node,查询各站点的在线链接数,这个虽然可以实现需求,但是会造成极大的浪费,如果客服客户端多了会造成严重的数据浪费!
2.设定一个专门的node(na)作为master,其他的Node在链接数有变更的时候,告知这个node(na),那么问题来了,如果这个na挂了,那么会造成这个统计的中断,再者客服客户端要多一个链接到这个node(na),相比于1,明显的效率提升很多!
3.可能有人说,借助于redis的hashset的功能,++或者–数据,同样的需要一个主体去读取这个数据,然后群发给在线客服从而更新在线人数!这么执行的结果就是每个连接的变动,都需要读取hashset的列表,然后计算出结果,推送给所有的在线客服!(优秀)
4.或者我们可以换一个思想实现,node中选举一个master出来,其他节点的在线链接数变更后,把对应的数据推送给master,由master在程序内缓存中计算出站点在线人数信息,然后推送给所有客服!当这个master挂了后,由系统机制再次选举出一个master来负责这个事情!
以上的方案中,1或者2其实是不合理的,3和4,如果可以的话3还是很好的,4是在不借助外部的情况下实现的,3、4都有一个问题,那就是数据的重置!比如某一个节点引发了异常,然后重启了,那么他的数据就有一个断层了,不得不引发一个问题,上载数据的时候 是上载++ – 还是上载当前这个节点的各个站点的当前链接数!
如何使用
组件适用
1.需要集群部署的服务,需要有主从的集群服务
2.如果是定时的任务,建议使用PasteTask任务调度器实现
3.组件适用于.net6.0以上的框架
4.集群对外的消息使用Channel队列发出
引入组件
1.先引入IHttpClientFactory,示例:context.Services.AddHttpClient();
2.添加对组件的应用 示例:<PackageReference Include="PasteCluster" Version="1.0.0" />
3.使用单例注入,示例:context.Services.AddSingleton<PasteClusterHandler>();
4.配置配置文件,示例:context.Services.Configure<PasteSloveConfig>(Configuration.GetSection("ClusterConfig"));
"ClusterConfig": {"SloveToken": "zxcvfr43dr56hgt5",//集群密钥,可自定义,防止其他集群乱入!"ClusterHost": "",//已有的节点地址链路,示例http://192.110.0.3:80;http://192.110.0.7:80"CurrentHost": ""//当前节点的访问地址是多少 示例http://192.110.0.6:80}
以上是基础配置,需要自定义更多的请查阅PasteSloveConfig的属性5.如果使用了Route路由,注意本组件需要/api/cluster/的路由转发配置
开始使用
1.初始化集群组件,写入必要的信息,比如集群中还有谁,自己的HOST是多少
2.定义一个HostedService用于接收集群产生的事件
示例代码:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PasteCluster;
using TestCluster.redismodel;namespace TestCluster
{/// <summary>/// 系统初始化/// </summary>public class StartHostedService : IHostedService{private readonly IServiceProvider _serviceProvider;private PasteClusterHandler _cluster_handler;private IAppCache _appCache;private ILogger<StartHostedService> _logger;/// <summary>/// /// </summary>/// <param name="serviceProvider"></param>/// <param name="pasteClusterHandler"></param>/// <param name="appCache"></param>/// <param name="logger"></param>public StartHostedService(IServiceProvider serviceProvider, PasteClusterHandler pasteClusterHandler, IAppCache appCache, ILogger<StartHostedService> logger){_serviceProvider = serviceProvider;_cluster_handler = pasteClusterHandler;_appCache = appCache;_logger = logger;}private const string node_list_cache_key = "cache:last:nodes";//private System.Timers.Timer _timer;/// <summary>/// /// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>/// <exception cref="System.NotImplementedException"></exception>public Task StartAsync(CancellationToken cancellationToken){//可以从他处读取集群节点列表,然后把列表写入到系统中var reads = _appCache.Get<List<PasteNodeModel>>(node_list_cache_key);if (reads != null && reads.Count > 0){foreach (var _node in reads){_cluster_handler.AddNodeToList(_node).Wait();//await _cluster_handler.AddNodeToList(_node);}}//如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中// -e ClusterConfig:CurrentHost="http://192.168.1.100"//如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"//_cluster_handler.Register("http://192.168.1.100",0,0);//启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)_cluster_handler.StartCluster();//读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息ReadClusterChannel();//_timer = new System.Timers.Timer();//hit_index = new Random().Next(1,100);//_timer.Interval = 1000;//_timer.AutoReset = true;//_timer.Elapsed += _timer_Elapsed;//_timer.Start();return Task.CompletedTask;}//private int hit_index = 5;//测试产生数据进行交互//private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)//{// try// {// hit_index--;// if (hit_index == 0)// {// _cluster_handler.PushMsgToNode($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");// hit_index = new Random().Next(1, 20);// }// }// catch (Exception exl)// {// _logger.LogException(exl);// }//}/// <summary>/// 处理集群产生的数据/// </summary>private async void ReadClusterChannel(){try{var _info = await _cluster_handler.ChannelCluster.Reader.ReadAsync();if (_info != null && _info != default){_logger.LogInformation("ClusterChannel.Message:" + Newtonsoft.Json.JsonConvert.SerializeObject(_info));if (_info.msg_type == 1){if (_cluster_handler.CurrentIsMaster){//当前节点确定选举为Master 可以执行保存等//var _nodes = _cluster_handler.Nodes();//if (_nodes != null && _nodes.Length > 0)//{// await _appCache.SetAsync<List<PasteNodeModel>>(node_list_cache_key, _nodes.ToList());//}}}else{//业务代码处理}}}catch (Exception exl){_logger.LogException(exl);await Task.Delay(1000);}finally{ReadClusterChannel();}}/// <summary>/// /// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>/// <exception cref="System.NotImplementedException"></exception>public Task StopAsync(CancellationToken cancellationToken){_cluster_handler.Dispose();return Task.CompletedTask;}}
}
3.在入口函数中启动这个HostedService
``context.Services.AddHostedService<StartHostedService>();
``
相关问答
1.如何知道当前运行情况
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/status
2.获取所有节点信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/nodes
3.手动注入当前节点IP信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/joinin?host=http://192.168.1.5
4.如何查找当前节点?
有一种模式,就是知道所有集群的IP,需要找到A是A,那么有前提
a.先要把节点的信息写入到组件,所有的节点
b.给当前节点设定一个code,可以使用SetNodeCode
c.后者使用环境信息 -e ClusterConfig:CurrentCode="xxxxxuuuu323423"
d.StartCluster()后,组件会遍历节点,查找哪个节点是自己!主要他们的链接要能通!
5.如何防止多Master的情况出现
a.在每次交互的时候,会附带当前节点的信息,比如master是谁,有多少各节点等,通过这个信息进行判断各自节点的信息是否一致,不一致的时候触发信息同步!
b.master冲突的时候,会检查哪一个节点的选举时间更早,更早的为主,时间采用时间戳ms模式,如果时间戳一致,则再对比对应的Id信息
c.节点数据交互的时候,会进行master和count的消息确定
6.消息完整性?
a.消息分成2部分,一部分是组件内部消息,另一部分是涉及业务的部分
b.内部消息,其实在配置的时间周期内会重新发送重新检查,以便维护集群节点的正确性
c.业务消息,一般是节点发送消息给master,在发送失败的时候,会压入队列,等待选举完成后再解压队列发送,这里有疑问的是,业务消息是否也有可以丢弃的消息,比如如下场景:x.已知有N个node节点,作为在线客服的websocket的服务端,内部链接又使用site分隔用户xx.如果要获得某一个站点site有多少人在线,要么遍历所有的node节点,然后统计数据,要么所有节点有信息的时候告知某一个点,很显然在这个需求中,各node节点有在线数变更的时候主动通知是最合适的xxx.比如node1告知node1:site1:30,node2告知node2:site1:25 信息归总到master中,master中本地缓存这个数据,当有变动的时候,推送给所有客服节点即可!
xxxx.这里要处理的就是,某一个节点宕机的情况,比如node5不可用了,如何告知master他掉了!
逻辑介绍
1.Master的选举逻辑a.当前组件集群支持从1到N,这个数量理论上不是无上限的,要基于配置文件中的最长交互时间决定b.当当前没有Master的时候,会从节点队列中问询谁是master,被问询的如果也没有master信息,则被问询方直接成为master,并进入master的选举环节c.节点交互会重置最后交互时间,主要在于master和clusterd.组件会定期检查这个交互时间是否过期,如果过期了,会尝试健康检查,如果不通,则进入选举模式
2.选举原则a.谁先启动谁为主b.谁被告知消息不符,谁放弃,进行master查找
写在最后
1.当前组件的代码还没有经过严格测试,我估计还有问题的地方在于
a.时效性的问题,虽然可以修改检查的间隔时间,但是这个时间也是有间隙的
b.选举间隔,产生太多消息的话?消息重复的话!压缩的消息是否对业务有影响
c.如果同时多个节点掉线,那么这个选举的时间会更长
d.当前集群模式将在后续更新到PasteSpider的集群策略中,替换到目前的版本!