从零开始实现一个RPC框架(三)

前言

到目前为止我们的框架已经有了一部分服务治理的功能,这次我们在之前的基础上实现一些其他功能。篇幅所限这里只列举部分实现

zookeeper注册中心

实现我们之前的注册中心的接口即可,这里使用了docker的libkv而不是直接用zk客户端(从rpcx那学的),libkv封装了对于几种存储服务的操作,包括Consul、Etcd、Zookeeper和BoltDB,后续如果要支持其他类型的存储就得自己写客户端了。基于zk的注册中心的定义如下:

type ZookeeperRegistry struct {AppKey         string //一个ZookeeperRegistry实例和一个appkey关联ServicePath    string //数据存储的基本路径位置,比如/service/providersUpdateInterval time.Duration //定时拉取数据的时间间隔kv store.Store //封装过的zk客户端providersMu sync.RWMutexproviders   []registry.Provider //本地缓存的列表watchersMu sync.Mutexwatchers   []*Watcher //watcher列表
}

初始化部分逻辑如下:

func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string,updateInterval time.Duration, cfg *store.Config) registry.Registry {zk := new(ZookeeperRegistry)zk.AppKey = AppKeyzk.ServicePath = ServicePathzk.UpdateInterval = updateIntervalkv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)if err != nil {log.Fatalf("cannot create zk registry: %v", err)}zk.kv = kvbasePath := zk.ServicePathif basePath[0] == '/' { //路径不能以"/"开头basePath = basePath[1:]zk.ServicePath = basePath}//先创建基本路径err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})if err != nil {log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)}//显式拉取第一次数据zk.doGetServiceList()go func() {t := time.NewTicker(updateInterval)for range t.C {//定时拉取数据zk.doGetServiceList()}}()go func() {//后台watch数据zk.watch()}()return zk
}

我们在初始化注册中心时执行两个后台任务:定时拉取和监听数据,相当于推拉结合的方式。同时监听获得的数据是全量数据,因为实现起来简单一些,后续如果服务列表越来越大时,可能需要加上基于版本号的机制或者只传输增量数据。这里额外指出几个要点:

  1. 后台定时拉取数据并缓存起来
  2. 查询时直接返回缓存
  3. 注册时在zk添加节点,注销时在zk删除节点
  4. 监听时并不监听每个服务提供者,而是监听其父级目录,有变更时再统一拉取服务提供者列表,这样可以减少watcher的数目,逻辑也更简单一些
  5. 因为第4点,所以注册和注销时需要更改父级目录的内容(lastUpdate)来触发监听

具体的注册注销逻辑这里不再列举,参考:github

客户端心跳

如果我们使用zk作为注册中心,更简单的做法可能是直接将服务提供者作为临时节点添加到zk上,这样就可以利用临时节点的特性实现动态的服务发现。但是我们使用的libkv库并不支持临时节点的功能,而且除了zk其他存储服务比如etcd等可能也不支持临时节点的特性,所以我们注册到注册中心的都是持久节点。在这种情况下,可能某些由于特殊情况无法访问的服务提供者并没有及时地将自身从注册中心注销掉,所以客户端需要额外的能力来判断一个服务提供者是否可用,而不是完全依赖注册中心。
所以我们需要增加客户端心跳的支持,客户端可以定时向服务端发送心跳请求,服务端收到心跳请求时可以直接返回,只要通知客户端自身仍然可用就行。客户端可以根据设置的阈值,对心跳失败的服务提供者进行降级处理,直到心跳恢复或者服务提供者被注销掉。客户端发送心跳逻辑如下:

func (c *sgClient) heartbeat() {if c.option.HeartbeatInterval <= 0 {return}//根据指定的时间间隔发送心跳t := time.NewTicker(c.option.HeartbeatInterval)for range t.C {if c.shutdown {t.Stop()return}//遍历每个RPCClient进行心跳检查c.clients.Range(func(k, v interface{}) bool {err := v.(RPCClient).Call(context.Background(), "", "", nil)c.mu.Lock()if err != nil {//心跳失败进行计数if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {fail++c.clientsHeartbeatFail[k.(string)] = fail} else {c.clientsHeartbeatFail[k.(string)] = 1}} else {//心跳成功则进行恢复c.clientsHeartbeatFail[k.(string)] = 0c.serversMu.Lock()for i, p := range c.servers {if p.ProviderKey == k {delete(c.servers[i].Meta, protocol.ProviderDegradeKey)}}c.serversMu.Unlock()}c.mu.Unlock()//心跳失败次数超过阈值则进行降级if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {c.serversMu.Lock()for i, p := range c.servers {if p.ProviderKey == k {c.servers[i].Meta[protocol.ProviderDegradeKey] = true}}c.serversMu.Unlock()}return true})}
}

