RPC教程 6.负载均衡

1.负载均衡策略

假设有多个服务实例,而每个实例都提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决于负载均衡的策略。

  • 随机选择策略 - 从服务列表中随机选择一个。
  • 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
  • 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近3分钟部署服务器的 CPU、内存消耗情况。

2.服务发现 

需要负载均衡那就需要有多个提供相同功能的服务实例。那服务发现是什么意思呢?现在我们有多个提供相同功能的服务实例,那客户端要获取该服务的地址,就需要服务中心返回一个地址给客戶端。这个就是服务发现。

那我们先实现一个基础的服务发现模块 Discovery(定义成interface接口类型)。

我们定义两个类型:

  • SelectMode 代表不同的负载均衡策略,简单起见,这里仅实现 Random 和 RoundRobin 两种策略。
  • Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
    • Refresh() 从注册中心更新服务列表
    • Update(servers []string) 手动更新服务列表
    • Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
    • GetAll() 返回所有的服务实例

 代码在xclient文件夹中。

//discovery.go
type SelectMode intconst (RandomSelect SelectMode = iotaRoundRobinSelect
)type Discovery interface {Refresh() errorUpdate(servers []string) errorGet(mode SelectMode) (string, error)GetAll() ([]string, error)
}

 紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery。

可以用编码的方式在客户端中配置服务的地址,服务器不需要进行更多的配置,如果添加或删除了某些服务,可以调用MultipleServersDiscovery.Update来动态更新服务。

客户端使用NewMultipleServersDiscovery方法设置该服务的网络和地址。

//discovery.go
type MultiServerDiscovery struct {rwMutex sync.RWMutex //protect following,即是保护servers,indexservers []stringindex   int
}//使用例子: NewMultiServerDiscovery([]string{"tcp@127.0.0.1:100","tcp@127.0.0.1:2100"})
func NewMultiServerDiscovery(servers []string) *MultiServerDiscovery {d := &MultiServerDiscovery{servers: servers,}d.index = rand.Intn(math.MaxInt32 - 1)return d
}
  • index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。

 然后,实现 Discovery 接口

var _ Discovery = (*MultiServerDiscovery)(nil)func (d *MultiServerDiscovery) Refresh() error {return nil
}func (d *MultiServerDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers = serversreturn nil
}func (d *MultiServerDiscovery) Get(mode SelectMode) (string, error) {//这里不能用d.rwMutex.RLock(),因为d.index有更新d.rwMutex.Lock()defer d.rwMutex.Unlock()n := len(d.servers)if n == 0 {return "", errors.New("rpc discovery: no available servers")}switch mode {case RandomSelect:return d.servers[rand.Intn(n)], nilcase RoundRobinSelect:s := d.servers[d.index%n]d.index = (d.index + 1) % nreturn s, nildefault:return "", errors.New("rpc discovery: not supported select mode")}
}func (d *MultiServerDiscovery) GetAll() ([]string, error) {d.rwMutex.RLock()defer d.rwMutex.RUnlock()// return a copy of d.serversservers := make([]string, len(d.servers))copy(servers, d.servers)return servers, nil
}

3.为什么选择客户端负载均衡

RPC client 和 server 建立是长连接, 因而基于连接的负载均衡没有太大意义, 所以 该RPC 负载均衡是基于每次调用。也就是客户在同一个 client 发的请求希望它被负载均衡到所有服务端。

一般来说负载均衡器是独立的, 被放置在服务消费者和提供者之间. 代理通常需要保存请求响应副本, 因此有性能消耗也会造成额外延迟. 当请求量大时, lb (load balance)可能会变成瓶颈, 并且此时 lb 单点故障会影响整个服务。

客户端负载将lb 的功能集成到客户端进程里,然后使用负载均衡策略选择一个目标服务地址,向目标服务发起请求。LB能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。

但用客户端负载均衡也有坏处, 若有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本。

4.支持负载均衡的客户端

之前对外使用的客户端是Dail(...),这里我们也要向用户暴露一个支持负载均衡的客户端,叫做 XClient。

