博客参考:K8s组件:etcd安装、使用及原理(Linux)
分布式系统架构中对一致性要求很高,etcd就满足了分布式系统中的一致性要求,实现了分布式一致性键值对存储的中间件。etcd完整的集群至少需要3台,这样才能选出一个master和两个node,具有强一致性,常用于注册中心(配置共享和服务发现)。目前是分布式和云原生下的基础组件,如:k8s。内部使用raft算法,算法具体内容见我的另外一篇博客,在此描述几个关键点:Leader、Candidate、Follower、任期、心跳探测。脑裂:当网络恢复之后,旧的Leader发现集群中的新Leader的Term比自己大,则自动降级为Follower,并从新Leader处同步数据达成集群数据一致。数据同步流程:当Client发起数据更新请求,请求会先到领袖节点C,节点C会更新日志数据,然后通知群众节点也更新日志,当群众节点更新日志成功后,会返回成功通知给领袖C,至此完成了“提交”操作;当领袖C收到通知后,会更新本地数据,并通知群众也更新本地数据,同时会返回成功通知给Client,至此完成了“应用”操作,如果后续Client又有新的数据更新操作,会重复上述流程。结构定义:从raftLog结构体可以看到,里面有两个存储位置,一个是storage,是保存已经持久化过的日志条目,unstable是保存的尚未持久化的日志条目。storage结构中字段的定义,它实际上就是包含一个WAL来保存日志条目,一个Snapshotter负责保存日志快照。安装:略。简易操作:
etcdctl put myKey "this is etcd"
etcdctl get myKey
etcdctl del myKey

etcd实现服务注册与发现:
1) docker pull bitnami/etcd
2) docker run -d --name Etcd-server --publish 2379:2379 --env ALLOW_NONE_AUTHENTICATION=yes --env ETCD_ADVERTISE_CLIENT_URLS=http://localhost:2379 bitnami/etcd:latest

3)编写proto文件:

syntax = "proto3";

option go_package = ".;rpc";

// Empty is the argument type for RPCs that take no input.
message Empty {
}

// HelloResponse carries the greeting returned by Server.Hello.
message HelloResponse {
  string hello = 1;
}

// RegisterRequest is the input to Server.Register.
message RegisterRequest {
  string name = 1;
  string password = 2;
}

// RegisterResponse returns the uid assigned to the registered user.
message RegisterResponse {
  string uid = 1;
}

// Server exposes the demo RPCs consumed by the client.
service Server {
  rpc Hello(Empty) returns (HelloResponse);
  rpc Register(RegisterRequest) returns (RegisterResponse);
}4)通过这个文件和一个.sh脚本生成server和client代码。生成服务端的etcd.go:package mainimport ("context""fmt"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/naming/endpoints""log"
)const etcdUrl = "http://localhost:2379"
const serviceName = "chihuo/server"
const ttl = 10
var etcdClient *clientv3.Clientfunc etcdRegister(addr string) error {log.Printf("etcdRegister %s\b", addr)etcdClient, err := clientv3.NewFromURL(etcdUrl)if err != nil {return err}em, err := endpoints.NewManager(etcdClient, serviceName)if err != nil {return err}lease, _ := etcdClient.Grant(context.TODO(), ttl)err = em.AddEndpoint(context.TODO(), fmt.Sprintf("%s/%s", serviceName, addr), endpoints.Endpoint{Addr: addr}, clientv3.WithLease(lease.ID))if err != nil {return err}//etcdClient.KeepAlive(context.TODO(), lease.ID)alive, err := etcdClient.KeepAlive(context.TODO(), lease.ID)if err != nil {return err}go func() {for {<-alivefmt.Println("etcd server keep alive")}}()return nil
}func etcdUnRegister(addr string) error {log.Printf("etcdUnRegister %s\b", addr)if etcdClient != nil {em, err := endpoints.NewManager(etcdClient, serviceName)if err != nil {return err}err = em.DeleteEndpoint(context.TODO(), fmt.Sprintf("%s/%s", serviceName, addr))if err != nil {return err}return err}return nil
}生成的服务端的server.go:
package main

import (
	"context"
	"fmt"

	"go_code/demo01/study/etcd-grpc/server/rpc"
)

// Server implements the gRPC service declared in server.proto.
type Server struct {
}

// Hello implements `rpc Hello(Empty) returns (HelloResponse)`:
// it answers every call with a fixed greeting.
func (s Server) Hello(ctx context.Context, request *rpc.Empty) (*rpc.HelloResponse, error) {
	// HelloResponse defines a single string field in server.proto.
	return &rpc.HelloResponse{Hello: "hello client."}, nil
}

