1.服务发现
1.1 介绍
- 服务发现是指用注册中心来记录服务信息,以便其他服务快速查找已注册服务
- 服务发现分类:
- 客户端服务发现
- 服务端服务发现
1.2 客户端服务发现
客户端服务发现(Client-side Service Discovery)是一种微服务架构中的模式,用于让客户端应用动态地发现并调用其他服务的实例,而无需通过一个中介(例如负载均衡器或服务网关)。它通常用于分布式系统中,通过客户端直接决定并选择与哪个服务实例通信,从而实现服务发现和负载均衡。
1.3 服务端服务发现
服务端服务发现(Server-side Service Discovery)是另一种服务发现模式,与客户端服务发现相对。在这种模式中,服务的实例发现和负载均衡由服务端组件处理,客户端只需将请求发送给一个固定的入口点(如负载均衡器或 API 网关),由这个入口点负责将请求路由到合适的服务实例。
2.zookeeper
2.1 zookeeper介绍
Apache ZooKeeper 是一个用于分布式系统中的协调服务。它提供了一套高效、可靠的分布式协调工具,用于实现服务注册、配置管理、同步、领导者选举等功能。Zookeeper 的设计初衷是简化分布式应用中的协调任务,从而使应用开发更容易。
- 是一个分布式数据库(程序协调服务),Hadoop子项目
- 树状方式维护节点方数据的增、删、改、查
- 监听通知机制:通过监听可以获取相应消息事件(内容,子节点)
2.2 zookeeper安装
安装zookeeper
- 参考官方文档安装
- http://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
- 下载时需要注意下载的是编译过的二进制文件,不是源码
- 不然会爆错:找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
- 解压缩
- 编辑 conf/zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
- 运行
bin/zkServer.sh start
2.3 zookeeper核心功能
- 持久节点
- 一直存在服务器上
- 临时节点
- 会话失效,节点自动清理
- 顺序节点
- 节点创建,自动分配序列号
2.3.1 增删改查API
package mainimport ("fmt""github.com/samuel/go-zookeeper/zk""time"
)var (host = []string{"127.0.0.1:2181"}
)func main() {//连接客户端conn, _, err := zk.Connect(host, 5*time.Second)if err != nil {panic(err)}//增if _, err := conn.Create("/test_tree2", []byte("tree_content"),0, zk.WorldACL(zk.PermAll)); err != nil {fmt.Println("create err", err)}//查nodeValue, dStat, err := conn.Get("/test_tree2")if err != nil {fmt.Println("get err", err)return}fmt.Println("nodeValue", string(nodeValue))//改,需要先查询得到版本号if _, err := conn.Set("/test_tree2", []byte("new_content"),dStat.Version); err != nil {fmt.Println("update err", err)}//删除,也,需要先查询得到版本号_, dStat, _ = conn.Get("/test_tree2")if err := conn.Delete("/test_tree2", dStat.Version); err != nil {fmt.Println("Delete err", err)//return}//验证存在hasNode, _, err := conn.Exists("/test_tree2")if err != nil {fmt.Println("Exists err", err)//return}fmt.Println("node Exist", hasNode)//增加if _, err := conn.Create("/test_tree2", []byte("tree_content"),0, zk.WorldACL(zk.PermAll)); err != nil {fmt.Println("create err", err)}//设置子节点,如果上游节点不存在则会报错if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"),0, zk.WorldACL(zk.PermAll)); err != nil {fmt.Println("create err", err)}//获取子节点列表childNodes, _, err := conn.Children("/test_tree2")if err != nil {fmt.Println("Children err", err)}fmt.Println("childNodes", childNodes)
}
2.3.2 监听子节点变化
package mainimport ("fmt""github.com/e421083458/gateway_demo/proxy/zookeeper""log""os""os/signal""syscall"
)var addr = "127.0.0.1:2002"func main() {//获取zk节点列表zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})zkManager.GetConnect()defer zkManager.Close()// 注册一个节点err := zkManager.RegistServerPath("/real_server", "127.0.0.1")err = zkManager.RegistServerPath("/real_server/test", "127.0.0.1:8823")err = zkManager.RegistServerPath("/real_server/test2", "127.0.0.1:8823")if err != nil {return}// 获取节点列表zlist, err := zkManager.GetServerListByPath("/real_server")fmt.Println("server node:")fmt.Println(zlist)if err != nil {log.Println(err)}//动态监听节点变化chanList, chanErr := zkManager.WatchServerListByPath("/real_server")go func() {for {select {case changeErr := <-chanErr:fmt.Println("changeErr")fmt.Println(changeErr)case changedList := <-chanList:fmt.Println("watch node changed")fmt.Println(changedList)}}}()time.Sleep(time.Second * 5)zkManager.RegistServerPath("/real_server/test3", "127.0.0.2:8888")//关闭信号监听quit := make(chan os.Signal)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)<-quit
}
2.3.3 监听节点内容变化
package mainimport ("fmt""github.com/e421083458/gateway_demo/proxy/zookeeper""log""os""os/signal""syscall""time"
)var addr = "127.0.0.1:2002"
func main() {//获取zk节点列表zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})zkManager.GetConnect()defer zkManager.Close()// 注册一个节点err := zkManager.RegistServerPath("/rs_server_conf", "192.168.1.101")if err != nil {fmt.Printf("2001:%v \n", err)return}// 获取节点列表zlist, err := zkManager.GetServerListByPath("/rs_server_conf")fmt.Println("server node:")fmt.Println(zlist)if err != nil {log.Println(err)}获取节点内容zc, _, err := zkManager.GetPathData("/rs_server_conf")if err != nil {log.Println(err)}fmt.Println("get node data:")fmt.Println(string(zc))//动态监听节点内容dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")go func() {for {select {case changeErr := <-dataErrChan:fmt.Println("changeErr")fmt.Println(changeErr)case changedData := <-dataChan:fmt.Println("WatchGetData changed")fmt.Println(string(changedData))}}}()// 尝试修改内容time.Sleep(5 * time.Second)_, z, err := zkManager.GetPathData("/rs_server_conf")if err != nil {return}err = zkManager.SetPathData("/rs_server_conf", []byte(addr), z.Version)if err != nil {fmt.Sprintf("2002:%v \n", err)return}//关闭信号监听quit := make(chan os.Signal)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)<-quit
}
3.网关实现服务发现原理
3.1 网关实现客户端服务发现
3.2 网关实现服务端服务发现
- 将服务注册到zookeeper中
- 网关通过监听zookeeper中的事件来感知变化
4.网关拓展服务发现
- 下游机器启动时创建临时节点:节点名与内容为服务地址
- 以观察者模式构建负载均衡配置LoadBalanceConf
- 负载均衡配置LoadBalanceConf与负载均衡器整合
4.1 观察者模式
观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,使得多个观察者对象可以同时监听某一个主题对象。当这个主题对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新。观察者模式常用于实现事件处理系统,如用户界面事件、订阅/发布系统等。
- 关键概念
- 主题(Subject):也称为发布者(Publisher),它维护一组观察者对象,并提供注册和移除观察者的方法。当主题的状态发生变化时,会通知所有观察者。
- 观察者(Observer):也称为订阅者(Subscriber),它定义了一个更新接口,用于接收来自主题的通知。每个观察者在接收到通知后,可以执行特定的操作。
- 通知(Notification):指的是主题状态变化时向观察者发送的信号或消息。
4.2 以观察者模式构建负载均衡配置
package load_balanceimport ("fmt""github.com/e421083458/gateway_demo/proxy/zookeeper"
)// 配置主题
type LoadBalanceConf interface {Attach(o Observer)GetConf() []stringWatchConf()UpdateConf(conf []string)
}type LoadBalanceZkConf struct {observers []Observerpath stringzkHosts []stringconfIpWeight map[string]stringactiveList []stringformat string
}func (s *LoadBalanceZkConf) Attach(o Observer) {s.observers = append(s.observers, o)
}func (s *LoadBalanceZkConf) NotifyAllObservers() {for _, obs := range s.observers {obs.Update()}
}func (s *LoadBalanceZkConf) GetConf() []string {confList := []string{}for _, ip := range s.activeList {weight, ok := s.confIpWeight[ip]if !ok {weight = "50" //默认weight}confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)}return confList
}//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) WatchConf() {zkManager := zookeeper.NewZkManager(s.zkHosts)zkManager.GetConnect()fmt.Println("watchConf")chanList, chanErr := zkManager.WatchServerListByPath(s.path)go func() {defer zkManager.Close()for {select {case changeErr := <-chanErr:fmt.Println("changeErr", changeErr)case changedList := <-chanList:fmt.Println("watch node changed")s.UpdateConf(changedList)}}}()
}//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) UpdateConf(conf []string) {s.activeList = conffor _, obs := range s.observers {obs.Update()}
}func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {zkManager := zookeeper.NewZkManager(zkHosts)zkManager.GetConnect()defer zkManager.Close()zlist, err := zkManager.GetServerListByPath(path)if err != nil {return nil, err}mConf := &LoadBalanceZkConf{format: format, activeList: zlist, confIpWeight: conf, zkHosts: zkHosts, path: path}mConf.WatchConf()return mConf, nil
}type Observer interface {Update()
}type LoadBalanceObserver struct {ModuleConf *LoadBalanceZkConf
}func (l *LoadBalanceObserver) Update() {fmt.Println("Update get conf:", l.ModuleConf.GetConf())
}func NewLoadBalanceObserver(conf *LoadBalanceZkConf) *LoadBalanceObserver {return &LoadBalanceObserver{ModuleConf: conf,}
}
4.3 负载均衡配置LoadBalanceConf与负载均衡器整合
package mainimport ("github.com/e421083458/gateway_demo/proxy/load_balance""github.com/e421083458/gateway_demo/proxy/middleware"proxy2 "github.com/e421083458/gateway_demo/proxy/proxy""log""net/http"
)var (addr = "127.0.0.1:2002"
)func main() {mConf, err := load_balance.NewLoadBalanceZkConf("http://%s/base","/real_server",[]string{"127.0.0.1:2181"},map[string]string{"127.0.0.1:2003": "20"})if err != nil {panic(err)}rb := load_balance.LoadBanlanceFactorWithConf(load_balance.LbWeightRoundRobin, mConf)proxy := proxy2.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, rb)log.Println("Starting httpserver at " + addr)log.Fatal(http.ListenAndServe(addr, proxy))
}
4.4 客户端服务发现实现
网关主动通过心跳检测区检测客户端的服务
- 下游机器启动时无需进行任何操作
- 以观察者模式构建负载均衡配置LoadBalanceConf
- 负载均衡配置固定时间频率监测下游节点健康状况
package load_balanceimport ("fmt""net""reflect""sort""time"
)const (//default check settingDefaultCheckMethod = 0DefaultCheckTimeout = 2DefaultCheckMaxErrNum = 2DefaultCheckInterval = 5
)type LoadBalanceCheckConf struct {observers []ObserverconfIpWeight map[string]stringactiveList []stringformat string
}func (s *LoadBalanceCheckConf) Attach(o Observer) {s.observers = append(s.observers, o)
}func (s *LoadBalanceCheckConf) NotifyAllObservers() {for _, obs := range s.observers {obs.Update()}
}func (s *LoadBalanceCheckConf) GetConf() []string {confList := []string{}for _, ip := range s.activeList {weight, ok := s.confIpWeight[ip]if !ok {weight = "50" //默认weight}confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)}return confList
}//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) WatchConf() {fmt.Println("watchConf")go func() {confIpErrNum := map[string]int{}for {changedList := []string{}for item, _ := range s.confIpWeight {conn, err := net.DialTimeout("tcp", item, time.Duration(DefaultCheckTimeout)*time.Second)//todo http statuscodeif err == nil {conn.Close()if _, ok := confIpErrNum[item]; ok {confIpErrNum[item] = 0}}if err != nil {if _, ok := confIpErrNum[item]; ok {confIpErrNum[item] += 1} else {confIpErrNum[item] = 1}}if confIpErrNum[item] < DefaultCheckMaxErrNum {changedList = append(changedList, item)}}sort.Strings(changedList)sort.Strings(s.activeList)if !reflect.DeepEqual(changedList, s.activeList) {s.UpdateConf(changedList)}time.Sleep(time.Duration(DefaultCheckInterval) * time.Second)}}()
}//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) UpdateConf(conf []string) {fmt.Println("UpdateConf", conf)s.activeList = conffor _, obs := range s.observers {obs.Update()}
}func NewLoadBalanceCheckConf(format string, conf map[string]string) (*LoadBalanceCheckConf, error) {aList := []string{}//默认初始化for item, _ := range conf {aList = append(aList, item)}mConf := &LoadBalanceCheckConf{format: format, activeList: aList, confIpWeight: conf}mConf.WatchConf()return mConf, nil
}