28.3 一致性哈希和推送数据的redirect流程

本节重点介绍 :

  • 开启一致性哈希环变更监听处理
    • 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置
  • 开启结果监听和watch服务
  • 编写pgw的http接收端
    • 推送数据的redirect流程

一致性哈希和推送数据的redirect流程

开启一致性哈希环变更监听处理

  • 位置 sd/rings.go
  • 当这个服务的节点变更了(节点宕机、扩容)
  • 通过consul的watch操作会通知到这里,也就是 this.NodeUpdateChan会有数据
  • 这时需要从 哈希环中获取节点信息oldNodes := this.ring.Members(),然后两边对对比
  • 如果节点不同则,更新哈希环this.ReShardRing(nodes)
func RunReshardHashRing(ctx context.Context, logger log.Logger) {level.Info(logger).Log("msg", "RunRefreshServiceNode start....")for {select {case nodes := <-NodeUpdateChan:oldNodes := PgwNodeRing.ring.Members()sort.Strings(nodes)sort.Strings(oldNodes)isEq := StringSliceEqualBCE(nodes, oldNodes)if isEq == false {level.Info(logger).Log("msg", "RunReshardHashRing_node_update_reshard", "old_num", len(oldNodes), "new_num", len(nodes), "oldnodes", strings.Join(oldNodes, ","), "newnodes", strings.Join(nodes, ","), )PgwNodeRing.ReShardRing(nodes)} else {level.Info(logger).Log("msg", "RunReshardHashRing_node_same", "nodes", strings.Join(nodes, ","))}case <-ctx.Done():level.Info(logger).Log("msg", "RunReshardHashRingQuit")return}}
}

两个string切片比较 的函数

    func StringSliceEqualBCE(a, b []string) bool {if len(a) != len(b) {return false}if (a == nil) != (b == nil) {return false}b = b[:len(a)]for i, v := range a {if v != b[i] {return false}}return true}

开启结果监听和watch服务

  • sd/sd.go RunRefreshServiceNode函数中
  • 开启Reshard任务,并启动watch
func (c *client) RunRefreshServiceNode(ctx context.Context, srvName string, consulServerAddr string) error {level.Info(c.logger).Log("msg", "RunRefreshServiceNode start....")go RunReshardHashRing(ctx, c.logger)errchan := make(chan error, 1)go func() {errchan <- c.WatchService(ctx, srvName, consulServerAddr)}()select {case <-ctx.Done():level.Info(c.logger).Log("msg", "RunRefreshServiceNode_receive_quit_signal_and_quit")return nilcase err := <-errchan:level.Error(c.logger).Log("msg", "WatchService_get_error", "err", err)return err}return nil
}

启动watch

  • sd/sd.go
  • 如果节点变化了就通过NodeUpdateChan通知 RunReshardHashRing
func (c *client) WatchService(ctx context.Context, srvName string, consulServerAddr string) error {watchConfig := make(map[string]interface{})watchConfig["type"] = "service"watchConfig["service"] = srvNamewatchConfig["handler_type"] = "script"watchConfig["passingonly"] = truewatchPlan, err := watch.Parse(watchConfig)if err != nil {level.Error(c.logger).Log("msg", "create_Watch_by_watch_config_error", "srv_name", srvName, "error", err)return err}watchPlan.Handler = func(lastIndex uint64, result interface{}) {if entries, ok := result.([]*consul.ServiceEntry); ok {var hs []stringfor _, a := range entries {hs = append(hs, fmt.Sprintf("%s:%d", a.Service.Address, a.Service.Port))}if len(hs) > 0 {level.Info(c.logger).Log("msg", "service_node_change_by_healthy_check", "srv_name", srvName, "num", len(hs), "detail", strings.Join(hs, " "))NodeUpdateChan <- hs}}}if err := watchPlan.Run(consulServerAddr); err != nil {level.Error(c.logger).Log("msg", "watchPlan_run_error", "srv_name", srvName, "error", err)return err}return nil}

