中心网关:gateway
四个微服务:user、message、note、relationship
1 中心网关实现服务发现
1.1 设计EtcdDiscovery类
package entity

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"gonote/gateway/internal/option"
	messageService "gonote/message/service"
	noteService "gonote/note/service"
	relationshipService "gonote/relationship/service"
	userService "gonote/user/service"
)

// serviceConn records the address and gRPC connection currently backing a
// service's client, so the connection can be reused when the address is
// unchanged and closed when it is replaced or removed.
type serviceConn struct {
	addr string
	conn *grpc.ClientConn
}

// EtcdDiscovery keeps one gRPC client per service name, built from the
// address stored in etcd under that name and refreshed from etcd watch events.
type EtcdDiscovery struct {
	etcdClient *clientv3.Client
	serviceMap map[string]interface{} // service name -> generated gRPC client
	mu         sync.RWMutex           // guards serviceMap and conns
	conns      map[string]serviceConn // service name -> connection behind the client
}

// NewEtcdDiscovery connects to the etcd endpoint ip:port (3 second dial
// timeout) and returns a discovery object with no services loaded yet.
func NewEtcdDiscovery(ip string, port int) (*EtcdDiscovery, error) {
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{fmt.Sprintf("%v:%v", ip, port)},
		DialTimeout: time.Second * 3,
	})
	if err != nil {
		return nil, err
	}
	return &EtcdDiscovery{
		etcdClient: etcdCli,
		serviceMap: make(map[string]interface{}),
		conns:      make(map[string]serviceConn),
	}, nil
}

// Start loads the current address of every service in serviceNames from etcd,
// builds its gRPC client, and starts one goroutine per service that watches
// the key for later changes.
//
// Start panics if etcd cannot be queried, if a service has no key in etcd, or
// if the connection to the stored address cannot be created.
func (ed *EtcdDiscovery) Start(serviceNames []string) {
	for _, serviceName := range serviceNames {
		resp, err := ed.etcdClient.Get(context.TODO(), serviceName)
		if err != nil {
			panic(err)
		}
		// A missing key yields an empty result, not an error; report it
		// explicitly instead of indexing into an empty slice.
		if len(resp.Kvs) == 0 {
			panic(fmt.Errorf("service %q is not registered in etcd", serviceName))
		}
		if err := ed.connect(serviceName, string(resp.Kvs[0].Value)); err != nil {
			panic(err)
		}
		// Watch etcd so the client follows the service when it re-registers.
		go ed.watch(serviceName)
	}
}

// watch applies etcd events for serviceName: a put rebuilds the client for
// the new address, a delete drops the client. It returns when the watch
// channel is closed.
func (ed *EtcdDiscovery) watch(serviceName string) {
	watchChan := ed.etcdClient.Watch(context.TODO(), serviceName)
	for resp := range watchChan {
		for _, ev := range resp.Events {
			switch ev.Type {
			case clientv3.EventTypePut:
				addr := string(ev.Kv.Value)
				if err := ed.connect(serviceName, addr); err != nil {
					// Keep the previous client and wait for the next event.
					log.Printf("etcd discovery: connect %q at %q: %v", serviceName, addr, err)
				}
			case clientv3.EventTypeDelete:
				ed.disconnect(serviceName)
			}
		}
	}
}

// connect makes the client for serviceName point at addr. If the service is
// already connected to addr nothing changes; otherwise a new connection is
// created, published under the lock, and the connection it replaces is closed.
// Service names with no known client type are ignored.
func (ed *EtcdDiscovery) connect(serviceName, addr string) error {
	ed.mu.RLock()
	current, ok := ed.conns[serviceName]
	ed.mu.RUnlock()
	if ok && current.addr == addr {
		return nil
	}

	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	cli, ok := newServiceClient(serviceName, conn)
	if !ok {
		// Nothing will ever use this connection; the close error carries no
		// useful information for a connection that was never used.
		_ = conn.Close()
		return nil
	}

	ed.mu.Lock()
	previous, replaced := ed.conns[serviceName]
	ed.serviceMap[serviceName] = cli
	ed.conns[serviceName] = serviceConn{addr: addr, conn: conn}
	ed.mu.Unlock()

	if replaced {
		// Close outside the lock; the old connection is no longer reachable
		// through serviceMap, so its close error is not actionable.
		_ = previous.conn.Close()
	}
	return nil
}

