1、grpc自定义负载均衡(resolver)
1.1 proto编写和编译
// Protobuf definition of the Greeter service used by the servers and the client below.
syntax = "proto3";
package pb;
option go_package = "./;pb";

// Greeter exposes a single unary RPC.
service Greeter {
  // SayHello takes a HelloRequest and returns a HelloReply.
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// HelloRequest is the request message of SayHello.
message HelloRequest {
  string name = 1;
}

// HelloReply is the response message of SayHello.
message HelloReply {
  string message = 1;
}
$ protoc -I . --go_out=plugins=grpc:. ./helloword.proto
1.2 服务端
这里编写两个服务端:
package main

import (
	"context"
	"demo/pb"
	"google.golang.org/grpc"
	"log"
	"net"
)

const (
	// port is the address handed to net.Listen by main.
	port = ":50051"
)

// server is the Greeter implementation registered with the gRPC server in main.
// It embeds pb.UnimplementedGreeterServer and defines SayHello itself.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayHello must keep the same signature as the SayHello declared in the generated pb code.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	// Log the Name field carried by the incoming HelloRequest.
	name := in.GetName()
	log.Printf("Received: %v", name)

	// Echo the name back; the "Service1" prefix identifies this backend in the reply.
	reply := &pb.HelloReply{Message: "Service1: Hello " + name}
	return reply, nil
}

// main is the program entry point.
func main() {// 调用标准库,监听50051端口的tcp连接lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}//创建grpc服务s := grpc.NewServer()//将server对象,也就是实现SayHello方法的对象,与grpc服务绑定pb.RegisterGreeterServer(s, &server{})// grpc服务开始接收访问50051端口的tcp连接数据if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
package main

import (
	"context"
	"demo/pb"
	"google.golang.org/grpc"
	"log"
	"net"
)

const (
	// port2 is the address handed to net.Listen by main.
	port2 = ":50052"
)

// server2 is the Greeter implementation registered with the gRPC server in main.
// It embeds pb.UnimplementedGreeterServer and defines SayHello itself.
type server2 struct {
	pb.UnimplementedGreeterServer
}

// SayHello must keep the same signature as the SayHello declared in the generated pb code.
func (s *server2) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	// Log the Name field carried by the incoming HelloRequest.
	requested := in.GetName()
	log.Printf("Received: %v", requested)

	// Echo the name back; the "Service2" prefix identifies this backend in the reply.
	var resp pb.HelloReply
	resp.Message = "Service2: Hello " + requested
	return &resp, nil
}

// main is the program entry point.
func main() {// 调用标准库,监听50052端口的tcp连接lis, err := net.Listen("tcp", port2)if err != nil {log.Fatalf("failed to listen: %v", err)}//创建grpc服务s := grpc.NewServer()//将server对象,也就是实现SayHello方法的对象,与grpc服务绑定pb.RegisterGreeterServer(s, &server2{})// grpc服务开始接收访问50051端口的tcp连接数据if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
1.3 客户端
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"demo/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
)

