【PasteSpider】的集群组件【PasteCluster】(让你的项目快速支持集群模式)的思路及实现(含源码)

PasteSpider是什么?

一款使用.net编写的开源的Linux容器部署助手,支持一键发布,平滑升级,自动伸缩,Key-Value配置,项目网关,环境隔离,运行报表,差量升级,私有仓库,集群部署,版本管理等!30分钟上手,让开发也可以很容易的学会在linux上部署你得项目![从需求角度介绍PasteSpider(K8S平替部署工具适合于任何开发语言)](https://blog.csdn.net/Apeart/article/details/138819197?spm=1001.2014.3001.5501)

PasteCluster又是什么?


一套使用.net编写的中间件,如果你的API项目是.NET的,那么通过引入这个组件可以让你的项目快速支持集群模式!(部署是部署工具和写法的问题,模式是模式哈,有主从的那个是模式)集群部署:你可以把一个服务部署在多个容器上或者多台服务器上,他们可以说代码是一模一样的,甚至配置也是!
访问上一般配合nginx的upstream做负载均衡访问!
如果你有一个任务,每个小时执行一次数据收集,假设所有的节点都是往一个redis打点数据
那么你最好使用外部的任务调度器来执行这个收集报表数据的事宜,比如使用PasteTask来处理远程任务调度!集群模式:集群模式的前提一定是集群部署了的,每个节点有点不一样的地方在于需要有一个领导者,
当然也有可能这个群的节点数量为1,至少PasteCluster只要有1个及以上节点即可运行!
这种模式下的访问我们先不讨论,至少有2个身份,一个是master一个就是cluster了,负责的事宜也不太一样,或者说包含关系!
比如master能处理cluster的所有事情,cluster却不能处理master的事宜!

PasteCluster的源码和使用案例

image

接入现有项目的案例

image
如上载入必要的配置,关于ClusterConfig的内容有啥,需要查看源码PasteCloveConfig.cs这个文件,大概有十多个配置项,也可以使用默认值哈!
这个配置在appsettings.json是这样的
image
PasteClusterHandlr这个逻辑他会产生一个Channel的异步队列,业务代码上处理这个业务队列即可,比方说我们启动一个HostedService来处理这个队列
image
上面核心代码其实只有2行

            //如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中// -e ClusterConfig:CurrentHost="http://192.168.1.100"//如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"//_cluster_handler.Register("http://192.168.1.100",0,0);//启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)_cluster_handler.StartCluster();

            //读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息ReadClusterChannel();

其他部分概括下就是写入集群节点列表,写入当前节点的HOST信息的!
你只要关注你的业务代码如下:
image
就是这个default的分支,严格来说应该是msg_type=0或者大于10的(1-10组件用于通知特定消息给业务,至于业务用不用要基于自己的业务需求,比如我上面这个案例就是节点掉线后把他从集群节点列表种删除(记录在redis种))

在最后将会贴出源码

集群需求过程

如果说K8S是对运维人员的部署工具,那么PasteSpider应该就是针对开发人员的部署助手了!
K8S不是有那个主从集群模式么,一开始那会思考也整一个集群模式,这样一个PasteSpider宕机了,还有其他的来承担任务!

去中心化集群

一开始的思路是去中心化的集群模式,说白点就是数据库都是一个节点一个的,当然这里的去中心化和市面上的那个链不是一回事哈,至少没有链衔接和链校验的问题!
假设一个节点就是一套完整的PasteSpider,包括数据库,对应的api和web管理端等

如何保证集群安全性?

集群内的所有节点通讯使用一个统一的密钥,这样可以防止其他集群乱入!

数据防重和数据唯一?

首先一个,数据库的表我分几种:
1.基础表,比如会员信息表,这种基础表肯定每一个节点都要同步的
2.统计表,这个明显的代表就是报表,比如日报表,这样要报表的时候不需要从日志表中读取统计
3.关系表,一般的是基础表和基础表的关系的描述的表,也是要同步的
4.日志表,记录一些操作的,视情况而定,不一定要同步完全
每个表添加3个字段,节点,创建时间戳,更新时间戳,这样在同步数据的时候可以基于这个时间戳获取数据差,然后根据ID规则同步数据
由于对管理端来说,只要一个PasteSpider在工作,也就是Nginx中配置主备用模式,这样也就是说主要的数据都是通过一个节点新建的,这样可以排除数据重复的问题
比如说,整个系统的项目的代码需要唯一!

辅助组件如何做到集群

比如项目使用到了nginx,使用到了redis等,如果要完整的集群,甚至是多台服务器,其实严格的集群我个人认为是不可能的!
如果可能,那么每年就不会有那么多这个服务宕机多久,那个宕机多久了!
市面上没有100%的灾备方案,因为在追求100%的路上,你会发觉一直在套娃… … .

普通的集群的现状

image

稍微查询下目前比较流程的一些中间件,用到的集群模式大概有以下要点:
1.集群选举过程中,不对外提供服务,可以理解成集群还没有协商一致好!那么引申的问题就是集群选举越快越好!
2.几乎的集群选举采用的是单数,过半模式!看过各种解释,说白点就是要少数服从多数,我个人觉得这个问题很扯的!
集群的数量发生变动了,这个是不可控的吧!
居然不可控,你咋保证宕机的数量一定是单数?那肯定单数复数都有可能,而且概率还很大,那这半数了个寂寞!
3.很多集群模式引入了多角色,比如蜂王,工蜂,监控蜂… … . 啥意思?难不成只有工蜂和封王会宕机???如果你说不同角色宕机的概率不一样,那我还是信的,毕竟负载不一样!
如果都会宕机,那角色越多是不是意味着集群恢复的逻辑越复杂?角色切换来切换去好玩?
4.还有一种半通的集群情况,比如A,B,C,D,E,其中A只和B,C能通,其他的都能互通!
这什么神奇组合,这种的我思考来思考去,还不如你搞2个集群,然后弄一个中转,或者是业务需求变更下,因为为了实现这个半通,付出的代价太大了!

PasteCluster的方式

1.采用N个节点模式,N大于等于1即可,1的时候表示自己是节点,自己担任任何角色(其实也就2个角色,master和cluster角色)
这里2个角色的区别主要是在业务上,比方说你集群中的某些任务,需要有一个人专门管理,那么这个任务就交给master来处理!
这种集群模式只有2个身份Master(1个)和Cluster(1个或多个)

2.不采用选举模式,采用竞选模式,当发生2个竞选冲突的时候,基于vote_time时间戳和id来确定谁是master!

3.master和cluster他们各自负责的任务,这个由业务代码决定,PasteCluster只是帮你维护集群的关系,让真个集群一直都有master这个角色的节点存在!

选举过程解析

假设有A,B,C,D,E节点,如果当前A为master!停止A,则会发生

B,C,D,E在不同时间点会发觉A不在了,这里的发现可能并发也可能是按照一定的顺序,取决于他们的轮询线
假设B先发现A出问题了,立马给自己的vote_time赋值时间戳ms,然后问(除了自己外的其他节点)你有master么?
假设被问的是C,他这个时候还没有发现A挂了,
他立马健康检查A,
如果通了,则返回A的信息给B,告诉他当前的master是A

如果不通,他则给自己的vote_time赋值,然后告诉B,C就是master!B收到返回后,就把C设为master
C这个时候会进入选举阶段,会把自己的master©信息群发给其他节点,
(这个时候可能其他也有节点发现master失联了了,也各种启动了“竞”)
其他节点收到后,会和自己的master做健康检查,如果接通的话,再做vote_time对比,结合节点Id选举出最小的返回给C,
C这个时候收到的结果,如果返回的master(E)(这个是对比vote_time的结果)不是C他这个节点,则C放弃自己作为节点,
接受返回的master(E)为master!自己做cluster的角色
上述居然有返回E,那么表示E也启动了选举,只是在和C的对比中E胜出了,C由于在遍历节点的时候失败了,所以中途退出了,也就是E在遍历的过程中完成了一整个遍历,没有被中断!
换句话说,这个竞的关键点在于谁先完成对所有节点的遍历和确认,说就是新的master,中途失败的继续当cluster角色

E在完成遍历后,群发一条消息给所有节点,告知我是新的Master(E)
以上整个选举过程就算完成,从时间上来说理论上是一个集群遍历的是时间,不需要其他集群模式的反复选举!
一般的是谁先发现,然后下一个节点选举胜出!主要是这个问的环节,如果没有问,直接进入竞,复杂度还会提升!

集群问答

1.集群节点保活问题

所有的集群节点都有心跳模式,因为你没办法知道对方下线没有,所以就有一个轮询,一定时间问对方在线否,PasteCluster采用双心跳,心跳复用模式!
双心跳是指master会定期向其他节点轮询是否在线
节点也会问轮询问master是否在线
复用模式在于,每次检查,只有他们之间的交互时间大于设定的时间后才会发送这个心跳检查,比如设定10秒钟!如果近期10秒钟这2个节点有通讯,则这一次心跳不在问询的任务中!

2.如何防止小集群出现(脑裂?)

之前第一次写的时候,后面测试发现会出现有多个master的情况,其实把整个选举过程拆开,每一个步骤延长时间,就很好理解,出现多个master是可能的!
在节点的交互过程中,比如A节点发送信息给B节点,A的请求会附带至少2个信息,A的master是谁,A的所在集群节点的总数,发送给B后,B会对这2个信息进行校验,如何和A的不一致,则B会抛弃自己的master信息,转而进入“问”阶段,再根据这个问的结果,加入集群!

3.啥时候发生加入集群这个动作

加入集群有2种情况,
1.是刚刚启动的时候,肯定得找个集群去加入,虽然有可能整个系统就你一个节点,过程嘛总要走的,说不定有其他节点(节点能通)呢!
2.在数据交互过程中,发现信息不对称,抛弃自己得信息加入到现有得一个主节点

4.啥时候移除节点

1.节点离线的时候主动触发,主动向master告知我这个节点离线了,master重新整理节点信息后,群发给所有其他在线的节点,从而更新整个集群!
2.在节点交互过程中,通讯错误次数超过设定的次数,这个交互包括数据通讯或者定时的健康检查,最终在master健康检查的时候移除这个节点,然后广播

5.给集群分配节点列表信息

1.通过配置文件的方式在启动的时候导入
2.通过AddNodeToList在启动后添加,比如你通过读取Redis获得节点列表,然后导入

6.找到当前节点的IP等信息

节点之间是通过HOSTIP等通讯的,所以最基本的就是需要知道当前节点是哪个IP,有2种模式
1.已知节点列表有10个IP,现在是不知道哪个节点是哪个,把节点列表写入到当前节点信息后,然后调用FindMe(),系统会去遍历列表,从而找到哪个节点信息是当前节点,你可以使用
/api/cluster/status查看自己当前节点是多少来确定是否查找完成!
2.在服务启动的时候给导入IP信息,比如如果你使用PasteSpider部署的项目,而且项目配置了项目网关,则可以在对应服务的配置中添加如下代码即可:
image
如果是非80端口要自己修改下!看配置其实就是在启动的额时候修改配置项ClusterConfig:CurrentHost的值!

7.手动加入一个现有集群

加入一个集群有2个步骤,一个是给当前自己打标签,也就是为自己设定host等信息,然后是ScanMaster();也就是当前节点知道自己是谁后,然后去查找当前的集群信息,然后加入这个集群!

8.主动退出一个集群

image
其实你也可以主动调用Leave函数实现离开集群!

PasteCluste组件的源码

你可以从Gitee拉取最新源码 PasteCluster组件源码和使用案例
拉取后打开项目,只需要关注如下代码
image
其他的子项目是因为这个使用案例是使用PasteTemplate生成的,懒得精简了,看下案例应该就知道如何使用到自己项目中了!

源码解说

基于上图,主要文件是2个

1.PasteClusterController.cs

看文件命名就知道,他是一个Controller,用于节点对外通讯的,每一个接口都有添加header校验,用于校验请求是否属于这个集群,在最后代码处:
image
要安全点的话,你可以加入那个time_temptoken,加密校验,类似OAUTH那个协议模式,过期了丢弃这个token
上图的fromnode用于修改最后交互时间,减少心跳中不必要的检查,
masternode则用于校验是否脑裂?也就是是否有多个master的情况,发现后及时处理!
image
在配置路由转发的时候(nginx的配置),需要为这个路径做代理
源码如下:

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;namespace PasteCluster
{/// <summary>/// /// </summary>[ApiController][Route("/api/cluster/[action]")]public class PasteClusterController : Controller{private PasteClusterHandler _handler;private PasteSloveConfig _config;/// <summary>/// /// </summary>/// <param name="handler"></param>/// <param name="config"></param>public PasteClusterController(PasteClusterHandler handler, IOptions<PasteSloveConfig> config){_config = config.Value;_handler = handler;}/// <summary>/// 健康检查/// </summary>/// <returns></returns>[HttpGet]public PasteApiResponse health(){CheckHeader();return _handler.Health();}/// <summary>/// 读取当前状态/// </summary>/// <returns></returns>[HttpGet]public string status(){return _handler.Status();}/// <summary>/// 问询master信息/// </summary>/// <returns></returns>[HttpGet]public PasteApiResponse ask(){CheckHeader();return _handler.Ask();}/// <summary>/// 查找自己/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> find(PasteNodeModel input){CheckHeader();return await _handler.Find(input);}/// <summary>/// 有节点加入/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public PasteApiResponse join(PasteNodeModel input){CheckHeader();_handler.Join(input);return _handler.State();}/// <summary>/// 转发消息/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> msg(PasteSloveMessage input){CheckHeader();input.from_api = true;return await _handler.Msg(input);}/// <summary>/// 往集群中发送消息/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public PasteApiResponse push(PasteSloveMessage input){CheckHeader();_handler.PushMsgToNode(input.body);return _handler.State();}/// <summary>/// 这个一般是PasteSpider推送过来的信息/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public PasteApiResponse run(){JObject info = null;if (base.Request.Body != null){if (base.Request.Body != null){using var stream = new StreamReader(base.Request.Body);var bodystr = stream.ReadToEndAsync().GetAwaiter().GetResult();//Console.WriteLine(bodystr);info = Newtonsoft.Json.JsonConvert.DeserializeObject<JObject>(bodystr);}}if (info != null){var _host = string.Empty;var _port = 80;JObject input = info;//if(info is JObject)//{//    input = input as JObject;//}//else//{//    Console.WriteLine("input is not JContainer");//    return _handler.State();//}if (input.ContainsKey("proAddress")){_host = input["proAddress"].ToString();}if (input.ContainsKey("extendService")){var _ser = input["extendService"];if (_ser.Type == JTokenType.Object){var _serobj = _ser as JObject;if (_serobj.ContainsKey("listenPorts")){var ports = _serobj["listenPorts"].ToString();if (!String.IsNullOrEmpty(ports)){int.TryParse(ports.Split(',')[0], out var _pp);if (_pp > 0){_port = _pp;}}}}}if (!String.IsNullOrEmpty(_host)){if (_port != 80){_handler.Register($"http://{_host}:{_port}", 0, 0);}else{_handler.Register($"http://{_host}", 0, 0);}}else{Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));}}return _handler.State();}/// <summary>/// 手动注册自己/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpGet]public string joinin([FromQuery] PasteNodeModel input, string token){if (token == _config.SloveToken){_handler.Register(input.host, input.id, input.group);}else{return "token参数错误!";}return "手动加入!";}/// <summary>/// 加入某一个节点,当前需要为master/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpGet]public async Task<string> addone([FromQuery] PasteNodeModel input, string token){if (token == _config.SloveToken){if (_handler.CurrentIsMaster || _handler.IsSingle){var node = new PasteNodeModel{host = input.host,id = input.id,group = input.group,};await _handler.AddNodeToList(node);}else{return "当前节点不是Master,无法执行这个命令";}}else{return "token参数错误!";}return "手动加入!";}/// <summary>/// 节点集合/// </summary>/// <returns></returns>[HttpGet]public PasteNodeModel[] nodes(){return _handler.Nodes();}/// <summary>/// 执行选举/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> vote(PasteNodeModel input){CheckHeader();return await _handler.Vote(input);}/// <summary>/// 被通知,有成为master/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> master(PasteMasterResponse input){CheckHeader();//Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(input));return await _handler.Master(input);}/// <summary>/// 有节点离开/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> leave(PasteNodeModel input){CheckHeader();return await _handler.Leave(input);}/// <summary>/// 移除某节点/// </summary>/// <param name="input"></param>/// <returns></returns>[HttpPost]public async Task<PasteApiResponse> remove(PasteNodeModel input){CheckHeader();return await _handler.Remove(input);}/// <summary>/// 检查信息是否合法/// </summary>/// <exception cref="Exception"></exception>private void CheckHeader(){if (base.Request.Headers.ContainsKey("slovetoken")){if (base.Request.Headers["slovetoken"] != _config.SloveToken){throw new Exception("slovetoken错误,请重新输入!");}else{if (base.Request.Headers.ContainsKey("fromnode")){_handler.TargetMasterLast(base.Request.Headers["fromnode"].ToString());}if (base.Request.Headers.ContainsKey("masternode")){_handler.ConfirmMaster(base.Request.Headers["masternode"].ToString());}}}else{throw new Exception("slovetoken不能为空,请重新输入!");}}}
}
2.PasteClusterHandler.cs

这个文件就是集群选举,维护等的主要逻辑,里面使用了2个Channel异步队列,一个是这个集群组件内部使用的,有写入数据和读取数据,另外一个就是给业务用的了,在上面的案例中的HostedService中有体现!
整个选举过程和修复等,可以查看这个文件的源码,有疑问的麻烦在评论中回复!!!
源码内容:

using System.Net;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;namespace PasteCluster
{/// <summary>/// 注意用IOC单例注入系统,多处调用/// </summary>public class PasteClusterHandler : IDisposable{/// <summary>/// /// </summary>private readonly IHttpClientFactory _httpClientFactory;/// <summary>/// 用于内部消息中转等 节点中的任务/// </summary>private Channel<PasteEventModel> _channel_slove;/// <summary>/// 业务队列 表示队列中的消息要业务逻辑处理,当前可能为slove也可能为master/// 业务监听队列/// 表示业务要处理的消息/// 当前节点可能是slove节点/// 也可能是master节点/// </summary>public Channel<PasteSloveMessage> ChannelCluster;/// <summary>/// 当前节点信息/// </summary>private PasteNodeModel _current_node = null;/// <summary>/// 当前的Master是哪个/// </summary>private PasteNodeModel _current_master = null;/// <summary>/// /// </summary>private ILogger<PasteClusterHandler> _logger = null;/// <summary>/// 当前是否是主节点/// </summary>public bool CurrentIsMaster{get{if (_current_master != null && _current_node != null){return _current_master.host == _current_node.host;}return false;}}/// <summary>/// 是否是单例模式/// 当前节点为null/// 或者没有节点列表/// </summary>public bool IsSingle{get{if (_current_master == null || _current_node == null){return true;}if (node_list == null || node_list.Count <= 1){return true;}return false;}}/// <summary>/// 当前节点的名称 如果为空表示没有或者本身配置就是空/// </summary>public string CurrentName{get{if (_current_node != null){return _current_node.name;}return "";}}/// <summary>/// 获取或设置当前集群的密钥 如果要修改这个需要在所有的调用之前配置/// </summary>public string SloveToken{get{return _config.SloveToken;}set{_config.SloveToken = value;}}/// <summary>/// 当前节点的名称 如果为空表示没有或者本身配置就是空/// </summary>public string CurrentHost{get{if (_current_node != null){return _current_node.host;}return "";}}/// <summary>/// 当前节点/// </summary>public PasteNodeModel CurrentNode{get{return _current_node;}}/// <summary>/// 当前节点的名称 如果为空表示没有或者本身配置就是空/// </summary>public int CurrentId{get{if (_current_node != null){return _current_node.id;}return 0;}}/// <summary>/// 当前配置信息/// </summary>private PasteSloveConfig _config;/// <summary>/// /// </summary>private List<PasteNodeModel> node_list = null;/// <summary>/// 是否在选举中/// </summary>private bool voting = false;/// <summary>/// /// </summary>private System.Timers.Timer _timer;/// <summary>/// 当前节点的代码,初始值为空,可以被赋值,被赋值后不能再次被赋值/// </summary>private string node_code = String.Empty;/// <summary>/// 暂存发往节点的消息,这个是指消息,业务上的消息/// </summary>private List<PasteEventModel> ziplist = null;/// <summary>/// /// </summary>private System.Threading.SemaphoreSlim _semaphoreSlim;/// <summary>/// /// </summary>private long msg_count = 0;/// <summary>/// /// </summary>public PasteClusterHandler(ILogger<PasteClusterHandler> logger, IOptions<PasteSloveConfig> config, IHttpClientFactory httpClientFactory){_logger = logger;_config = config.Value;_httpClientFactory = httpClientFactory;_semaphoreSlim = new SemaphoreSlim(1);ziplist = new List<PasteEventModel>();_channel_slove = Channel.CreateBounded<PasteEventModel>(_config.SloveChannelMsgCapacity);ChannelCluster = Channel.CreateBounded<PasteSloveMessage>(_config.MasterChannelMsgCapacity);_timer = new System.Timers.Timer(1000);_timer.Elapsed += _timer_Elapsed;_timer.AutoReset = true;_timer.Start();ActionSloveEvent();}/// <summary>/// 当前计数的次数/// </summary>private int _tick_index = 0;/// <summary>/// 给当前节点赋值code /// </summary>/// <param name="code"></param>/// <returns>是否赋值成功</returns>public bool SetNodeCode(string code){if (String.IsNullOrEmpty(node_code)){node_code = code;}else{return false;}return true;}/// <summary>/// 本地用于读取节点列表/// </summary>/// <returns></returns>public PasteNodeModel[] Nodes(){if (node_list != null){return node_list.ToArray();}return null;}/// <summary>/// 启动集群/// 读取ClusterHost作为集群列表/// 读取CurrentHost作为当前节点得Host/// 如果有NodeList和NodeCode(CurrentCode)触发查找自己/// 否则触发查找Master/// </summary>public async void StartCluster(){if (!string.IsNullOrEmpty(_config.CurrentHost)){Register(_config.CurrentHost, 0, 0);}if (!String.IsNullOrEmpty(_config.ClusterHost)){var _list = new List<PasteNodeModel>();var strs = _config.ClusterHost.Split(";");foreach (var str in strs){if (!string.IsNullOrEmpty(str)){var its = str.Split(',');var _one = new PasteNodeModel { };_one.host = its[0];if (its.Length > 1){int.TryParse(its[1], out var _id);_one.id = _id;}if (its.Length > 2){int.TryParse(its[2], out var _group);_one.group = _group;}if (!_list.Contains(_one)){_list.Add(_one);}}}if (_list.Count > 0){foreach (var _one in _list){await AddNodeToList(_one);}}}if (!String.IsNullOrEmpty(_config.CurrentCode)){SetNodeCode(_config.CurrentCode);}if (!String.IsNullOrEmpty(node_code)){if (node_list != null && node_list.Count > 0){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });}}else{Console.WriteLine("begin to join cluster:" + node_list.Select(x => x.host).ToString());await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });}}/// <summary>/// 需要确保先调用SetNodeCode 表示给自己打一个标记/// 然后会启动一个任务去遍历集合,查找这个NodeCode,找到后就知道自己的HOST信息了/// </summary>public async void FindMe(){if (!String.IsNullOrEmpty(node_code)){if (node_list != null && node_list.Count > 0){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });}}}/// <summary>/// 开始查找自己 需要有节点集合(通过AddNodeToList赋值) 需要有当前节点得node_code(通过SetNodeCode赋值)/// </summary>/// <returns></returns>public async Task<bool> StartFindMe(){if (!String.IsNullOrEmpty(node_code)){if (node_list != null && node_list.Count > 0){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.findnode });return true;}}return false;}/// <summary>/// 如果消息来自Master则更新我和Master的最后交互时间/// 这个功能用于减少不必要的心跳资源浪费/// </summary>/// <param name="host"></param>public void TargetMasterLast(string host){if (_current_master != null){if (_current_master.host == host){_current_master.last_time = DateTime.Now.ToUnixTimeSeconds();//Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Node:{CurrentHost} Update Master Last Time!");}}}/// <summary>/// 确认对方的Master和我的是否一致!/// 如果不一致,则我进去查找Master阶段/// </summary>/// <param name="host"></param>public void ConfirmMaster(string host){if (_current_master != null){if (_current_master.host != host){_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}}}/// <summary>/// 拥有集群的地址列表,去找到自己/// 这个是给内部调用的,请勿调用/// </summary>/// <param name="input"></param>public async Task<PasteApiResponse> Find(PasteNodeModel input){if (!String.IsNullOrEmpty(input.node_code)){if (node_code == input.node_code){//找到自己了_current_node = new PasteNodeModel{node_code = node_code,group = input.group,host = input.host,id = input.id,};await AddNodeToList(_current_node);await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });}}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// 定时检查/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private async void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e){try{if (_current_node != null){_tick_index++;//健康检查 被检查等!if (_tick_index % _config.TickSloveHealth == 0){var _ago = DateTime.Now.ToUnixTimeSeconds() - _config.TickScanSloveHealth;if (CurrentIsMaster){//健康检查 检查节点多久没通讯了if (node_list != null && node_list.Count > 0){var _post = new PasteMasterResponse { };_post.master = _current_node;_post.nodes = node_list?.ToArray();var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);try{await _semaphoreSlim.WaitAsync();foreach (var _node in node_list){if (_node.host != _current_node.host){if (_node.last_time < _ago){var _api = await slove_get(_node, _config.ApiHealth);if (_api.success){if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host){//告知对方,你的集群数据有误!await slove_post(_node, _postBody, _config.ApiMaster);}}}}else{_node.last_time = DateTime.Now.ToUnixTimeSeconds();//自己直接就是最新的}}}catch (Exception exl){_logger.LogError("line.341" + exl.Message);}finally{_semaphoreSlim.Release();}//错误次数超过多少次的 执行移除操作!var removes = node_list.Where(x => x.error_time > _config.RemoveByTime).ToList();if (removes != null && removes.Count > 0){foreach (var _node in removes){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.removenode, message = Newtonsoft.Json.JsonConvert.SerializeObject(_node) });}}}}else{if (_current_master != null){if (_current_master.last_time < _ago){//Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} node:{_current_node?.host} master:{_current_master?.host} timeout!");var _api = await slove_get(_current_master, _config.ApiHealth);if (_api.success){//我记录的master和master记录的master不一致if (_api.node_count != node_list.Count || _api.master?.host != _current_master?.host){//告知对方,你的集群数据有误!await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}}else{//master失联了await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}}}else{//选举?await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}}}}if (_tick_index > 3600){_tick_index = 0;}}catch (Exception exlelapsed){_logger.LogError("line.396" + exlelapsed.Message);}}/// <summary>/// 业务端调用 表示把消息打入到集群 发给所有节点或者某些节点/// </summary>/// <param name="msg"></param>/// <param name="node_id"></param>/// <param name="group"></param>public void PushMsgToNode(string msg, int msg_type = 0, int node_id = 0, int group = 0, string name = ""){var _msg = new PasteSloveMessage{body = msg,msg_type = msg_type,from = _current_node};var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);if (!voting){//如果当前在选举中,则暂存!_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });}else{ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_node, message = _msgbody, msg_type = msg_type, node_id = node_id, group = group, name = name });}}/// <summary>/// 把消息发送给所有节点,包括自己/// </summary>/// <param name="msg"></param>/// <param name="node_id"></param>/// <param name="group"></param>public void PushMsgToAll(string msg, int msg_type = 0){var _msg = new PasteSloveMessage{body = msg,msg_type = msg_type,from = _current_node};var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);if (!voting){//如果当前在选举中,则暂存!_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });}else{ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_all_node, message = _msgbody, msg_type = msg_type });}}/// <summary>/// 发送消息给master所在的节点的业务处理,如果当前为master则变更channel,切换到给业务处理/// </summary>/// <param name="msg"></param>public void PushMsgToMaster(string msg, int msg_type = 0){var _msg = new PasteSloveMessage{body = msg,from = _current_node,msg_type = msg_type};var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);if (CurrentIsMaster || _current_master == null){//切换队列ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });}else{if (!voting){_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });}else{ziplist.Add(new PasteEventModel { action = PasteSloveEvent.msg_to_master, message = _msgbody, msg_type = msg_type });}}}/// <summary>/// 直接推送消息,主要作用在于立马反馈/// 如果没有对象,则会发送给master/// 可以从from中判断是不是回环了/// </summary>/// <param name="msg"></param>/// <param name="msg_type"></param>/// <param name="id"></param>/// <param name="name"></param>/// <param name="group"></param>/// <param name="host"></param>/// <returns></returns>public async Task<bool> PushDirectionMsg(string msg, int msg_type = 0, int id = 0, string name = "", int group = 0, string host = ""){var _message = new PasteSloveMessage{body = msg,msg_type = msg_type,};return await PushDirectionMsg(_message, id, name, group, host);}/// <summary>/// 直接发送消息给某一个节点/// 如果没有命中则发送给Master/// 也就是说,可能发送给当前!注意不要回环了!!!/// </summary>/// <param name="message"></param>/// <param name="id"></param>/// <param name="name"></param>/// <param name="group"></param>/// <param name="host">示例http://192.168.1.5</param>/// <returns>是否发送成功,发送给某一个节点了?</returns>public async Task<bool> PushDirectionMsg(PasteSloveMessage message, int id = 0, string name = "", int group = 0, string host = ""){message.from = _current_node;var _send = false;if (id != 0){if (CurrentId == id){message.to = _current_node;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_postBody);await ChannelCluster.Writer.WriteAsync(_message);return true;}else{var find = node_list.Where(x => x.id == id).FirstOrDefault();if (find != null && find != default){message.to = find;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _api = await slove_post(find, _postBody, _config.ApiMsg);return _api.success;}}}if (!string.IsNullOrEmpty(host) && node_list != null){var find = node_list.Where(x => x.host == host).FirstOrDefault();if (find != null && find != default){message.to = find;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _api = await slove_post(find, _postBody, _config.ApiMsg);return _api.success;}}if (!string.IsNullOrEmpty(name) && node_list != null){var find = node_list.Where(x => x.name == name).FirstOrDefault();if (find != null && find != default){message.to = find;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _api = await slove_post(find, _postBody, _config.ApiMsg);return _api.success;}}if (group != 0 && node_list != null){var find = node_list.Where(x => x.group == group).FirstOrDefault();if (find != null && find != default){message.to = find;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _api = await slove_post(find, _postBody, _config.ApiMsg);return _api.success;}}if (!_send){if (CurrentIsMaster || IsSingle){message.to = _current_node;await ChannelCluster.Writer.WriteAsync(message);return true;}else{if (_current_master != null){message.to = _current_master;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(message);var _api = await slove_post(_current_master, _postBody, _config.ApiMsg);return _api.success;}}}return _send;}/// <summary>/// 如果当前不是master则不执行,直接抛弃,如果当前是master则执行/// </summary>/// <param name="msg"></param>public async Task<bool> PushMsgToOnlyMaster(string msg, int msg_type = 0){var _msg = new PasteSloveMessage{body = msg,msg_type = msg_type,from = _current_node};var _msgbody = Newtonsoft.Json.JsonConvert.SerializeObject(_msg);if (CurrentIsMaster || _current_master == null){await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { body = _msgbody });return true;}return false;}/// <summary>/// 处理节点中队列的消息/// </summary>private async void ActionSloveEvent(){try{var _read = await _channel_slove.Reader.ReadAsync();if (_read != null && _read != default){//_logger.LogInformation("Cluster.Event:" + Newtonsoft.Json.JsonConvert.SerializeObject(_read));switch (_read.action){case PasteSloveEvent.votemaster:{//告知别人,我是masterif (_current_node == null){return;}_current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();if (node_list != null && node_list.Count > 0){var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_current_node);//必须大于1,有2个及以上节点,才有master的说法foreach (var _node in node_list){if (_node.host != _current_node.host){var _api = await slove_post(_node, _postBody, _config.ApiVote);if (_api.success){//_node.last_time = DateTime.Now.ToUnixTimeSeconds();if (_api.master != null){if (_api.master.host != _current_node.host){//找到一个竞争者? 有节点说他才是master 而且时间比我小 我只好退位让贤_current_master = _api.master;break;}}}}}//选举完成了,我是不是master?if (_current_master == null){_current_master = _current_node;await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });}}}break;case PasteSloveEvent.scanmaster:{if (_current_node == null){return;}_current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();if (node_list != null && node_list.Count > 0){voting = true;//必须大于1,有2个及以上节点,才有master的说法foreach (var _node in node_list){var _api = await slove_get(_node, _config.ApiAsk);if (_api.success && _api.master != null){_current_master = _api.master;if (_current_node.host == _current_master.host){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });}break;}}if (_current_master != null){if (_current_node?.host != _current_master?.host){//告知 master 我要加入集群var _join = await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), _config.ApiJoinSlove);if (!_join.success){//这里失败了,咋办??? 未完待续voting = true;_current_master = null;}}}else{//饶了一圈,全部不能访问_current_master = _current_node;voting = false;await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });}}else{//节点信息不完整 还是只有这么一个节点? 未完待续_current_master = _current_node;voting = false;await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });}}break;case PasteSloveEvent.suremaster:{if (_current_node == null){return;}if (node_list != null && node_list.Count > 0){var _post = new PasteMasterResponse { };_post.master = _current_node;_post.nodes = node_list?.ToArray();var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);//必须大于1,有2个及以上节点,才有master的说法foreach (var _node in node_list){if (_node.host != _current_node.host){var _api = await slove_post(_node, _postBody, _config.ApiMaster);}}await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 1, body = _postBody, from = _current_node, time = DateTime.Now.ToUnixTimeSeconds() });}}break;case PasteSloveEvent.msg_to_master:{var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);if (CurrentIsMaster){_message.from = _current_node;_message.to = _current_node;await ChannelCluster.Writer.WriteAsync(_message);}else{//通过 api 发送给 masterif (_current_master != null){_message.from = _current_node;_message.to = _current_master;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);//如果发送失败,暂停业务,进行vote阶段var _join = await slove_post(_current_master, _postBody, _config.ApiMsg);if (!_join.success){if (_read.try_time > 0){_read.try_time = _read.try_time - 1;//这里失败了,咋办??? 未完待续await _channel_slove.Writer.WriteAsync(_read);}}}}}break;case PasteSloveEvent.msg_to_node:{if (node_list == null || node_list.Count == 0){return;}var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);if (_read.node_id != 0){var nodes = node_list.Where(x => x.id == _read.node_id).ToList();if (nodes != null && nodes.Count > 0){foreach (var _node in nodes){_message.to = _node;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);var _api = await slove_post(_node, _postBody, _config.ApiMsg);}}}if (_read.group != 0){var nodes = node_list.Where(x => x.group == _read.group).ToList();if (nodes != null && nodes.Count > 0){foreach (var _node in nodes){_message.to = _node;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);var _api = await slove_post(_node, _postBody, _config.ApiMsg);}}}if (_read.group == 0 && _read.node_id == 0){foreach (var _node in node_list){_message.to = _node;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);var _api = await slove_post(_node, _postBody, _config.ApiMsg);}}}break;case PasteSloveEvent.msg_to_all_node:{if (node_list == null || node_list.Count == 0){return;}var _message = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteSloveMessage>(_read.message);foreach (var _node in node_list){_message.to = _node;var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_message);var _api = await slove_post(_node, _postBody, _config.ApiMsg);}}break;case PasteSloveEvent.removenode:{if (CurrentIsMaster){var _node = Newtonsoft.Json.JsonConvert.DeserializeObject<PasteNodeModel>(_read.message);await RemoveCurrentNode(_node);//下发群发消息await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });var _messge = new PasteSloveMessage{body = _read.message,from = _current_node,msg_type = 4,};await ChannelCluster.Writer.WriteAsync(_messge);}}break;case PasteSloveEvent.unziplist:{if (ziplist != null && ziplist.Count > 0){var _count = ziplist.Count;Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}将从队列ZIPLIST中发送{ziplist.Count}条消息");for (var k = 0; k < _count; k++){if (!voting){if (ziplist.Count > 0){var _first = ziplist[0];await _channel_slove.Writer.WriteAsync(_first);ziplist.RemoveAt(0);}}}}break;}case PasteSloveEvent.findnode:{var _find = false;if (!string.IsNullOrEmpty(node_code)){if (node_list != null && node_list.Count > 0){foreach (var _node in node_list){if (String.IsNullOrEmpty(_node.node_code)){var _build = new PasteNodeModel{group = _node.group,id = _node.id,host = _node.host,node_code = node_code};var _api = await slove_post(_build, Newtonsoft.Json.JsonConvert.SerializeObject(_build), _config.ApiFind);if (_api.success){if (_api.node != null){if (_api.node.node_code == node_code){_find = true;break;}}}}}}}if (_find){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });}}break;}}}catch (Exception exl){_logger.LogError(exl.Message);await Task.Delay(1000);}finally{ActionSloveEvent();}}/// <summary>/// 知道当前节点信息后,注册到节点或者更新当前节点信息/// </summary>/// <param name="host"></param>/// <param name="id"></param>/// <param name="group"></param>public async void Register(string host, int id, int group = 0, string name = ""){Console.WriteLine($"Register At:{host}");var _current = new PasteNodeModel{group = group,host = host,id = id,name = name,last_time = DateTime.Now.ToUnixTimeSeconds()};_current_node = _current;await AddNodeToList(_current);//准备问询await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });}/// <summary>/// 有新的节点加入 可以是业务上读取节点列表,然后遍历调用这个/// </summary>/// <param name="input"></param>public async void Join(PasteNodeModel input){await AddNodeToList(input);//群发if (CurrentIsMaster){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.suremaster });//告知系统有新的节点var _messge = new PasteSloveMessage{body = Newtonsoft.Json.JsonConvert.SerializeObject(input),from = _current_node,msg_type = 3,};await ChannelCluster.Writer.WriteAsync(_messge);}}/// <summary>/// 添加节点/// </summary>/// <param name="input"></param>/// <param name="scan">加入后,是否搜索master</param>public async Task AddNodeToList(PasteNodeModel input){try{await _semaphoreSlim.WaitAsync();//Console.WriteLine($"--join--:{input.host}");if (node_list == null){node_list = new List<PasteNodeModel> { input };}else{var find = node_list.Where(x => x.host == input.host).FirstOrDefault();if (find != null && find != default){find.id = input.id;find.group = input.group;}else{node_list.Add(input);}}}catch (Exception ex){_logger.LogError(ex.Message);}finally{_semaphoreSlim.Release();}}/// <summary>/// 扫描master 这个发生在加入新的节点信息后 或者启动后/// </summary>public async void ScanMaster(){await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}/// <summary>/// 健康检查,返回当前信息/// </summary>/// <returns></returns>public PasteApiResponse Health(){return new PasteApiResponse { success = true, master = _current_master, node = _current_node, node_count = node_list != null ? node_list.Count : 0 };}/// <summary>/// 被问询 当前有没有master/// </summary>public PasteApiResponse Ask(){if (_current_node != null){if (_current_master == null){_current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();//_current_master = _current_node;//CurrentIsMaster = true;//进入分发流程,告知我是master_channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.votemaster, message = "", time = DateTime.Now.ToUnixTimeMilliseconds() });}}//if (_current_node != null)//{//    _current_node.vote_time = DateTime.Now.ToUnixTimeMilliseconds();//}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// 集群间的消息/// </summary>/// <param name="msg"></param>/// <returns></returns>public async Task<PasteApiResponse> Msg(PasteSloveMessage msg){await ChannelCluster.Writer.WriteAsync(msg);msg_count++;if (msg.msg_type == 2){}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// 有master分发消息,说他是master/// </summary>/// <param name="input"></param>/// <returns></returns>public async Task<PasteApiResponse> Vote(PasteNodeModel input){if (_current_node != null){//排除当前节点问当前节点if (_current_node.host != input.host){if (_current_master == null){//当前没有master 那来的就是master吧_current_master = input;}else{//当前有masterif (_current_master.host != input.host){if (_current_master.vote_time < input.vote_time){var _api = await slove_get(_current_master, _config.ApiHealth);if (_api.success){return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}else{_current_master = input;}}}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}}}//返回 哪个是masterreturn new PasteApiResponse { success = true, master = input, node = _current_node };}/// <summary>/// master选举完成,master广播说他是master/// </summary>/// <param name="input"></param>/// <returns></returns>public async Task<PasteApiResponse> Master(PasteMasterResponse input){var _old_is_master = CurrentIsMaster;_current_master = input.master;if (input.nodes != null){foreach (var _node in input.nodes){await AddNodeToList(_node);}}voting = false;//如果队列有暂存消息,则发送!await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.unziplist });var _new_is_master = CurrentIsMaster;if (_old_is_master && !_new_is_master){await ChannelCluster.Writer.WriteAsync(new PasteSloveMessage { msg_type = 2, body = Newtonsoft.Json.JsonConvert.SerializeObject(input), from = _current_node });}//返回 哪个是masterreturn new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// /// </summary>/// <param name="input"></param>/// <returns></returns>public async Task<PasteApiResponse> Leave(PasteNodeModel input){if (_current_node != null){if (_current_node.host != input.host){//谁离线了//如果自己是master 则应该是节点离线 通知其他节点,节点列表变更//如果自己是node 离线的是master 则自己进去选择if (CurrentIsMaster){if (node_list != null && node_list.Count > 0){await RemoveCurrentNode(input);if (node_list.Count > 0){var _post = new PasteMasterResponse { };_post.master = _current_node;_post.nodes = node_list?.ToArray();var _postBody = Newtonsoft.Json.JsonConvert.SerializeObject(_post);foreach (var _node in node_list){if (_node.host != _current_node.host && _node.host != input.host){var _api = await slove_post(_node, _postBody, _config.ApiMaster);}}}}//告知系统有新的节点var _messge = new PasteSloveMessage{body = Newtonsoft.Json.JsonConvert.SerializeObject(input),from = _current_node,msg_type = 4,};await ChannelCluster.Writer.WriteAsync(_messge);}else{if (_current_master != null && input.host == _current_master.host){_current_master = null;await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}}}}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// /// </summary>/// <param name="input"></param>/// <returns></returns>public async Task<PasteApiResponse> Remove(PasteNodeModel input){if (_current_node != null){if (_current_node.host == input.host){//我居然被移除了,赶紧加回去await _channel_slove.Writer.WriteAsync(new PasteEventModel { action = PasteSloveEvent.scanmaster });}await RemoveCurrentNode(input);}return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// 移除节点/// </summary>/// <param name="input"></param>/// <returns></returns>private async Task RemoveCurrentNode(PasteNodeModel input){try{await _semaphoreSlim.WaitAsync();if (node_list != null){var find = node_list.Where(x => x.host == input.host).FirstOrDefault();if (find != null && find != default){node_list.Remove(find);}}}catch (Exception exl){_logger.LogError(exl.Message);}finally{_semaphoreSlim.Release();}}/// <summary>/// 打印当前状态/// </summary>/// <returns></returns>public string Status(){return $"总节点数:{node_list?.Count} Master:{_current_master?.host} Current:{_current_node?.host} Count:{msg_count}";}/// <summary>/// 打印当前状态/// </summary>/// <returns></returns>public PasteApiResponse State(){return new PasteApiResponse { success = true, master = _current_master, node = _current_node };}/// <summary>/// 离线 告诉除了自己的第一个节点,我要离线了!/// </summary>private async Task _leave(){if (CurrentIsMaster){if (node_list != null && node_list.Count > 0){if (_current_node != null){foreach (var _node in node_list){if (_node.host != _current_node?.host){var _api = await slove_post(_node, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");if (_api.success){break;}}}}}}else{if (_current_master != null && _current_node != null){await slove_post(_current_master, Newtonsoft.Json.JsonConvert.SerializeObject(_current_node), $"{_config.ApiLeave}");}}}#region/// <summary>/// /// </summary>/// <param name="node"></param>/// <param name="path"></param>/// <returns></returns>private async Task<PasteApiResponse> slove_get(PasteNodeModel node, string path){var _read = await get($"{node.host}{path}", node.host);if (_read.code == 200){if (node.error_time > 0){node.error_time = 0;}node.last_time = DateTime.Now.ToUnixTimeSeconds();return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);}else{node.error_time++;return new PasteApiResponse { success = false };}}/// <summary>/// /// </summary>/// <param name="node"></param>/// <param name="body"></param>/// <param name="path"></param>/// <returns></returns>private async Task<PasteApiResponse> slove_post(PasteNodeModel node, string body, string path){var _read = await post($"{node.host}{path}", body, node.host);if (_read.code == 200){if (node.error_time > 0){node.error_time = 0;}node.last_time = DateTime.Now.ToUnixTimeSeconds();return Newtonsoft.Json.JsonConvert.DeserializeObject<PasteApiResponse>(_read.response);}else{node.error_time++;return new PasteApiResponse { success = false };}}/// <summary>/// /// </summary>/// <param name="url"></param>/// <returns></returns>private async Task<(int code, string response)> get(string url, string host){try{var request = new HttpRequestMessage(HttpMethod.Get, url);request.Headers.Add("slovetoken", _config.SloveToken);if (_current_node != null){request.Headers.Add("fromnode", _current_node.host);}if (_current_master != null){request.Headers.Add("masternode", _current_master.host);}request.Headers.Add("ContentType", "application/json;charset=utf-8");var client = _httpClientFactory.CreateClient();client.Timeout = TimeSpan.FromSeconds(4);client.DefaultRequestHeaders.Add("ContentType", "application/json;charset=utf-8");var response = await client.SendAsync(request);using var stream = new StreamReader(await response.Content.ReadAsStreamAsync(), Encoding.UTF8);var backstream = stream.ReadToEnd();if (response.StatusCode == HttpStatusCode.OK){_logger.LogInformation(backstream);}else{_logger.LogError(backstream);}return ((int)response.StatusCode, backstream);}catch (Exception exl){_logger.LogError(exl.Message);return (500, exl.Message);}}/// <summary>/// /// </summary>/// <param name="url"></param>/// <param name="postdata"></param>/// <returns></returns>private async Task<(int code, string response)> post(string url, string postdata, string host){try{using (HttpContent httpcontent = new StringContent(postdata)){httpcontent.Headers.Add("slovetoken", _config.SloveToken);if (_current_node != null){httpcontent.Headers.Add("fromnode", _current_node.host);}if (_current_master != null){httpcontent.Headers.Add("masternode", _current_master.host);}httpcontent.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json;chartset=utf-8");var client = _httpClientFactory.CreateClient();client.Timeout = TimeSpan.FromSeconds(4);client.DefaultRequestHeaders.Referrer = new Uri(url);var response = await client.PostAsync(url, httpcontent);var backstream = await response.Content.ReadAsStringAsync();if (response.StatusCode != HttpStatusCode.OK && response.StatusCode != HttpStatusCode.NoContent && response.StatusCode != HttpStatusCode.Created){_logger.LogError($"URL:{url} CODE:{response.StatusCode} POST:{postdata} BACK:{backstream}");}return ((int)response.StatusCode, backstream);}}catch (Exception exl){_logger.LogError(exl.Message);return (500, exl.Message);}}/// <summary>/// /// </summary>public void Dispose(){//离开集合_leave().Wait();if (_timer != null){_timer.Stop();_timer.Dispose();}//Task.CompletedTask;}#endregion//是否要压入队列 //统计数据打印 某一个消息的次数!}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/851838.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

物联网网关和飞鸟物联平台如何助力其实现智能化升级,提升生产效率-天拓四方

随着工业4.0时代的到来&#xff0c;物联网技术逐渐成为推动工业转型升级的关键力量。物联网网关作为连接工业设备与网络的核心枢纽&#xff0c;在工业自动化、数据收集与分析等方面发挥着越来越重要的作用。本案例将围绕一家知名制造企业&#xff0c;展示物联网网关和飞鸟物联平…

提升消费者满意度的五星售后服务认证

在当今竞争激烈的市场环境中&#xff0c;消费者满意度是企业取得成功的重要因素。五星售后服务认证作为一种权威性认证&#xff0c;可以显著提高消费者满意度&#xff0c;增强企业的竞争力。本文将从四个方面探讨五星售后服务认证如何提高消费者满意度。 五星售后服务认证是由国…

SpringCash

文章目录 简介引入依赖application.yml配置常用注解使用1. 启动类添加注解使用方法上添加注解 简介 Spring Cache是一个框架&#xff0c;实现了基于注解的缓存功能底层可以使用EHCache、Caffeine、Redis实现缓存。 注解一般放在Controller的方法上&#xff0c;CachePut 注解一…

高精度定位技术的必要性与实际应用

在当今社会&#xff0c;随着科技的飞速发展&#xff0c;高精度精准定位技术已成为一项不可或缺的基础技术&#xff0c;其应用范围涉及军事、航空、智能交通、无人驾驶、智慧城市建设等众多领域。高精度精准定位不仅为人们的日常生活带来极大便利&#xff0c;还对提升国家的科技…

鸿蒙轻内核A核源码分析系列六 MMU协处理器(1)

在前面系列分析虚实映射时&#xff0c;涉及到了一些MMU协处理器与相关的汇编代码没有深入讲解。本文来专门分析那些协处理器与汇编代码。 本文中所涉及的源码&#xff0c;以OpenHarmony LiteOS-A内核为例&#xff0c;均可以在开源站点 https://gitee.com/openharmony/kernel_l…

运行mvn命令打包项目jar包报错?“Fatal error compiling: 无效的目标发行版: 19 ”, 让我来看看~

最近写实验&#xff0c;要打包项目&#xff0c;但是不管是在cmd运行“mvn clean package -Dmaven.test.skiptrue”命令&#xff0c;还是在idea上去操作&#xff0c;都出现了这样的一个错误&#xff1a; [EROR] Failed to exeoute goal org.apache.maven.plugins:maven-comnpile…

倾斜摄影优化之重:轻量化

老子云倾斜摄影轻量化 1、支持全网多端流畅展示 2、支持私有化部署 3、一键轻量化G级模型 4、多种精度选择 5、支持二次开发 &#xff08;1&#xff09;、压缩可达98%以上 &#xff08;2&#xff09;、多种精度任选&#xff1a;普清、标清、高清、超清 &#xff08;3&am…

Illustrator 2024 mac/win版:创新设计,一触即发

Illustrator 2024是一款由Adobe公司开发的强大且全面的矢量图形设计软件&#xff0c;它以其卓越的性能和丰富的功能&#xff0c;为设计师们提供了一个高效、直观且富有创意的工作环境。 Illustrator 2024 mac/win版获取 作为一款矢量图形设计软件&#xff0c;Illustrator 2024…

linux shell查看24h内修改的文件

执行命令&#xff1a; find /path -type f -mtime 0

【电机】开环控制系统和闭环控制系统

1 什么是控制系统 控制系统是指由控制主体、控制客体和控制媒体组成的具有自身目标和功能的管理系统。也可以理解为&#xff1a;为了使控制对象达到预期的稳定状态。例如一个水箱的温度控制&#xff0c;可以通过控制加热设备输出的功率进而来改变水温达到目标温度&#xff0c;…

基于WPF技术的换热站智能监控系统09--封装水泵对象

1、添加用户控件 2、编写水泵UI 控件中用到了Viewbox控件&#xff0c;Viewbox控件是WPF中一个简单的缩放工具&#xff0c;它可以帮助你放大或缩小单个元素&#xff0c;同时保持其宽高比。通过样式和属性设置&#xff0c;你可以创建出既美观又功能丰富的用户界面。在实际开发中…

Java使用swing实现简易计算器

效果如下 代码实现 import javax.swing.*; import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener;public class SimpleCalculator {private JFrame frame;private JTextField numField1;private JTextField numField2;private JTex…

数据结构习题(快期末了)

一个数据结构是由一个逻辑结构和这个逻辑结构上的一个基本运算集构成的整体。 从逻辑关系上讲&#xff0c;数据结构主要分为线性结构和非线性结构两类。 数据的存储结构是数据的逻辑结构的存储映像。 数据的物理结构是指数据在计算机内实际的存储形式。 算法是对解题方法和…

java基础知识漏洞三

Object Object 类的常见方法有哪些&#xff1f; /*** native 方法&#xff0c;用于返回当前运行时对象的 Class 对象&#xff0c;使用了 final 关键字修饰&#xff0c;故不允许子类重写。*/ public final native Class<?> getClass() /*** native 方法&#xff0c;用于…

读取CSV文件生成RDD去掉标题行

文章目录 1. 创建CSV文件2. 上传CSV文件3. 读取CSV文件生成RDD4. 去掉标题行生成新RDD5. 查看新生成的RDD 1. 创建CSV文件 执行命令&#xff1a;vim scores.csv 在WPS里查看CSV文件 2. 上传CSV文件 执行命令&#xff1a;hdfs dfs -put scores.csv /park 3. 读取CSV文件生…

公司活动想找媒体报道宣传怎样邀请媒体?

在当今信息爆炸的时代,对于正处于成长阶段的中小企业而言,有效且成本控制得当的宣传策略是推动品牌发展、扩大市场影响力的关键。尤其是在预算有限的情况下,如何让“好钢用在刀刃上”,实现宣传效果的最大化,成为众多企业共同面临的挑战。在此背景下,智慧软文发布系统网站作为一…

智能体,大模型的“增程路线”?

智能体&#xff08;AI Agent&#xff09;&#xff0c;正在将用户对大语言模型的使用分成两种截然不同的方式&#xff1a;同样是写一篇文章&#xff0c;在非智能体工作流中&#xff0c;用户输入提示词&#xff0c;然后等待大模型生成答案的整个过程&#xff0c;就像中间不使用退…

补偿 EMI 滤波器 X 电容对有源 PFC 功率因数的影响

现代开关模式电源使用 X 电容器和 Y 电容器与电感器的组合来过滤共模和差模 EMI。滤波器元件位于任何有源&#xff08;或无源&#xff09;功率因数校正 (PFC) 电路的前面&#xff08;图 1&#xff09;&#xff0c;因此 EMI 滤波器的电抗对功率因数 (PF) 造成的任何失真都会改变…

日本2024年最受欢迎的转职行业是IT 通信

2024年有关机构针对超1000名人力资源专业人士进行了“推荐转职行业”的调查。结果显示&#xff0c;日本目前最受欢迎的转职行业是 1、“IT/通信行业”&#xff08;45.9%&#xff09;&#xff0c; 2、其次是“互联网/广告/游戏”&#xff08;31.9%&#xff09;&#xff0c; 3、“…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] LYA与朋友们的石头剪刀布游戏(100分) - 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍓OJ题目截图 📎在线评测链接 LYA与朋友们的石头剪刀布游戏(100分) 🌍 评…