//xclient.go
type XClient struct {d       Discoverymode    SelectModeopt     *geerpc.Optionmutex   sync.Mutexclients map[string]*geerpc.Client
}func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {return &XClient{d:       d,mode:    mode,opt:     opt,clients: make(map[string]*geerpc.Client),}
}func (xc *XClient) Close() error {xc.mutex.Lock()defer xc.mutex.Unlock()for key, client := range xc.clients {//只是关闭,没有其他的对错误的处理client.Close()delete(xc.clients, key)}return nil
}

XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。

我们之前是使用dial函数来创建一个客户端,那我们为了可以复用已经创建好的socket连接,这里我们也实现一个dial函数,在内部复用socket连接。

func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {xc.mutex.Lock()defer xc.mutex.Unlock()client, ok := xc.clients[rpcAddr]if ok && !client.IsAvailable() {client.Close()delete(xc.clients, rpcAddr)client = nil}if client == nil {var err errorclient, err = geerpc.XDial(rpcAddr, xc.opt)if err != nil {return nil, err}xc.clients[rpcAddr] = client}return client, nil
}func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {//获取sokcet连接(复用)client, err := xc.dial(rpcAddr)if err != nil {return err}return client.Call(ctx, serviceMethod, args, reply)
}// serviceMethod 例子:"Foo.SUM"
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply any) error {//通过负载均衡策略得到服务实例rpcAddr, err := xc.d.Get(xc.mode)if err != nil {return err}return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}

之后实现一个调用服务的方法Call。该方法主要是三步:

  1. 通过负载均衡策略得到服务实例
  2. 获取sokcet连接(复用)
  3. 最终调用client.Call去发送服务请求

另外,我们为 XClient 添加一个常用功能:Broadcast。

Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。

Broadcast 是 XClient 的一个方法, 你可以将一个请求发送到这个服务的所有节点。
如果所有的节点都正常返回,没有错误的话, Broadcast将返回其中的一个节点的返回结果。 如果有节点返回错误的话,将返回这些错误信息中的一个。

func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {//获取所有的服务实例servers, err := xc.d.GetAll()if err != nil {return err}var wg sync.WaitGroupvar mutex sync.Mutex //protect e and replyDonevar e errorreplyDone := reply == nilctx, cancel := context.WithCancel(ctx)defer cancel()for _, rpcAddr := range servers {wg.Add(1)//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的go func(rpcAddr string) {defer wg.Done()var clonedReply anyif reply != nil {//reply是指针的,所以需要使用Elem()clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()}//xc.call方法中的参数clonedReply不能使用replyerr := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)mutex.Lock()defer mutex.Unlock()if err != nil && e == nil {//e==nil表明e还没有被赋值e = errcancel() // if any call failed, cancel unfinished calls}if err == nil && !replyDone {reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())replyDone = true}}(rpcAddr)}wg.Wait()return e
}// //另一种写法,go协程中没有参数
// func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
//     .............
// 	for _, rpcAddr := range servers {
// 		wg.Add(1)
// 		//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
//         addr:=rpcAddr
// 		go func() {
// 			defer wg.Done()
// 		err := xc.call(addr, ctx, serviceMethod, args, clonedReply)
//          ......
// 		}()
// 	}
//     ................
// }

 需要注意的几点:

  1. 为了提升性能,请求是并发的。而要等待协程去访问服务实例结束,所以可以使用sync.WaitGroup来阻塞等待。
  2. 并发情况下,xc.call中不能使用reply入参,需要每个协程都有自己的clonedReply参数,不然就需要用锁来控制reply,这就不值得了。每个协程都有自己的clonedReply,获得结果后,再把clonedReply赋值给reply就行,这样就只需要使用互斥锁保证 error 和 reply 能被正确赋值即可。
  3. 借助 context.WithCancel 确保有错误发生时,快速失败。

5.测试 

首先,启动 RPC 服务的代码还是类似的,Sum 是正常的方法,Sleep 用于验证 XClient 的超时机制能否正常运作。

type My inttype Args struct{ Num1, Num2 int }func (m *My) Sum(args Args, reply *int) error {*reply = args.Num1 + args.Num2return nil
}func (m *My) Sleep(args Args, reply *int) error {time.Sleep(time.Second * time.Duration(args.Num1))*reply = args.Num1 + args.Num2return nil
}

接着,有两个函数,clientCall调用单个服务实例,broadcast 调用所有服务实例。

这两个函数代码也是基本相似的,主要不同就是协程函数内的操作不同。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {log.Println("call Foo.Sum error:", err)}fmt.Println("reply: ", reply)}(i)}wg.Wait()
}func broadcast(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("Broadcast reply: ", reply)ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int = 1324if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("timeout Broadcast reply: ", replyTimeout)}(i)}wg.Wait()
}func main() {ch1 := make(chan string)ch2 := make(chan string)//start two serversgo startServer(ch1)go startServer(ch2)addr1 := <-ch1addr2 := <-ch2time.Sleep(time.Second)clientCall(addr1, addr2)broadcast(addr1, addr2)
}