// Register implements `rpc Register(RegisterRequest) returns (RegisterResponse)`:
// the returned uid is built from the request's name and password
// ("<name>.<password>"), matching the RegisterResponse shape in server.proto.
func (s Server) Register(ctx context.Context, request *rpc.RegisterRequest) (*rpc.RegisterResponse, error) {
	uid := fmt.Sprintf("%s.%s", request.GetName(), request.GetPassword())
	return &rpc.RegisterResponse{Uid: uid}, nil
}

生成的服务端的main.go:
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"go_code/demo01/study/etcd-grpc/server/rpc"

	"google.golang.org/grpc"
)

// main starts one gRPC server instance on the given -port, registers it in
// etcd for discovery, and deregisters it on shutdown signals.
func main() {
	var port int
	flag.IntVar(&port, "port", 8001, "port")
	flag.Parse()
	addr := fmt.Sprintf("localhost:%d", port)

	// Shutdown signal handling. Bug fix: the original list included
	// syscall.SIGKILL, which can never be caught or handled (signal.Notify
	// silently ignores it), so it is removed here.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		// On shutdown, remove this instance's endpoint from etcd so clients
		// stop being routed to it; the error was previously dropped silently.
		s := <-ch
		if err := etcdUnRegister(addr); err != nil {
			log.Printf("etcdUnRegister failed: %v\n", err)
		}
		if i, ok := s.(syscall.Signal); ok {
			os.Exit(int(i))
		}
		os.Exit(0)
	}()

	// Register this server's address in etcd for service discovery.
	if err := etcdRegister(addr); err != nil {
		panic(err)
	}

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}

	grpcServer := grpc.NewServer(grpc.UnaryInterceptor(UnaryInterceptor()))
	rpc.RegisterServerServer(grpcServer, Server{})
	log.Printf("service start port %d\n", port)
	if err := grpcServer.Serve(lis); err != nil {
		panic(err)
	}
}

// UnaryInterceptor returns an interceptor that logs the full method name of
// every unary RPC before delegating to the real handler.
func UnaryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		log.Printf("call %s\n", info.FullMethod)
		return handler(ctx, req)
	}
}

生成的客户端的代码client.go:
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/resolver"

	rpc2 "go_code/demo01/study/etcd-grpc/client/rpc"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/credentials/insecure"
)

const etcdUrl = "http://localhost:2379"
const serviceName = "chihuo/server"

// main discovers server endpoints through etcd and calls Hello in a loop,
// letting gRPC round-robin the requests across all registered instances.
func main() {
	//bd := &ChihuoBuilder{addrs: map[string][]string{"/api": []string{"localhost:8001", "localhost:8002", "localhost:8003"}}}
	//resolver.Register(bd)

	// 获取etcd客户端
	etcdClient, err := clientv3.NewFromURL(etcdUrl)
	if err != nil {
		panic(err)
	}

	// Bug fix: the NewBuilder error was assigned but never checked; a failure
	// here would leave etcdResolver nil and crash inside grpc.Dial.
	etcdResolver, err := resolver.NewBuilder(etcdClient)
	if err != nil {
		panic(err)
	}

	// 通过grpc与服务建立连接: the "etcd:///" target is resolved by
	// etcdResolver, and requests are balanced round-robin.
	conn, err := grpc.Dial(
		fmt.Sprintf("etcd:///%s", serviceName),
		grpc.WithResolvers(etcdResolver),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
	)
	if err != nil {
		fmt.Printf("err: %v", err)
		return
	}

	// 通过连接conn获取ServerClient(与服务器相互通信)
	ServerClient := rpc2.NewServerClient(conn)
	for {
		// 通过客户端发起远程调用【请求服务端的Hello方法】,接受服务器的返回结果
		helloResponse, err := ServerClient.Hello(context.Background(), &rpc2.Empty{})
		if err != nil {
			fmt.Printf("err: %v", err)
			return
		}
		log.Println(helloResponse, err)
		time.Sleep(500 * time.Millisecond)
	}
}

5)启动三个server,并向etcd注册服务

// 进入server/main.go所在目录
go run . --port 8081
go run . --port 8082
go run . --port 8083

6)启动一个client端,通过etcd拉取服务
go run .\main.go

观察三个server的打印,可以发现,client端的请求是负载均衡的,每个server都有可能被访问到

7)我们停止server3,发现client的请求被均衡地分发到server1、server2

8)进入docker部署的etcd内部,查询所有的key:发现只有8081和8082的