编写pgw的http接收端

  • web/http.go
  • 使用gin 启动web
  • 添加pushgateway路由
package webimport ("time""net/http""github.com/gin-gonic/gin""dynamic-sharding/pkg/web/controller/pushgateway"
)func StartGin(port string, r *gin.Engine) error {pushgateway.Routes(r)s := &http.Server{Addr:           port,Handler:        r,ReadTimeout:    time.Duration(5) * time.Second,WriteTimeout:   time.Duration(5) * time.Second,MaxHeaderBytes: 1 << 20,}err := s.ListenAndServe()return err}

main中 oklog.run 开启web

	var g run.Group{// Termination handler.term := make(chan os.Signal, 1)signal.Notify(term, os.Interrupt, syscall.SIGTERM)cancel := make(chan struct{})g.Add(func() error {select {case <-term:level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")cancelAll()return nil//TODO clean work herecase <-cancel:level.Warn(logger).Log("msg", "server finally exit...")return nil}},func(err error) {close(cancel)},)}{// metrics web handler.g.Add(func() error {level.Info(logger).Log("msg", "start web service Listening on address", "address", sc.HttpListenAddr)gin.SetMode(gin.ReleaseMode)routes := gin.Default()errchan := make(chan error, 1)go func() {errchan <- web.StartGin(sc.HttpListenAddr, routes)}()select {case err := <-errchan:level.Error(logger).Log("msg", "Error starting HTTP server", "err", err)return errcase <-ctxAll.Done():level.Info(logger).Log("msg", "Web service Exit..")return nil}}, func(err error) {cancelAll()})}g.Run()

pushgateway的路由

  • web/controller/pushgateway/pgw_route.go
  • 需要处理的是 /metrics/job的get 、put和post方法
package pushgatewayimport ("net/http""github.com/gin-gonic/gin"
)func Routes(r *gin.Engine) {authapi := r.Group("/metrics/job")authapi.GET("/*any", PushMetricsGetHash)authapi.PUT("/*any", PushMetricsRedirect)authapi.POST("/*any", PushMetricsRedirect)tapi := r.Group("/test")tapi.GET("/v1", func(c *gin.Context) {c.String(http.StatusOK, "Hello, I'm pgw gateway+ (。A。)")})
}

推送数据的redirect流程

  • web/controller/pushgateway/pgw_controller.go
  • 获取请求的path
  • 根据path在哈希环上找到要调度的真实pgw node
  • 拼接redirect url,返回给client
  • client再发起请求即可到真实的pgw上
func PushMetricsRedirect(c *gin.Context) {path := c.Request.URL.Pathnode, err := sd.PgwNodeRing.GetNode(path)if err != nil {c.String(http.StatusInternalServerError, "get_node_from_hashring_error")}nextUrl := "http://" + node + pathlog.Printf("[PushMetrics][request_path:%s][redirect_url:%s]", path, nextUrl)//c.Redirect(http.StatusMovedPermanently, nextUrl)c.Redirect(http.StatusTemporaryRedirect, nextUrl)//c.Redirect(http.StatusPermanentRedirect, nextUrl)c.Abort()}
func PushMetricsGetHash(c *gin.Context) {path := c.Request.URL.Pathnode, err := sd.PgwNodeRing.GetNode(path)if err != nil {c.String(http.StatusInternalServerError, "get_node_from_hashring_error")}nextUrl := "http://" + node + pathlog.Printf("[PushMetrics][request_path:%s][redirect_url:%s]", path, nextUrl)c.String(http.StatusOK, "nextUrl:"+nextUrl)}

本节重点总结 :

  • 开启一致性哈希环变更监听处理
    • 这个服务的节点变更了(节点宕机、扩容)就对哈希环进行重置
  • 开启结果监听和watch服务
  • 编写pgw的http接收端
    • 推送数据的redirect流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大语言模型数据处理方法(基于llama模型)

文章目录 前言一、基于huggingface的DataCollatorForSeq2Seq方法解读1、DataCollatorForSeq2Seq方法2、batch最长序列填充3、指定长度填充二、构建大语言模型数据加工模块1、数据读取2、数据加工1、数据格式2、预训练(pretrain)数据加工3、微调(sft)数据加工①、sft数据加工…

Springboot项目搭建的问题

1.第一次出现这个问题是在使用postgresql进行搭建项目的时候&#xff0c;但是配置文件中的驱动一致导入不了 最后发现是meaven中依赖导入不进来&#xff08;不知道为什么&#xff09;&#xff0c;于是手动的在meaven中央仓库下载了对应的jar进行配置 然后项目可以正常启动 2.…

大模型系列——AlphaZero/强化学习/MCTS

AlphaGo Zero无需任何人类历史棋谱&#xff0c;仅使用深度强化学习&#xff0c;从零开始训练三天的成就已远远超过了人类数千年积累的围棋知识。 1、围棋知识 &#xff08;1&#xff09;如何简单理解围棋知识 &#xff08;2&#xff09;数子法分胜负&#xff1a;https://zhu…

w外链如何跳转微信小程序

要创建外链跳转微信小程序&#xff0c;主要有以下几种方法&#xff1a; 使用第三方工具生成跳转链接&#xff1a; 注册并登录第三方外链平台&#xff1a;例如 “W外链” 等工具。前往该平台的官方网站&#xff0c;使用手机号、邮箱等方式进行注册并登录账号。选择创建小程序外…

Jellycat玩偶界的天花板,如何用情绪营销征服成年人的心?

Jellycat的用户肯定对这个品牌有一定的了解&#xff0c;不知道的用户或许也看过这个很火的茄子表情包&#xff0c;这是Jellycat很火的一款玩偶“活泼茄子”。Jellycat&#xff0c;这个源自英国伦敦的高端玩具品牌&#xff0c;近年来在全球范围内迅速走红&#xff0c;摇身一变玩…

一方数据能让沃尔玛广告业务成为下一个亚马逊吗?

作者&#xff1a;刀客doc 在如今的广告市场可以说是热闹非凡。有Facebook、谷歌这些传统巨头&#xff0c;也有亚马逊、TikTok这些行业新贵涌现出来。近几年连PayPal、Netflix这些原来和广告业务八竿子打不着的平台&#xff0c;也都开始把勺子伸进来&#xff0c;想要分一杯羹。…

基于SSM土家风景文化管理系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;景点分类管理&#xff0c;热门景点管理&#xff0c;门票订单管理&#xff0c;旅游线路管理&#xff0c;系统管理 前提账号功能包括&#xff1a;系统首页&#xff0c;个人中心&…

windows 编译 breadpad

原文链接:https://gist.githubusercontent.com/vnl/f317840bfa9c638a60f2c4110872056a/raw/07185c8e86fc2faf08e3410ed3950a5c4d2e8b32/Breakpad%2520on%2520Windows ##### Building Google breakpadBuilding Google breakpad on Windows is a very painful experience because…

搭建海外云服务器需要什么费用?

搭建海外云服务器需要什么费用&#xff1f;搭建海外云服务器的费用涉及多个方面&#xff0c;包括服务器实例费用、公网带宽费用、磁盘存储费用、操作系统费用和其他费用。具体费用取决于所选的云服务提供商、服务器配置、计费模式等因素。以下是UU云小编整理的一些主要的费用构…

Python 基础语法 - 赋值运算符

运算符说明简单赋值运算符、-、*、/、%、//、**等复合赋值运算符 1. 赋值运算符的功能 所有的赋值运算符都是用来给变量赋值的&#xff08;都是完成将数据保存到一个变量中&#xff09;重要结论&#xff1a;所有的赋值运算符表达式都没有结果 -> 无法提供数据 2. 简单赋值…

IDEA->EasyCode(mapper.xml) 字段无逗号分隔和修改全局变量问题

1.mapperxml字段无逗号分隔 在easycode的设置里找到&#xff1a; 1、Template下的 mapper.xml.vm脚本 2、Global Config下的 mybatisSupport.vm脚本 将脚本里的 $velocityHasNext 替换成 $foreach.hasNext&#xff0c;然后保存。Mybatis-Plus框架操作一样 github->issue连…

C# 将时间转换为毫秒

作者&#xff1a;逍遥Sean 简介&#xff1a;一个主修Java的Web网站\游戏服务器后端开发者 主页&#xff1a;https://blog.csdn.net/Ureliable 觉得博主文章不错的话&#xff0c;可以三连支持一下~ 如有疑问和建议&#xff0c;请私信或评论留言&#xff01; C# 将时间转换为毫秒…

PHP的 CSRF、XSS 攻击和防范

CSRF攻击 CSRF&#xff08;Cross-Site Request Forgery&#xff09;攻击&#xff0c;也称为跨站请求伪造&#xff0c;是一种常见的网络安全威胁。在这种攻击中&#xff0c;攻击者利用已认证的用户身份&#xff0c;在用户不知情的情况下伪造请求&#xff0c;冒充用户的操作向目…

Elastic Stack - FileBeat 入门浅体验

Filebeat 是 Elastic Stack 中的一个轻量级日志转发器&#xff0c;主要用于收集和转发日志数据。Filebeat 作为代理安装在您的服务器上&#xff0c;可以监控您指定的日志文件或位置&#xff0c;收集日志事件&#xff0c;并将其转发到 Elasticsearch 或 Logstash 进行索引。 一…

GitHub Actions的 CI/CD

GitHub Actions 是一个强大的 CI/CD 工具&#xff0c;适用于自动化各种开发任务。GitHub Actions 的原理是基于事件驱动的自动化流水线工具&#xff0c;通过定义触发条件和执行步骤&#xff0c;可以让项目在特定条件下自动运行一系列操作&#xff0c;比如构建、测试、部署等。 …

STM32--基于STM32F103C8T6的OV7670摄像头显示

本文介绍基于STM32F103C8T6实现的OV7670摄像头显示设计&#xff08;完整资源及代码见文末链接&#xff09; 一、简介 本文实现的功能&#xff1a;基于STM32F103C8T6实现的OV7670摄像头模组实时在2.2寸TFT彩屏上显示出来 所需硬件&#xff1a; STM32F103C8T6最小系统板、OV76…

基于行业分类的目标检测与跟踪系统

针对题目“目标检测跟踪”&#xff0c;我们可以根据行业类别、子类别、细分类别以及应用场景选择合适的图表进行可视化分析。以下是一些可能的图表选择及其对应的SQL示例&#xff08;假设有一个数据库包含相关字段&#xff09;&#xff1a; 1. 散点图 (Scatter Plot) 应用场景…

C#与C++交互开发系列(十一):委托和函数指针传递

前言 在C#与C的互操作中&#xff0c;委托&#xff08;delegate&#xff09;和函数指针的传递是一个复杂但非常强大的功能。这可以实现从C回调C#方法&#xff0c;或者在C#中调用C函数指针的能力。无论是跨语言调用回调函数&#xff0c;还是在多线程、异步任务中使用委托&#x…

在Ubuntu上配置python环境

apt install python3.11-venv 是一个命令&#xff0c;用于在基于 Debian 的 Linux 系统&#xff08;如 Ubuntu&#xff09;上安装 Python 3.11 的虚拟环境模块。 解释&#xff1a; apt: 这是一个包管理工具&#xff0c;用于安装、更新、删除软件包。install: 这是一个命令&am…

CloudStack云平台搭建:XenServer服务器系统安装

1.打开VMware虚拟机&#xff0c;点击“创建新的虚拟机” 2. 点击“自定义&#xff08;高级&#xff09;” → “下一步” 3. 点击“下一步” 4. 点击“稍后安装操作系统” → “下一步” 5. 选择“其他” → “其他64位” → “下一步” 6. 修改“虚拟机名称” 、“位置”&…