// init globally registers the Resolver builder for the "myservice" scheme.
func init() {log.Println("&myServiceBuilder!")resolver.Register(&myServiceBuilder{})
}type myServiceBuilder struct {
}func (*myServiceBuilder) Scheme() string {log.Println("myServiceBuilder Scheme()!")return "myservice"
}// 创建Resolver实例
func (*myServiceBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {log.Println("myServiceBuilder Build()!")r := &myServiceResolver{target: target,cc: cc,}r.start()return r, nil
}type myServiceResolver struct {target resolver.Targetcc resolver.ClientConn
}// 根据target不同,解析出不同的端口
func (r *myServiceResolver) start() {
	log.Println("myServiceResolver start()!")

	// Simulate resolving "myservice" into two backend addresses.
	backends := []resolver.Address{
		{Addr: ":50051"},
		{Addr: ":50052"},
	}
	state := resolver.State{Addresses: backends}
	r.cc.UpdateState(state)
}

// ResolveNow re-resolves using exactly the same procedure as start.
func (r *myServiceResolver) ResolveNow(o resolver.ResolveNowOptions) {log.Println("myServiceResolver ResolveNow()!")r.start()
}func (*myServiceResolver) Close() {log.Println("myServiceResolver Close()!")
}const (address1 = "myservice:///abc"
)// 自定义负载均衡
// 初始化中进行注册
func init() {balancer.Register(newMyPickBuilder())
}func newMyPickBuilder() balancer.Builder {log.Println("newMyPickBuilder()!")return &myPickBuilder{}
}type myPickBuilder struct{}func (*myPickBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {log.Println("myPickBuilder Build()!")return &myPickBalancer{state: 0,cc: cc,subConns: make(map[resolver.Address]balancer.SubConn),subConns1: make(map[balancer.SubConn]resolver.Address),}
}func (*myPickBuilder) Name() string {log.Println("myPickBuilder Name()!")return "mypickBalance"
}type myPickBalancer struct {state connectivity.Statecc balancer.ClientConnsubConns map[resolver.Address]balancer.SubConnsubConns1 map[balancer.SubConn]resolver.Address
}func (b *myPickBalancer) ResolverError(err error) {log.Println("myPickBalancer ResolverError()!")// TODO 需要剔除无效连接
}func (b *myPickBalancer) UpdateClientConnState(s balancer.ClientConnState) error {log.Println("myPickBalancer UpdateClientConnState()!")addrsSet := make(map[resolver.Address]struct{})for _, a := range s.ResolverState.Addresses {addrsSet[a] = struct{}{}if _, ok := b.subConns[a]; !ok {sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})if err != nil {continue}b.subConns[a] = scsc.Connect()}}return nil
}func (b *myPickBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {log.Println("myPickBalancer UpdateSubConnState()!")// TODO 需要剔除无效连接,增加有效连接//if s.ConnectivityState == connectivity.Ready {// b.subConns[b.subConns1[sc]] = sc//}log.Println("b.subConns", b.subConns)var scs []balancer.SubConnfor _, sc := range b.subConns {scs = append(scs, sc)}if len(b.subConns) == 2 {b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &myPicker{scs}})}
}func (b *myPickBalancer) Close() {log.Println("myPickBalancer Close()!")
}type myPicker struct {subConns []balancer.SubConn
}func (p *myPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {log.Println("myPicker Pick()!")//获取当前时间second := time.Now().Second()fmt.Printf("Current Time Second:%d\n", second)if second%2 == 0 {return balancer.PickResult{SubConn: p.subConns[0]}, nil}return balancer.PickResult{SubConn: p.subConns[1]}, nil
}func main() {// 访问服务端address,创建连接conn,地址格式"myservice:///abc"conn, err := grpc.Dial(address1, grpc.WithInsecure(), grpc.WithBlock(),grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"mypickBalance":{}}]}`))if err != nil {log.Fatalf("did not connect: %v", err)}time.Sleep(100 * time.Millisecond)defer conn.Close()c := pb.NewGreeterClient(conn)// 设置客户端访问超时时间1秒ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 客户端调用服务端 SayHello 请求,传入Name 为 "world", 返回值为服务端返回参数r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "world"})if err != nil {log.Fatalf("could not greet: %v", err)}// 根据服务端处理逻辑,返回值也为"world"log.Printf("Greeting: %s", r.GetMessage())time.Sleep(time.Second)ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second)defer cancel2()// 客户端调用服务端 SayHello 请求,传入Name 为 "world", 返回值为服务端返回参数r2, err2 := c.SayHello(ctx2, &pb.HelloRequest{Name: "world"})if err2 != nil {log.Fatalf("could not greet: %v", err2)}// 根据服务端处理逻辑,返回值也为"world"log.Printf("Greeting: %s", r2.GetMessage())
}
1.4 测试
[root@zsx demo]# go run server/server1.go
2023/02/18 09:41:22 Received: world
[root@zsx demo]# go run server/server2.go
2023/02/18 09:41:21 Received: world
[root@zsx demo]# go run client/client.go
2023/02/18 09:41:21 &myServiceBuilder!
2023/02/18 09:41:21 myServiceBuilder Scheme()!
2023/02/18 09:41:21 newMyPickBuilder()!
2023/02/18 09:41:21 myPickBuilder Name()!
2023/02/18 09:41:21 myServiceBuilder Build()!
2023/02/18 09:41:21 myServiceResolver start()!
2023/02/18 09:41:21 myPickBuilder Build()!
2023/02/18 09:41:21 myPickBuilder Name()!
2023/02/18 09:41:21 myPickBalancer UpdateClientConnState()!
2023/02/18 09:41:21 myPickBalancer UpdateSubConnState()!
2023/02/18 09:41:21 myPickBalancer UpdateSubConnState()!
2023/02/18 09:41:21 myPickBalancer UpdateSubConnState()!
2023/02/18 09:41:21 myPickBalancer UpdateSubConnState()!
2023/02/18 09:41:21 myPicker Pick()!
Current Time Second:21
2023/02/18 09:41:21 Greeting: Service2: Hello world
2023/02/18 09:41:22 myPicker Pick()!
Current Time Second:22
2023/02/18 09:41:22 Greeting: Service1: Hello world
2023/02/18 09:41:22 myPickBalancer Close()!
2023/02/18 09:41:22 myServiceResolver Close()!
# 项目结构
$ tree demo
demo
├── client
│ └── client.go
├── go.mod
├── go.sum
├── pb
│ ├── helloword.pb.go
│ └── helloword.proto
└── server
    ├── server1.go
    └── server2.go