效果

 

完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/6-load-balance

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/651776.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Visual Studio如何修改成英文版

1、打开 Visual Studio Installer 2、点击修改 3、找到语言包&#xff0c;选择需要的语言包&#xff0c;而后点击修改 4、等待下载 5、 安装完成后启动Visual Studio 6、在工具-->选项-->环境-->区域设置-->English并确定 7、重启 Visual Studio&#xff0c;配置…

数据结构与算法教程,数据结构C语言版教程!(第六部分、数据结构树,树存储结构详解)三

第六部分、数据结构树&#xff0c;树存储结构详解 数据结构的树存储结构&#xff0c;常用于存储逻辑关系为 "一对多" 的数据。 树存储结构中&#xff0c;最常用的还是二叉树&#xff0c;本章就二叉树的存储结构、二叉树的前序、中序、后序以及层次遍历、线索二叉树、…

Python 命令行工具开发入门

在实际应用中,我们常常需要编写一些命令行工具,以便在终端或脚本中执行特定的任务。本文将介绍如何使用 Python 编写一个简单的命令行工具,并展示一些常见的实用技巧。 1. 概述 我们的命令行工具将具备以下功能: 输出文件内容到标准输出显示 Python 版本号显示帮助信息2.…

QT实现USB摄像头接入显示

一、UVC协议简介 UVC全称是USB Video Class&#xff08;USB视频类&#xff09;&#xff0c;是一种标准化的USB视频设备通信协议&#xff0c;它定义了摄像头与主机之间的数据传输协议和格式。 UVC协议的出现&#xff0c;解决了摄像头厂商之间互不兼容&#xff0c;以及摄像头应…

(二十八)ATP应用测试平台——使用electron集成vue3桌面应用程序

前言 Electron 是一个开源的框架&#xff0c;它允许使用 Web 技术&#xff08;HTML、CSS 和 JavaScript&#xff09;构建跨平台的桌面应用程序。通过 Electron&#xff0c;开发者可以使用前端技术栈来创建具有原生应用程序体验的桌面应用。 Electron可以在 Windows、Mac 和 L…

Nacos源码解析:String.intern()方法的巧妙应用

引言&#xff1a; 在阅读Nacos源码时&#xff0c;发现其中使用了String.intern()方法&#xff0c;这个使用并不是简单的拼接字符串&#xff0c;而是在特定场景下的优化手段。本文将深入探讨Nacos源码中String.intern()方法的应用&#xff0c;以及为什么要使用这个方法。 1. N…

【前端web入门第一天】02 HTML图片标签 超链接标签 音频标签 视频标签

文章目录: 1.HTML图片标签 1.1 图像标签-基本使用1.2 图像标签-属性1.3 路径 1.3.1 相对路径 1.3.2 绝对路径 2.超链接标签 3.音频标签 4.视频标签 1.HTML图片标签 1.1 图像标签-基本使用 作用:在网页中插入图片。 <img src"图片的URL">src用于指定图像…

《Python 简易速速上手小册》第8章:Python 网络编程与 Web 开发(基于最新版 Python3.12 编写)

注意&#xff1a;本《Python 简易速速上手小册》 核心目的在于让零基础新手「快速构建 Python 知识体系」 文章目录 <mark >注意&#xff1a;本《Python 简易速速上手小册》<mark >核心目的在于让零基础新手「快速构建 Python 知识体系」 8.1 Python 中的网络通信…

HCS 华为云Stack产品组件

HCS 华为云Stack产品组件 Cloud Provisioning Service(CPS) 负责laas的云平台层的部署和升级是laas层中真正面向硬件设备&#xff0c;并将其池化软件化的部件。 Service OM 资源池(计算/存储/网络)以及基础云服务(ECS/EVS/PC)的管理工具。 ManageOne ManageOne包括服务中心…

数据结构(1)--> 顺序表