// disconnect removes the client for serviceName and closes its connection.
func (ed *EtcdDiscovery) disconnect(serviceName string) {
	ed.mu.Lock()
	entry, ok := ed.conns[serviceName]
	delete(ed.serviceMap, serviceName)
	delete(ed.conns, serviceName)
	ed.mu.Unlock()

	if ok {
		// The service has been deregistered; a close error is not actionable.
		_ = entry.conn.Close()
	}
}

// newServiceClient wraps conn in the generated gRPC client that matches
// serviceName. The boolean is false when serviceName is not a known service.
func newServiceClient(serviceName string, conn *grpc.ClientConn) (interface{}, bool) {
	switch serviceName {
	case option.User:
		return userService.NewUserServiceClient(conn), true
	case option.Relationship:
		return relationshipService.NewRelationshipServiceClient(conn), true
	case option.Note:
		return noteService.NewNoteServiceClient(conn), true
	case option.Message:
		return messageService.NewMessageServiceClient(conn), true
	}
	return nil, false
}

// GetServiceAddr returns the gRPC client currently registered for serviceName
// (despite the name, it is a client, not an address). The result is nil when
// no client is registered, for example after the key was deleted from etcd,
// so callers should use the two-value form of the type assertion.
func (ed *EtcdDiscovery) GetServiceAddr(serviceName string) interface{} {
	ed.mu.RLock()
	defer ed.mu.RUnlock()
	return ed.serviceMap[serviceName]
}
1.2 在web启动时,初始化EtcdDiscovery
package mainimport ("gonote/gateway/internal""gonote/gateway/internal/option""gonote/gateway/internal/util"
)func init() {util.InitLogger()util.InitWebSocketServer(64)err := util.InitRedis()if err != nil {panic(err)}util.InitKafka(option.Topic)util.InitEtcdDiscovery([]string{option.User,option.Relationship,option.Note,option.Message})
}func main() {engine := internal.Router()if err := engine.Run("0.0.0.0:9090"); err != nil {panic(err)}
}
1.3 调用EtcdDiscovery获取服务实例
举个用户登录的例子:
func UserLogin(c *gin.Context) {em := c.PostForm("email")pwd := c.PostForm("pwd")// 获取服务实例cli := util.EtcdDiscovery.GetServiceAddr(option.User).(service.UserServiceClient)// 调用服务_, err := cli.UserLogin(context.TODO(), &service.User{Email: em,Pwd: pwd,})if err != nil {c.JSON(200, gin.H{"code": 1,"msg": err.Error(),})return}// 生成jwt令牌token, err := util.GenToken(em)if err != nil {c.JSON(200, gin.H{"code": 1,"msg": err.Error(),})return}// session维护长连接session := sessions.Default(c)session.Set("email", em)err = session.Save()if err != nil {c.JSON(200, gin.H{"code": 1,"msg": err.Error(),})return}c.JSON(200, gin.H{"code": 0,"data": token,})
}
2 微服务端进行服务注册
user业务对应的微服务:
func init() {util.InitLogger()err := util.InitDB()if err != nil {panic(err)}util.InitKafka(option.Topic)util.InitEtcdCli()
}func main() {addr := option.IP + ":" + option.Portln, err := net.Listen("tcp", addr)if err != nil {panic(err)}defer ln.Close()defer util.EtcdCli.Close()defer util.KafkaCli.Close()// 服务注册_, err = util.EtcdCli.Put(context.TODO(), "user", addr)if err != nil {panic(err)}server := grpc.NewServer()service.RegisterUserServiceServer(server, &service.UserServiceImpl{})if err = server.Serve(ln); err != nil {panic(err)}
}
通过etcd命令查看相关注册信息