鉴权

鉴权的实现比较简单,客户端可以在元数据中携带鉴权相关的信息,而服务端可以通过指定的Wrapper进行鉴权。服务端Wrapper的代码如下:

type AuthFunc func(key string) bool
type ServerAuthInterceptor struct {authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {return &ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {//鉴权通过则执行业务逻辑if sai.authFunc(auth) {requestFunc(ctx, response, response, tr)return}}//鉴权失败则返回异常s.writeErrorResponse(response, tr, "auth failed")}
}

熔断降级

暂时实现了简单的基于时间窗口的熔断器,实现如下:

type CircuitBreaker interface {AllowRequest() boolSuccess()Fail(err error)
}
type DefaultCircuitBreaker struct {lastFail  time.Timefails     uint64threshold uint64window    time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {return &DefaultCircuitBreaker{threshold: threshold,window:    window,}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {if time.Since(cb.lastFail) > cb.window {cb.reset()return true}failures := atomic.LoadUint64(&cb.fails)return failures < cb.threshold
}
func (cb *DefaultCircuitBreaker) Success() {cb.reset()
}
func (cb *DefaultCircuitBreaker) Fail() {atomic.AddUint64(&cb.fails, 1)cb.lastFail = time.Now()
}
func (cb *DefaultCircuitBreaker) reset() {atomic.StoreUint64(&cb.fails, 0)cb.lastFail = time.Now()
}

结语

这次的内容就到此为止,有任何意见或者建议欢迎指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797313.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【鸿蒙 HarmonyOS】获取设备的地理位置

一、背景 获取移动设备的地理位置&#xff0c;包含&#xff1a;经度、维度、具体地理位置等&#xff0c;地理位置信息能在许多业务场景中被应用&#xff0c;如导航、地图服务、位置服务、社交媒体等。 下面以一个Demo例子&#xff0c;来实现获取设备地理位置的功能 官方文档…

【MySQL】跟着示例学 SQL

下面所有示例均来自 SQL之母 - SQL自学网站 -- 查询所有学生 -- 请编写 SQL 查询语句&#xff0c;从名为 student 的数据表中查询出所有学生的信息。 select * from student; -- 查询学生的姓名和年龄 -- 请编写一条 SQL 查询语句&#xff0c;从名为 student 的数据表中选择出…

Java入门基础知识第五课(超基础,超仔细)——选择结构

今天主要讲一下if选择结构以及如何利用Math.random()来获取随机数。 流程控制&#xff1a;用来控制代码的执行顺序 顺序结构&#xff1a;代码从上往下按照顺序依次执行 选择结构&#xff1a;根据条件选择性的执行某部分代码 循环结构&#xff1a;反复执行一段代码 一、i…

软件测试下的AI之路(4)

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;【Austin_zhai】 &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xff0c;分享行业相关最新信息。…

实时计算平台设计方案:913-基于100G光口的DSP+FPGA实时计算平台

基于100G光口的DSPFPGA实时计算平台 一、产品概述 基于以太网接口的实时数据智能计算一直应用于互联网、网络安全、大数据交换的场景。以DSPFPGA的方案&#xff0c;体现了基于硬件计算的独特性能&#xff0c;区别于X86GPU的计算方案&#xff0c;保留了高带宽特性&…

Ceph学习 - 1.存储知识

文章目录 1.存储基础1.1 基础知识1.1.1 存储基础1.1.2 存储使用 1.2 文件系统1.2.1 简介1.2.2 数据存储1.2.3 存储应用的基本方式1.2.4 文件存储 1.3 小结 1.存储基础 学习目标&#xff1a;这一节&#xff0c;我们从基础知识、文件系统、小节三个方面来学习。 1.1 基础知识 1.…

UART设计

一、UART通信简介 通用异步收发器&#xff0c; 特点&#xff1a;串行、异步、全双工通信 优点&#xff1a;通信线路简单&#xff0c;传输距离远 缺点&#xff1a;传输速度慢 数据传输速率&#xff1a;波特率&#xff08;单位&#xff1a;baud&#xff0c;波特&#xff09; …

如何高效学习Python编程语言

理解Python的应用场景 不同的编程语言有不同的发展历史和应用场景,了解Python主要应用在哪些领域对于学习它会有很大帮助。Python最初是一种通用脚本语言,主要用于系统级任务自动化。随着时间的推移,它逐步成为数据处理、科学计算、Web开发、自动化运维等众多领域的主要编程语…

Navicat设置mysql权限

新建用户&#xff1a; 注意&#xff1a;如果不生效执行刷新命令:FLUSH PRIVILEGES; 执行后再重新打开查看&#xff1b; 查询权限命令&#xff1a;1234为新建的用户名&#xff0c;localhost为访问的地址 SHOW GRANTS FOR 1234localhost;如果服务器设置服务器权限后可能会出现权…

【算法】求一个数组中三个数乘积最大值 - 线性扫描

题目 给定一个数组&#xff0c;找出数组中乘积最大的三个数。 原理 一个数组中最大值只有两种情况&#xff1a;两个最小的负数和一个最大的正数 & 三个最大的正数。线性扫描找出这五个数字&#xff0c;即可求出最大值。 代码 public static void main(String[] args) {…

潜伏三年,核弹级危机一触即发,亚信安全深度分析XZ Utils后门事件

2024年3月29日星期五上午8点&#xff0c;有研究人员称xz/liblzma中的后门导致SSH服务器内存泄露&#xff0c;使得SSH服务异常&#xff08;https://www.openwall.com/lists/oss-security/2024/03/29/4&#xff09;。github中“xz”压缩工具主要由Larhzu和Jia Tan共同负责维护&am…

QToolTip设置背景色没有生效原因与解决方法

设置全局QToolTip的背景色&#xff0c;有两种方法&#xff1a; 1. 样式表设置&#xff1b; 2. QToolTip::setPalette(pal);调色板设置&#xff1b; QPalette pal QToolTip::palette(); //修改背景色 pal.setColor(QPalette::Inactive,QPalette::ToolTipBase,QColor(240, 25…

力扣25. K 个一组翻转链表

Problem: 25. K 个一组翻转链表 文章目录 题目描述思路复杂度Code 题目描述 思路 1.创建虚拟头节点dummy并将其next指针指向head&#xff0c;创建指针pre、end均指向dummy&#xff1b; 2.编写反转单链表的函数reverse 3.当end -> next 不为空时&#xff1a; 3.1.每次k个一组…

Java强连通分量知识点(含面试大厂题和源码)

在大厂面试中&#xff0c;与拓扑排序相关的问题通常涉及到对图的处理和算法设计。以下是三道可能出现在大厂面试中的编程题目&#xff0c;以及相应的Java源码实现。 题目 1&#xff1a;课程依赖关系 描述&#xff1a; 给定一个课程列表和课程之间的依赖关系&#xff0c;为所有…

Bigtable [OSDI‘06] 论文阅读笔记

原论文&#xff1a;Bigtable: A Distributed Storage System for Structured Data (OSDI’06) 1. Introduction Bigtable 是一种用于管理结构化数据的分布式存储系统&#xff0c;可扩展到非常大的规模&#xff1a;数千台服务器上的数据量可达 PB 级别&#xff0c;同时保证可靠…

学习java第三十五

Spring事务传播行为&#xff1a; PROPAGATION_REQUIRED(默认) 如果当前没有事务&#xff0c;就新建一个事务&#xff0c;如果已经存在一个事务中&#xff0c;加入到这个事务中 PROPAGATION_SUPPORTS 支持当前事务&#xff0c;如果当前没有事务&#xff0c;就以非事务方式执行 P…

golang语言和JAVA对比

引言: 在当今的软件开发领域,有许多编程语言供开发人员选择。其中,Golang和Java是两种备受开发者青睐的语言。本文将探讨Golang和Java之间的比较和对比,分析它们在语言特性、性能、平台支持、社区和生态系统、开发效率和可维护性等方面的异同。 一、语言特性和性能 Golang…

二维相位解包理论算法和软件【全文翻译-路径跟踪方法(4.1)】

4.1 引言 在第 2 章中我们注意到,从一个像素点开始计算的解包相位可能取决于积分路径。如果我们沿着两条不同的路径从一个像素点到另一个像素点,我们可能会得到两个不同的解包裹相位答案。我们发现,这些不一致是由称为残差的点状结构造成的。残差位于由四个像素组成的 &quo…

加密算法(二)

1、SHA-256加密算法&#xff1a; package com.arithmetic.encryption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; //使用java.security.MessageDigest类来进行SHA-256摘要的计算。 //通过getInstance("SHA-256")方法获取…

阿里巴巴拍立淘API新功能揭秘:图片秒搜商品,实现智能化个性化购物新体验

在数字化快速发展的今天&#xff0c;智能化和个性化已经成为购物体验中不可或缺的元素。为了满足消费者日益增长的购物需求&#xff0c;阿里巴巴中国站不断推陈出新&#xff0c;其中拍立淘API的新功能——图片秒搜商品&#xff0c;无疑为智能化个性化购物体验开创了新的篇章。 …