定义&#xff1a; 顺序表存储定义&#xff1a; 把逻辑上相邻的数据元素存储在物理上相邻的存储单元中的存储结构&#xff0c;顺序表功能的实现借助于数组&#xff0c;通过对数组进行封装&#xff0c;从而实现增删查改的功能&#xff0c;严格意义上来说&#xff08;数组无法实现…

如何发布自己的npm包,详细流程

发布自己的npm包需要遵循以下具体流程&#xff1a; 创建npm账号&#xff1a;打开浏览器&#xff0c;访问npm官网&#xff0c;注册一个npm账号。 创建项目文件夹并进入&#xff1a;在本地创建一个项目文件夹&#xff0c;并使用终端进入该文件夹。 初始化包信息管理文件&#x…

第六课:Prompt

文章目录 第六课&#xff1a;Prompt1、学习总结&#xff1a;Prompt介绍预训练和微调模型回顾挑战 Pre-train, Prompt, PredictPrompting是什么?prompting流程prompt设计 课程ppt及代码地址 2、学习心得&#xff1a;3、经验分享&#xff1a;4、课程反馈&#xff1a;5、使用Mind…

由两个有限项的等差数列B, C, 求有多少个有限项的等差数列A,满足C是A, B的所有公共项,若有无穷个A满足条件,输出-1

题目 思路&#xff1a; #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back const int maxn 1e6 5, inf 1e9 5, maxm 4e4 5, mod 1e9 7, N 1e6; // int a[maxn], b[maxn]; int n, m; string s;int qpow(int a, int b){i…

Unity中创建Ultraleap 3Di交互项目

首先&#xff0c;创建新的场景 1、创建一个空物体&#xff0c;重命名为【XP Leap Provider Manager】&#xff0c;并在这个空物体上添加【XR Leap Provider Manager】 在物体XP Leap Provider Manager下&#xff0c;创建两个子物体Service Provider(XR)和Service Provider(…

C语言与操作符相关的经典例题

目录 一道变态的面试题&#xff1a;不能创建临时变量&#xff08;第三个变量&#xff09;&#xff0c;实现两个数的交换。 编写代码实现&#xff1a;求一个整数存储在内存中的二进制中1的个数。 二进制位置0或者置1 如果以下的知识点不是很清楚的可以去看这篇文章&#xff1…

Deepin基本环境查看 - 目录/大纲

第一次整理本文材料才发现 原来写博客和写代码一样的 多章节的内容&#xff0c;必须将目录单独取出来 这样才方便作者&#xff0c;也方便读者 奇怪的知识又增加了 ^^ Deepin基本环境查看 - 目录Deepin基本环境查看&#xff08;一&#xff09;【基本信息】Deepin基本环境查看&am…

阿里云部署配置幻兽帕鲁Palworld联机服务器详细教程

阿里云作为国内领先的云计算服务提供商&#xff0c;为企业和个人提供了丰富的云服务。本文将为大家详细介绍如何在阿里云上配置幻兽帕鲁Palworld联机服务器&#xff0c;以便与更多玩家共同体验游戏的乐趣。 第一步&#xff1a;登录服务器创建页 1、进入幻兽帕鲁联机服务快速部…

数据结构——顺序队列(循环)

采用顺序表的方式实现循环队列。其中关键在于如何判断队列已满。通常情况下&#xff0c;当对头和队尾指向同一个节点时&#xff0c;可以判断为队空。但是&#xff0c;倘若队尾不断增加&#xff0c;最后队尾也会指向对头&#xff0c;此时队满和队空的判断条件一致。以下有三种对…

剖析线程池ForkJoinPool

文章目录 一、引言二、ForkJoinPool概述三、工作原理四、案例及分析案例背景案例分析实现 五、注意事项六、总结 一、引言 在并发编程中&#xff0c;线程池是一个常见的工具&#xff0c;用于管理和复用线程&#xff0c;以避免频繁地创建和销毁线程带来的开销。ForkJoinPool是J…

11. 双目视觉之立体视觉基础

目录 1. 深度恢复1.1 单目相机缺少深度信息1.2 如何恢复场景深度&#xff1f;1.3 深度恢复的思路 2. 对极几何约束2.1 直观感受2.2 数学上的描述 1. 深度恢复 1.1 单目相机缺少深度信息 之前学习过相机模型&#xff0c;最经典的就是小孔成像模型。我们知道相机通过小孔成像模…