1.负载均衡策略
假设有多个服务实例,而每个实例都提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决于负载均衡的策略。
- 随机选择策略 - 从服务列表中随机选择一个。
- 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
- 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近3分钟部署服务器的 CPU、内存消耗情况。
2.服务发现
需要负载均衡那就需要有多个提供相同功能的服务实例。那服务发现是什么意思呢?现在我们有多个提供相同功能的服务实例,那客户端要获取该服务的地址,就需要服务中心返回一个地址给客戶端。这个就是服务发现。
那我们先实现一个基础的服务发现模块 Discovery(定义成interface接口类型)。
我们定义两个类型:
- SelectMode 代表不同的负载均衡策略,简单起见,这里仅实现 Random 和 RoundRobin 两种策略。
- Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
- Refresh() 从注册中心更新服务列表
- Update(servers []string) 手动更新服务列表
- Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
- GetAll() 返回所有的服务实例
代码在xclient文件夹中。
//discovery.go
type SelectMode intconst (RandomSelect SelectMode = iotaRoundRobinSelect
)type Discovery interface {Refresh() errorUpdate(servers []string) errorGet(mode SelectMode) (string, error)GetAll() ([]string, error)
}
紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery。
可以用编码的方式在客户端中配置服务的地址,服务器不需要进行更多的配置,如果添加或删除了某些服务,可以调用MultipleServersDiscovery.Update
来动态更新服务。
客户端使用NewMultipleServersDiscovery方法
设置该服务的网络和地址。
//discovery.go
type MultiServerDiscovery struct {rwMutex sync.RWMutex //protect following,即是保护servers,indexservers []stringindex int
}//使用例子: NewMultiServerDiscovery([]string{"tcp@127.0.0.1:100","tcp@127.0.0.1:2100"})
func NewMultiServerDiscovery(servers []string) *MultiServerDiscovery {d := &MultiServerDiscovery{servers: servers,}d.index = rand.Intn(math.MaxInt32 - 1)return d
}
- index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。
然后,实现 Discovery 接口
var _ Discovery = (*MultiServerDiscovery)(nil)func (d *MultiServerDiscovery) Refresh() error {return nil
}func (d *MultiServerDiscovery) Update(servers []string) error {d.rwMutex.Lock()defer d.rwMutex.Unlock()d.servers = serversreturn nil
}func (d *MultiServerDiscovery) Get(mode SelectMode) (string, error) {//这里不能用d.rwMutex.RLock(),因为d.index有更新d.rwMutex.Lock()defer d.rwMutex.Unlock()n := len(d.servers)if n == 0 {return "", errors.New("rpc discovery: no available servers")}switch mode {case RandomSelect:return d.servers[rand.Intn(n)], nilcase RoundRobinSelect:s := d.servers[d.index%n]d.index = (d.index + 1) % nreturn s, nildefault:return "", errors.New("rpc discovery: not supported select mode")}
}func (d *MultiServerDiscovery) GetAll() ([]string, error) {d.rwMutex.RLock()defer d.rwMutex.RUnlock()// return a copy of d.serversservers := make([]string, len(d.servers))copy(servers, d.servers)return servers, nil
}
3.为什么选择客户端负载均衡
RPC client 和 server 建立是长连接, 因而基于连接的负载均衡没有太大意义, 所以 该RPC 负载均衡是基于每次调用。也就是客户在同一个 client 发的请求希望它被负载均衡到所有服务端。
一般来说负载均衡器是独立的, 被放置在服务消费者和提供者之间. 代理通常需要保存请求响应副本, 因此有性能消耗也会造成额外延迟. 当请求量大时, lb (load balance)可能会变成瓶颈, 并且此时 lb 单点故障会影响整个服务。
客户端负载将lb 的功能集成到客户端进程里,然后使用负载均衡策略选择一个目标服务地址,向目标服务发起请求。LB能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。
但用客户端负载均衡也有坏处, 若有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本。
4.支持负载均衡的客户端
之前对外使用的客户端是Dail(...),这里我们也要向用户暴露一个支持负载均衡的客户端,叫做 XClient。
//xclient.go
type XClient struct {d Discoverymode SelectModeopt *geerpc.Optionmutex sync.Mutexclients map[string]*geerpc.Client
}func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {return &XClient{d: d,mode: mode,opt: opt,clients: make(map[string]*geerpc.Client),}
}func (xc *XClient) Close() error {xc.mutex.Lock()defer xc.mutex.Unlock()for key, client := range xc.clients {//只是关闭,没有其他的对错误的处理client.Close()delete(xc.clients, key)}return nil
}
XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。
我们之前是使用dial函数来创建一个客户端,那我们为了可以复用已经创建好的socket连接,这里我们也实现一个dial函数,在内部复用socket连接。
func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {xc.mutex.Lock()defer xc.mutex.Unlock()client, ok := xc.clients[rpcAddr]if ok && !client.IsAvailable() {client.Close()delete(xc.clients, rpcAddr)client = nil}if client == nil {var err errorclient, err = geerpc.XDial(rpcAddr, xc.opt)if err != nil {return nil, err}xc.clients[rpcAddr] = client}return client, nil
}func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {//获取sokcet连接(复用)client, err := xc.dial(rpcAddr)if err != nil {return err}return client.Call(ctx, serviceMethod, args, reply)
}// serviceMethod 例子:"Foo.SUM"
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply any) error {//通过负载均衡策略得到服务实例rpcAddr, err := xc.d.Get(xc.mode)if err != nil {return err}return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}
之后实现一个调用服务的方法Call。该方法主要是三步:
- 通过负载均衡策略得到服务实例
- 获取sokcet连接(复用)
- 最终调用client.Call去发送服务请求
另外,我们为 XClient 添加一个常用功能:Broadcast。
Broadcast
表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。
Broadcast
是 XClient
的一个方法, 你可以将一个请求发送到这个服务的所有节点。
如果所有的节点都正常返回,没有错误的话, Broadcast
将返回其中的一个节点的返回结果。 如果有节点返回错误的话,将返回这些错误信息中的一个。
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {//获取所有的服务实例servers, err := xc.d.GetAll()if err != nil {return err}var wg sync.WaitGroupvar mutex sync.Mutex //protect e and replyDonevar e errorreplyDone := reply == nilctx, cancel := context.WithCancel(ctx)defer cancel()for _, rpcAddr := range servers {wg.Add(1)//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的go func(rpcAddr string) {defer wg.Done()var clonedReply anyif reply != nil {//reply是指针的,所以需要使用Elem()clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()}//xc.call方法中的参数clonedReply不能使用replyerr := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)mutex.Lock()defer mutex.Unlock()if err != nil && e == nil {//e==nil表明e还没有被赋值e = errcancel() // if any call failed, cancel unfinished calls}if err == nil && !replyDone {reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())replyDone = true}}(rpcAddr)}wg.Wait()return e
}// //另一种写法,go协程中没有参数
// func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
// .............
// for _, rpcAddr := range servers {
// wg.Add(1)
// //fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
// addr:=rpcAddr
// go func() {
// defer wg.Done()
// err := xc.call(addr, ctx, serviceMethod, args, clonedReply)
// ......
// }()
// }
// ................
// }
需要注意的几点:
- 为了提升性能,请求是并发的。而要等待协程去访问服务实例结束,所以可以使用sync.WaitGroup来阻塞等待。
- 并发情况下,xc.call中不能使用reply入参,需要每个协程都有自己的clonedReply参数,不然就需要用锁来控制reply,这就不值得了。每个协程都有自己的clonedReply,获得结果后,再把clonedReply赋值给reply就行,这样就只需要使用互斥锁保证 error 和 reply 能被正确赋值即可。
- 借助 context.WithCancel 确保有错误发生时,快速失败。
5.测试
首先,启动 RPC 服务的代码还是类似的,Sum 是正常的方法,Sleep 用于验证 XClient 的超时机制能否正常运作。
type My inttype Args struct{ Num1, Num2 int }func (m *My) Sum(args Args, reply *int) error {*reply = args.Num1 + args.Num2return nil
}func (m *My) Sleep(args Args, reply *int) error {time.Sleep(time.Second * time.Duration(args.Num1))*reply = args.Num1 + args.Num2return nil
}
接着,有两个函数,clientCall调用单个服务实例,broadcast 调用所有服务实例。
这两个函数代码也是基本相似的,主要不同就是协程函数内的操作不同。
// 调用单个服务实例
func clientCall(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {log.Println("call Foo.Sum error:", err)}fmt.Println("reply: ", reply)}(i)}wg.Wait()
}func broadcast(addr1, addr2 string) {d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer xc.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()var reply int = 1324if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("Broadcast reply: ", reply)ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)defer cancel()var replyTimeout int = 1324if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {fmt.Println("Broadcast call Foo.Sum error:", err)}fmt.Println("timeout Broadcast reply: ", replyTimeout)}(i)}wg.Wait()
}func main() {ch1 := make(chan string)ch2 := make(chan string)//start two serversgo startServer(ch1)go startServer(ch2)addr1 := <-ch1addr2 := <-ch2time.Sleep(time.Second)clientCall(addr1, addr2)broadcast(addr1, addr2)
}
效果
完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/6-load-balance