文章目录
- 微服务框架与web框架的区别点在哪儿?
- 为什么还要有微服务框架
- 微服务框架与web框架的对比
- 小结
- 为什么选go-zero?
- 框架对比
- 下载并认识go-zero
- 认识go-zero
- 环境要求
- 组成
- 下载
- 实践go-zero基础功能案例+api+rpc服务
- 功能说明
- 准备
- 构建rpc服务
- 构建api服务
- 服务之间的调度
- 实践go-zero基础功能案例-数据库读写与中间件
- 数据库
- 中间件
- 探究go-zero是如何基于grpc进一步扩展
- 为什么还要在grpc上扩展?
- go-zero的目录结构
- go-zero如何适配grpc
- 服务初始化
- 服务启动
- 小结
- 初步分析
- 初始化client
- 负载均衡机制是如何初始化的
- 小结
- 关于go-zero中的中间件功能是如何实现的
- 目录结构
- 初始化
- 启动流程
- 总结
微服务框架与web框架的区别点在哪儿?
为什么还要有微服务框架
主要因素是微服务架构所带来的问题,当一个项目拆分为多个服务后基于分布式集群部署,会使得整个系统的复杂度增大,这时就会出现单体情况下所没有的新的问题。如
- 如何动态的增加减少服务
- 如何定位项目中出现的问题
- 某服务出现问题怎么办等问题
在之前讲过微服务的核心要素,我们可以通过web框架结合rpc实现微服务,满足微服务项目的开发,对于前面提到的问题我们可以通过目前业界主流的解决方案去解决也是可以的。而微服务框架是在web框架的基础上融入了主流微服务架构问题的解决方案,这样自己在做微服务架构项目的时候就可以专注在业务的开发上。
微服务框架与web框架的对比
框架功能
目前的微服务框架基本包含web框架的功能,因此我们也可以通过微服务框架实现web应用开发,而微服务框架在web框架的基础上还有如:服务注册发现、rpcServer与rpcClient、链路跟踪等功能。
目的和用途
web框架主要用于构建web应用,它提供处理http请求、模板引擎等功能,是可以让开发人员快速搭建和管理web的项目。而微服务框架更加关注在分布式系统和服务化构建,使每个服务都可以独立开发、部署和扩展,从而提供整个系统的灵活性和可伸缩性。
架构模式
web框架通常基于传统客户端-服务端的架构,
微服务框架则是基于微服务架构
架构模式:Web框架通常基于传统的客户端-服务器架构,其中客户端发出HTTP请求,服务器接收请求并返回相应的HTML、JSON等数据。而微服务框架则是基于微服务架构,它将应用程序划分为一组独立的服务,每个服务都有自己的数据库、业务逻辑和API接口,它们通过网络进行通信,并可以独立部署和扩展。
小结
总的来说,Web框架更适合构建相对简单的Web应用程序,而微服务框架适合构建复杂、大规模和分布式的系统。选择使用哪种框架取决于项目的需求和规模。
为什么选go-zero?
框架对比
-
go-micro是go语言早期的微服务框架,在2015年推行,它提供了服务发现、负载均衡、消息传递、分布式配置等功能,并支持多种传输协议和消息队列,有较多丰富的工具集。
但在大版本之间的兼容性不佳、其次随着时代发展,go-micro的创始人专注做云原生,社区维护较弱,相关文档更新不足,对初学者来说上手难度会有一些。
在github上有21.5k:https://github.com/go-micro/go-micro -
tarsgo: 是由腾讯开源的项目、属于tars系列整体偏重,在使用上需要先安装tars并选用tars这个框架的体系,不足之处在于独立性相对较差。
社区建设:是相对完善有qq群、微信群等
在github上有3.3k: https://github.com/TarsCloud/TarsGo -
dubbo go 阿里开源项目特点与tarsgo一样,它是作为由java开发的dubbo重量级下的框架,2019年提出
在github上有4.7k:https://github.com/apache/dubbo-go -
go-kit:在特点上它可以说是一个工具集,包含了微服务架构的功能封装集合,在2015年开源的项目,主要缺点是社区建设不足。
在github上有26.3K:https://github.com/go-kit/kit -
go-kratos:在2019年提出,整体上是一个轻量级的微服务框架,由B站开源的项目,设计的理论是将框架整体打造为微服务工具集,可以个性化的使用定制,社区维护上有公众号和微信群解答
在github上有22.6k:https://github.com/go-kratos/kratos -
go-zero:在2020年开源,属于后起之秀,但目前在github上已有25.2k, 在近几年同比与其他微服务框架发展迅速,框架吸收众多主流技术方案以及充分的实践,可以快速构建一个项目应用,有较多的类库工具包。
go-zero已经是CNCF项目,在社区生态上比较完善有,有在微信群解答、公众号及多渠道的直播或文章指导。在github有28k:https://github.com/zeromicro/go-zero
下载并认识go-zero
认识go-zero
https://go-zero.dev/docs/tasks
go-zero是go语言中微服务框架的后起之秀,吸收众多框架的特点及项目开发的经验设计的框架,在近几年的发展是非常快的。
在框架的整体设计思想上是使开发者快速开发项目应用,并专注在业务的实现上,同时也尽可能少的代码编码写。
框架功能上提供了丰富的工具支持,极简的接口,在服务内部默认就基于etcd实现了服务注册发现,自动降级,自动熔断,自动缓存控制等功能。
同时go-zero除框架本身及工具外,还提供了对应生成工具goctl,通过它根据定义的api/rpc文件即可快速构建一个服务,同时还可以生成其他语言相关的代码并且可以直接运行。
环境要求
- go:1.16版本以上;
- 支持windows、mac、linux
- protoc与grpc的安装
- redis/etcd
为什么还需要安装redis/etcd?
因为go-zero默认框架就集成了,框架在进行数据库读写操作的时候使用redis作缓存,并且默认则使用etcd做服务发现于注册。
可以不用么?可以,但是在目前业界的项目中redis作为缓存已经是大多数项目的普遍现象,而在微服务项目中我们也可以依据go-zero的内部机制选择其他的服务发现机制。
组成
goctl:是go-zero配套的代码生成工具,理论上可以不用,但是它给我提供的命令可以帮助我们减少开发时间
go-zero:是框架整体及程序运行所需要依赖的框架核心代码。
下载
# 下载goctl
go install github.com/zeromicro/go-zero/tools/goctl@latest# 下载go-zero
go get -u github.com/zeromicro/go-zero@latest
实践go-zero基础功能案例+api+rpc服务
功能说明
我们将通过go-zero分别构建一个rpc服务提供业务需求,再构建一个api服务对外访问。
那么我们该如何开始呢,在上一节中介绍过,go-zero可以基于api/proto文件通过goctl快速的帮助我们构建一个服务。
因此我们的实现过程就是
- 先编写.proto文件
- 构建rcp服务
- 编写api文件
- 构建api服务
- 服务的对接联调
准备
在项目的开始前,我们可以创建一个user目录,在user目录下创建一个rpc/api目录分别存放对应的服务
构建rpc服务
在goctl命令中可以通过如下命令构建一个rpc服务
goctl rpc new user
然后只需要再执行go mod tidy下载好相应的包及构建完了一个服务,如下是构建好的服务目录
- user ---------- 服务- etc ---------- 配置文件- internal -------- 内核- config ------- 配置- logic -------- 逻辑处理层- server ------- 对外rpc服务对象- svc ---------- 服务上下文对象- user ---------- protoc于grpc工具库- userclient ------ 封装的对当前服务调度的client
并且目前我们就可以通过go run .
启动服务了。
可以利用apipost访问刚刚启动的grpc服务,goctl也可以直接根据.proto文件构建一个rpc服务。
我们以user.proto文件为例演示。
syntax = "proto3"; // 指定proto版本// 指定golang包名
option go_package = "./user";service User {rpc GetUser(GetUserReq) returns (GetUserResp) {}
}message GetUserReq {string id = 1;
}message GetUserResp {string id = 1;string name = 2;string phone = 3;
}
通过命令进入到于user.proto同级的目录下,然后执行如下命令即可生成rpc服务。
goctl rpc protoc user.proto --go_out=. --go-grpc_out=. --zrpc_out=.
别忘了执行go mod tidy
下载所需要的包。
然后我们再到打开internal/logic/getuserlogic.go
中完善一下业务需求。
package logicimport ("context""demo/user/rpc/internal/svc""demo/user/rpc/user""github.com/zeromicro/go-zero/core/logx"
)type GetUserLogic struct {ctx context.ContextsvcCtx *svc.ServiceContextlogx.Logger
}func NewGetUserLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetUserLogic {return &GetUserLogic{ctx: ctx,svcCtx: svcCtx,Logger: logx.WithContext(ctx),}
}func (l *GetUserLogic) GetUser(in *user.GetUserReq) (*user.GetUserResp, error) {return &user.GetUserResp{Id: in.Id,Name: "test",Phone: "12345678901",}, nil
}
再请求测试。
如果我们的.proto需要新增新的方法怎么做?比如新增一个ping的方法
option go_package = "./user";service User {rpc GetUser(GetUserReq) returns (GetUserResp) {}rpc Ping(Request) returns(Response) {}
}message Request{}message Response{}
实际上我们只需要重新再执行一次之前的命令即可,goctl会自动加载最新的方法和属性并生成新的代码 ,对已有的属性和方法是不会做改变和调整。
构建api服务
go-zero对api接口的开发实践经验进行了总结,也提出于.protoc一样的方式,基于文件约定服务的接口的实现。
构建api服务的方式也可以于rpc服务一样,在没有服务文件的时候通过new创建,以下就是具体的命令
goctl api new api
接下来我们直接以.api文件的方式来创建。关于api的语法于go或者protobuf的语法是相似的,可以看看文档:https://go-zero.dev/docs/tasks/dsl/api
syntax = "v1"type (UserReq {Id string `json:"id"`}UserResp {Id string `json:"id"`Name string `json:"name"`Phone string `json:"phone"`}
)service User {@handler GetUserget /user (UserReq) returns (UserResp)
}
执行如下命令
goctl api go -api user.api -dir . -style gozero
构建的目录结构与rpc服务的目录结构是类似的,同样我们也可以做一个简单的测试。
服务之间的调度
接下来我们用api服务调用rpc服务,将服务的信息数据返回给用户。在连接的时候我们需要先配置api的配置文件,使等api服务可以找到目标rpc服务。
UserRPC:Etcd:Hosts:- 127.0.0.1:2379Key: user.rpc
其中key是哪里来的?它来自user/rpc/etc/xx.yaml
中定义的。还需要修改config.go中的结构体对象。
package configimport ("github.com/zeromicro/go-zero/rest""github.com/zeromicro/go-zero/zrpc"
)type Config struct {rest.RestConfUserRPC zrpc.RpcClientConf
}
在api/internal/svc中初始化
package svcimport ("demo/user/api/internal/config""demo/user/rpc/userclient""github.com/zeromicro/go-zero/zrpc"
)type ServiceContext struct {Config config.ConfigUserClient userclient.User
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,UserClient: userclient.NewUser(zrpc.MustNewClient(c.UserRpc)),}
}
然后修改业务的代码
package logicimport ("context""demo/user/rpc/userclient""demo/user/api/internal/svc""demo/user/api/internal/types""github.com/zeromicro/go-zero/core/logx"
)type GetUserLogic struct {logx.Loggerctx context.ContextsvcCtx *svc.ServiceContext
}func NewGetUserLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetUserLogic {return &GetUserLogic{Logger: logx.WithContext(ctx),ctx: ctx,svcCtx: svcCtx,}
}func (l *GetUserLogic) GetUser(req *types.UserReq) (resp *types.UserResp, err error) {getUserResp, err := l.svcCtx.UserClient.GetUser(l.ctx, &userclient.GetUserReq{Id: req.Id,})if err != nil {return}return &types.UserResp{Id: getUserResp.Id,Name: getUserResp.Name,Phone: getUserResp.Phone,}, nil
}
启动项目的时候先启动rpc,因为api启动的时候如果说rpc服务不存在会抛出异常终止运行,最后再请求测试。
实践go-zero基础功能案例-数据库读写与中间件
数据库
go-zero对数据库的操作也是非常简单,goctl工具可以根据数据库的sql命令构建好模型,并且为模型提供基础的CURD操作。
如下:是一个user表
CREATE TABLE `users` (`id` varchar(24) COLLATE utf8mb4_unicode_ci NOT NULL ,`avatar` varchar(191) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',`name` varchar(24) COLLATE utf8mb4_unicode_ci NOT NULL,`phone` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL,`password` varchar(191) COLLATE utf8mb4_unicode_ci DEFAULT NULL,`status` int(10) COLLATE utf8mb4_unicode_ci DEFAULT NULL,`created_at` timestamp NULL DEFAULT NULL,`updated_at` timestamp NULL DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
执行如下命令构建model
goctl model mysql ddl -src="./*.sql" -dir="." -c
在命令执行之后会创建usermodel.go,usermodel_gen.go,vars.go
三个文件,在文件内部包含了对数据库基础curd操作,需注意默认使用的数据库orm是go-zero封装的xsql而非gorm。
usersModel interface {Insert(ctx context.Context, data *Users) (sql.Result, error)FindOne(ctx context.Context, id string) (*Users, error)Update(ctx context.Context, data *Users) errorDelete(ctx context.Context, id string) error}
当我们需要新增一个字段的时候调整sql,再执行同样的命令即可,goctl会自动加载更改变化的字段,但注意goctl不会自定义的方法,但是会影响到基础的CURD方法中的内容。另外在命令中默认提供模型是使用了redis做缓存的,如果期望生成的默认模型不使用redis做缓存可以执行如下的命令
goctl model mysql ddl -src="./*.sql" -dir="." -c=false
数据库配置
Mysql:Datasource: root:123456@tcp(127.0.0.1:3306)/demo?charset=utf8mb4&parseTime=True&loc=LocalCache:- Host: 127.0.0.1:6379Type: nodePass:
//config.go
package configimport ("github.com/zeromicro/go-zero/core/stores/cache""github.com/zeromicro/go-zero/zrpc"
)type Config struct {zrpc.RpcServerConfMysql struct {DataSource string}Cache cache.CacheConf
}
在配置中除了配置数据库的连接配置信息外,还需要配置redis缓存的连接配置信息,因为在默认提供的orm中就有使用到redis做缓存,修改服务与业务
新增
先新增创建用户服务
syntax = "proto3"; // 指定proto版本// 指定golang包名
option go_package = "./user";message GetUserReq {string id = 1;
}message GetUserResp {string id = 1;string name = 2;string phone = 3;
}message CreateUserReq {string id = 1;string name = 2;string phone = 3;
}message CreateUserResp {string id = 1;string name = 2;string phone = 3;}service User {rpc GetUser(GetUserReq) returns (GetUserResp) {}rpc CreateUser(CreateUserReq) returns(CreateUserResp) {}
}
在服务核心对象中引用
package svcimport ("demo/user/models""demo/user/rpc/internal/config""github.com/zeromicro/go-zero/core/stores/sqlx"
)type ServiceContext struct {Config config.ConfigUserModel models.UsersModel
}func NewServiceContext(c config.Config) *ServiceContext {sqlConn := sqlx.NewMysql(c.Mysql.DataSource)return &ServiceContext{Config: c,UserModel: models.NewUsersModel(sqlConn, c.Cache),}
}
实现具体的业务
package logicimport ("context""demo/user/models""demo/user/rpc/internal/svc""demo/user/rpc/user""github.com/zeromicro/go-zero/core/logx"
)type CreateUserLogic struct {ctx context.ContextsvcCtx *svc.ServiceContextlogx.Logger
}func NewCreateUserLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateUserLogic {return &CreateUserLogic{ctx: ctx,svcCtx: svcCtx,Logger: logx.WithContext(ctx),}
}func (l *CreateUserLogic) CreateUser(in *user.CreateUserReq) (*user.CreateUserResp, error) {_, err := l.svcCtx.UserModel.Insert(l.ctx, &models.Users{Id: in.Id,Name: in.Name,Phone: in.Phone,})if err != nil {return nil, nil}return &user.CreateUserResp{Id: in.Id,Name: in.Name,Phone: in.Phone,}, nil
}
测试因为在api中没有提供对user新增的接口,我们在apipost中通过grpc新增,新增之后通过api接口查询新增的用户,这基本就是go-zero的数据操作。
中间件
接下来我们来看看中间件,通过中间件可以很好的实现面向切面的编程,比如在当前的业务中,我们需要对api接口访问的时候要求需要携带固定的token才认为是可以访问的。
这个时候我们可以新增一个验证的中间件,修改user.api
@server (middleware: LoginVerification
)
service User {@handler GetUserInfoget /userinfo (UserReq) returns (UserResp)
}
你只需要在service上定义好@server则就表示该service中的api都使用该中间件,如果不使用则重新定义一个新的service并存储好它的api接口即可。
执行命令
goctl api go -api user.api -dir . -style gozero
就会在同级目录下生成middleware,在目录生成了LoginVerification中间件,并为创建的api提供了中间件的引用。
// Code generated by goctl. DO NOT EDIT.
package handlerimport ("net/http""demo/user/api/internal/svc""github.com/zeromicro/go-zero/rest"
)func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {server.AddRoutes([]rest.Route{{Method: http.MethodGet,Path: "/user",Handler: GetUserHandler(serverCtx),},},)server.AddRoutes(rest.WithMiddlewares([]rest.Middleware{serverCtx.LoginVerification},[]rest.Route{{Method: http.MethodGet,Path: "/userinfo",Handler: GetUserInfoHandler(serverCtx),},}...,),)
}
我们只需要在api的服务核心对象中实例即可
package svcimport ("demo/user/api/internal/config""demo/user/api/internal/middleware""demo/user/rpc/userclient""github.com/zeromicro/go-zero/rest""github.com/zeromicro/go-zero/zrpc"
)type ServiceContext struct {Config config.ConfigUserClient userclient.UserLoginVerification rest.Middleware
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,UserClient: userclient.NewUser(zrpc.MustNewClient(c.UserRpc)),LoginVerification: middleware.NewLoginVerificationMiddleware().Handle,}
}
再完善api的中间件内容
package middlewareimport "net/http"type LoginVerificationMiddleware struct {
}func NewLoginVerificationMiddleware() *LoginVerificationMiddleware {return &LoginVerificationMiddleware{}
}func (m *LoginVerificationMiddleware) Handle(next http.HandlerFunc) http.HandlerFunc {return func(w http.ResponseWriter, r *http.Request) {if r.Header.Get("token") == "123456" {next(w, r)return}w.Write([]byte("权限不足无法执行"))}
}
探究go-zero是如何基于grpc进一步扩展
为什么还要在grpc上扩展?
grpc主要在于解决微服务领域中rpc通信的标准策略,针对rpc的通信方式、协议编码、请求阐述等做统一的规范约定,这样在跨语言跨项目的时候双方之间的通信也会轻松。
而目前主流框架在使用grpc的时候在其基础上进行扩展和增强功能如下几点:
- 需要自定义的功能:gRPC提供了一些常见的功能,如请求-响应模式、流式传输等。但是,如果你需要实现一些特定的功能,如认证、授权、日志记录、监控等,可能需要在gRPC的基础上进行扩展开发。
- 需要与其他技术栈结合:虽然gRPC在Go语言中非常流行,但在某些情况下,你可能需要与其他技术栈进行结合,如数据库、消息队列、缓存等。在这种情况下需要编写自定义的代码来与这些技术栈进行交互。
- 需要适应特定的业务需求:每个项目都有不同的业务需求。尽管gRPC提供了很多功能,但你可能需要根据你的业务需求进行定制化开发,以满足特定的需求。
- 需要性能优化:尽管gRPC在性能方面表现出色,但在某些场景下可能需要进行性能优化。你可能需要对请求-响应模式、流式传输等进行优化,以提高系统的性能和吞吐量。
尽管gRPC是一个强大的框架,但在特定的业务需求下,需要在其基础上进行扩展开发或构建新的项目。这样可以满足特定的需求,提高系统的性能和灵活性。
go-zero的目录结构
- go-zero- core- internal- zrpc
go-zero中的目录结构里主要包含core、internal、zrpc。
- core: 包含go-zero框架整体的核心类库工具包,如redis、MySQL、熔断器等机制
- internal:是针对整个框架定义的内部处理方式,包含服务的处理实例、请求编码、健康检测、分析。
- zrpc:是rpc服务端与客户端的调用连接入口,封装了关于服务端和客户端的操作。
go-zero如何适配grpc
目前已知go-zero是使用了grpc作为底层rpc的调度机制,因此在服务的定义与请求调度上是需要与grpc的方式适配的。
go-zero的解决方案是在grpc的类库实例对象上做一层适配的调度封装进行解决,在使用goctl命令的时候它会自动为我们创建好这一层封装。
- apps - user- rpc- internal- server # 封装好的服务适配方式- userclient # rpc客户端调度的适配方式
在go-zero的入口文件中,先创建好整个服务系统所使用的服务核心对象,然后再创建zrpc的服务对象并对整个程序启动。
var configFile = flag.String("f", "etc/local/user.yaml", "the config file")func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)ctx := svc.NewServiceContext(c)s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {user.RegisterUserServe(grpcServer, server.NewUserServe(ctx))if c.Mode == service.DevMode || c.Mode == service.TestMode {reflection.Register(grpcServer)}})defer s.Stop()fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)s.Start()
}
而在zrpc.MustNewServer
中第一个参数是注册好服务的配置信息,而第二个参数则是完成对grpc服务的注册,参数类型是方法类型,在类型中要求传递grpcServer对象。
user.RegisterUserServe(grpcServer, server.NewUserServe(ctx))
结合如上的代码即可完成grpc的服务注册。
服务初始化
我们再看看服务的初始化
type RpcServer struct {server internal.Serverregister internal.RegisterFn
}
la
// MustNewServer returns a RpcSever, exits on any error.
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {server, err := NewServer(c, register)logx.Must(err)return server
}// NewServer returns a RpcServer.
func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {var err errorif err = c.Validate(); err != nil {return nil, err}// 记录服务的各项指标工具var server internal.Servermetrics := stat.NewMetrics(c.ListenOn)serverOptions := []internal.ServerOption{internal.WithMetrics(metrics),internal.WithRpcHealth(c.Health),}if c.HasEtcd() {server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, c.Middlewares, serverOptions...)if err != nil {return nil, err}} else {server = internal.NewRpcServer(c.ListenOn, c.Middlewares, serverOptions...)}server.SetName(c.Name)if err = setupInterceptors(server, c, metrics); err != nil {return nil, err}rpcServer := &RpcServer{server: server,register: register,}if err = c.SetUp(); err != nil {return nil, err}return rpcServer, nil
}
在服务的初始化中并不复杂,主要做了下面的事情
- 先创建一个用于记录整个系统运行中各项指标的工具
- 验证服务是否默认使用的是etcd作为服务发现机制,以此创建相应的rpc服务对象
- 并设置好服务名及指标记录的拦截器
- 设置服务的信息其中包含prometheus/trace等监听。
我们对代码往下看在RpcServer中还提供了关于grpc请求信息及拦截器的设置
// AddOptions adds given options.
func (rs *RpcServer) AddOptions(options ...grpc.ServerOption) {rs.server.AddOptions(options...)
}// AddStreamInterceptors adds given stream interceptors.
func (rs *RpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {rs.server.AddStreamInterceptors(interceptors...)
}// AddUnaryInterceptors adds given unary interceptors.
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {rs.server.AddUnaryInterceptors(interceptors...)
}
rpcserver
对server = internal.NewRpcServer(c.ListenOn, c.Middlewares, serverOptions...)
继续往下分析。
func NewRpcServer(addr string, middlewares ServerMiddlewaresConf, opts ...ServerOption) Server {var options rpcServerOptionsfor _, opt := range opts {opt(&options)}if options.metrics == nil {options.metrics = stat.NewMetrics(addr)}return &rpcServer{baseRpcServer: newBaseRpcServer(addr, &options),middlewares: middlewares,healthManager: health.NewHealthManager(fmt.Sprintf("%s-%s", probeNamePrefix, addr)),}
}// rpc/internal/server.gp
type (// RegisterFn defines the method to register a server.RegisterFn func(*grpc.Server)// Server interface represents a rpc server.Server interface {AddOptions(options ...grpc.ServerOption)AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor)AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor)SetName(string)Start(register RegisterFn) error}baseRpcServer struct {address stringhealth *health.Servermetrics *stat.Metricsoptions []grpc.ServerOptionstreamInterceptors []grpc.StreamServerInterceptorunaryInterceptors []grpc.UnaryServerInterceptor}
)func newBaseRpcServer(address string, rpcServerOpts *rpcServerOptions) *baseRpcServer {var h *health.Serverif rpcServerOpts.health {h = health.NewServer()}return &baseRpcServer{address: address,health: h,metrics: rpcServerOpts.metrics,options: []grpc.ServerOption{grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionIdle: defaultConnectionIdleDuration,})},}
}
可以看到的是最终构建并返回的实际rpc调度的服务对象,在内容中设置好基础rpcServer,中间件,监控。
rpcpubserver
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,opts ...ServerOption) (Server, error) {registerEtcd := func() error {pubListenOn := figureOutListenOn(listenOn)var pubOpts []discov.PubOptionif etcd.HasAccount() {pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))}if etcd.HasTLS() {pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,etcd.CACertFile, etcd.InsecureSkipVerify))}if etcd.HasID() {pubOpts = append(pubOpts, discov.WithId(etcd.ID))}pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)return pubClient.KeepAlive()}server := keepAliveServer{registerEtcd: registerEtcd,Server: NewRpcServer(listenOn, middlewares, opts...),}return server, nil
}type keepAliveServer struct {registerEtcd func() errorServer
}func (s keepAliveServer) Start(fn RegisterFn) error {if err := s.registerEtcd(); err != nil {return err}return s.Server.Start(fn)
}func figureOutListenOn(listenOn string) string {fields := strings.Split(listenOn, ":")if len(fields) == 0 {return listenOn}host := fields[0]if len(host) > 0 && host != allEths {return listenOn}ip := os.Getenv(envPodIp)if len(ip) == 0 {ip = netx.InternalIp()}if len(ip) == 0 {return listenOn}return strings.Join(append([]string{ip}, fields[1:]...), ":")
}
在rpcPubServer中,则会基于etcd做注册中心,并且向etcd中注册了服务对象
服务启动
关于服务的启动基于代码的跟踪,主要核心的代码即在zrpc/internal/rpcserver.go
中。
func (s *rpcServer) Start(register RegisterFn) error {lis, err := net.Listen("tcp", s.address)if err != nil {return err}unaryInterceptorOption := grpc.ChainUnaryInterceptor(s.buildUnaryInterceptors()...)streamInterceptorOption := grpc.ChainStreamInterceptor(s.buildStreamInterceptors()...)options := append(s.options, unaryInterceptorOption, streamInterceptorOption)server := grpc.NewServer(options...)register(server)// register the health check serviceif s.health != nil {grpc_health_v1.RegisterHealthServer(server, s.health)s.health.Resume()}s.healthManager.MarkReady()health.AddProbe(s.healthManager)// we need to make sure all others are wrapped up,// so we do graceful stop at shutdown phase instead of wrap up phasewaitForCalled := proc.AddWrapUpListener(func() {if s.health != nil {s.health.Shutdown()}server.GracefulStop()})defer waitForCalled()return server.Serve(lis)
}
整体代码在启动的过程中并不复杂,程序中默认使用tcp作为rpc的基础通信协议,然后设置好grpc的拦截器并创建好grpc的server对象,顾在这里可以看出go-zero是在start的时候才去进行监听创建grpc服务。
在服务创建后就执行register方法,该方法在前面入口的时候就已看到了具体传递的参数内容
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {user.RegisterUserServeServer(grpcServer, server.NewUserServeServer(ctx))if c.Mode == service.DevMode || c.Mode == service.TestMode {reflection.Register(grpcServer)}
})
在后续的内容中设置好对应的健康检测的机制,加载服务停止或非正常情况下的停止机制,然后启动服务。
小结
通过对这段内容的了解,基本清楚go-zero是如何与grpc的结合,在服务和调度的时候是对grpc的方式进行了一层适配,而因使用goctl的关系对用户在开发的时候是无感知的。
而在go-zero的服务初始化中及就默认使用etcd作为服务的注册发现机制,如果没有设置会以普通的rpc服务去启动而没有做注册动作,在go-zero调用start的时候最后才去创建出grpc并运行。
由此:如果我们需要使用consol作为服务注册中心的话,需要自己在启动入口进行注册。
官方为我们提供了对应包可以看这里 http://github.com/zeromicro/zero-contrib
初步分析
在上一节中已有对go-zero进行分析,了解到go-zero是基于grpc实现的,而在rpc的服务端和客户端的处理上是额外进行一次封装调度适配。
type defaultUser struct {cli zrpc.Client
}func NewUser(cli zrpc.Client) User {return &defaultUser{cli: cli,}
}func (m *defaultUser) Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {client := user.NewUserClient(m.cli.Conn())return client.Ping(ctx, in, opts...)
}
在go-zero的封装中,会要求传递一个zrpc.client对象,并且在使用中是通过user.NewUserClient(m.cli.Conn())
创建出一个客户端,客户端的连接是通过zrpc.Client中的Conn方法获取的。
而zrpc.Client是在创建核心服务中心对象中创建并传递引用。
type ServiceContext struct {Config config.ConfigUserRpc userclient.User
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,UserRpc: userclient.NewUser(zrpc.MustNewClient(c.UserRpc)),}
}
在整体框架的设计上,对于rpc的通信整体机制的运行处理仍然还是基于grpc完成,而go-zero显然是基于grpc的机制进行了额外的扩展。
初始化client
但实际上在微服务请求调度中,client的实现同比与server会复杂一些,因为client除了在调度外还需考虑服务负载均衡、请求熔断、超时等处理。
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {cli, err := NewClient(c, options...)logx.Must(err)return cli
}// NewClient returns a Client.
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {var opts []ClientOptionif c.HasCredential() {opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{App: c.App,Token: c.Token,})))}if c.NonBlock {opts = append(opts, WithNonBlock())}if c.Timeout > 0 {opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))}if c.KeepaliveTime > 0 {opts = append(opts, WithDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: c.KeepaliveTime,})))}opts = append(opts, options...)target, err := c.BuildTarget()if err != nil {return nil, err}client, err := internal.NewClient(target, c.Middlewares, opts...)if err != nil {return nil, err}return &RpcClient{client: client,}, nil
}
在初始化的内容中主要事项是
- 设置tts认证
- 是否阻塞
- 超时及长连接时间
- 再获取要构建的目标【默认解析的是etcd】
- 创建具体的客户端实例对象
func NewClient(target string, middlewares ClientMiddlewaresConf, opts ...ClientOption) (Client, error) {cli := client{middlewares: middlewares,}svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)balancerOpt := WithDialOption(grpc.WithDefaultServiceConfig(svcCfg))opts = append([]ClientOption{balancerOpt}, opts...)if err := cli.dial(target, opts...); err != nil {return nil, err}return &cli, nil
}func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {var cliOpts ClientOptionsfor _, opt := range opts {opt(&cliOpts)}var options []grpc.DialOptionif !cliOpts.Secure {options = append([]grpc.DialOption(nil),grpc.WithTransportCredentials(insecure.NewCredentials()))}if !cliOpts.NonBlock {options = append(options, grpc.WithBlock())}options = append(options,grpc.WithChainUnaryInterceptor(c.buildUnaryInterceptors(cliOpts.Timeout)...),grpc.WithChainStreamInterceptor(c.buildStreamInterceptors()...),)return append(options, cliOpts.DialOptions...)
}func (c *client) buildStreamInterceptors() []grpc.StreamClientInterceptor {var interceptors []grpc.StreamClientInterceptorif c.middlewares.Trace {interceptors = append(interceptors, clientinterceptors.StreamTracingInterceptor)}return interceptors
}func (c *client) buildUnaryInterceptors(timeout time.Duration) []grpc.UnaryClientInterceptor {var interceptors []grpc.UnaryClientInterceptorif c.middlewares.Trace {interceptors = append(interceptors, clientinterceptors.UnaryTracingInterceptor)}if c.middlewares.Duration {interceptors = append(interceptors, clientinterceptors.DurationInterceptor)}if c.middlewares.Prometheus {interceptors = append(interceptors, clientinterceptors.PrometheusInterceptor)}if c.middlewares.Breaker {interceptors = append(interceptors, clientinterceptors.BreakerInterceptor)}if c.middlewares.Timeout {interceptors = append(interceptors, clientinterceptors.TimeoutInterceptor(timeout))}return interceptors
}func (c *client) dial(server string, opts ...ClientOption) error {options := c.buildDialOptions(opts...)timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)defer cancel()conn, err := grpc.DialContext(timeCtx, server, options...)if err != nil {service := serverif errors.Is(err, context.DeadlineExceeded) {pos := strings.LastIndexByte(server, separator)// len(server) - 1 is the index of last charif 0 < pos && pos < len(server)-1 {service = server[pos+1:]}}return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started",server, err.Error(), service)}c.conn = connreturn nil
}
通过代码可以直观的了解到,在创建客户端的时候先是设置好负载均衡机制、然后再基于grpc的拦截器设置好对请求处理的分析、超时、熔断器、监听等机制,并且在go-zero中这些事项是默认会开启的。
ClientMiddlewaresConf struct {Trace bool `json:",default=true"`Duration bool `json:",default=true"`Prometheus bool `json:",default=true"`Breaker bool `json:",default=true"`Timeout bool `json:",default=true"`}
在程序的后续go-zero通过调用grpc中的DialContext方法创建出grpc的clientConn对象,赋值于属性conn
负载均衡机制是如何初始化的
分析到这里,实际上我们会存在一个问题,就是负载均衡机制到底是如何初始化的,在整个
/zrpc/internal/client.go
中只是调用了p2c.Name。理论上这是一个字符串,不具备程序处理功能,并且在全局中也不见对p2c的其他处理操作,那go-zero又是如何完成负载均衡机制机制的初始化呢?
func NewClient(target string, middlewares ClientMiddlewaresConf, opts ...ClientOption) (Client, error) {cli := client{middlewares: middlewares,}svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)balancerOpt := WithDialOption(grpc.WithDefaultServiceConfig(svcCfg))opts = append([]ClientOption{balancerOpt}, opts...)if err := cli.dial(target, opts...); err != nil {return nil, err}return &cli, nil
}
这种场景在阅读源码的时候会经常出现,及一段代码在方法中只是简约调用但无具体的实例化过程。针对这样的情况在程序中可以从三个方向找。
- 看当前方法调用的目录是否存在init方法进行初始化
- 在当前方法的同级目录下是否存在init初始化
- 调用类库中是否有init对属性初始化
在当前的代码示例中,负载均衡的注册主要是通过p2c类库中init方法完成注册的
const (// Name is the name of p2c balancer.Name = "p2c_ewma"
)func init() {balancer.Register(newBuilder())
}func newBuilder() balancer.Builder {return base.NewBalancerBuilder(Name, new(p2cPickerBuilder), base.Config{HealthCheck: true})
}
小结
到此基本上我们针对go-zero的client内部已经做了解
- go-zero是基于grpc的内部机制基础上进行扩展
- 在初始化客户端的时候会默认注册号熔断器、超时等处理机制
关于go-zero中的中间件功能是如何实现的
目录结构
go-zero的api服务主要是在go-zero/rest
包中封装并处理
rest- handler // 系统中间件- httpx // 针对请求响应的封装- internal // 内核:跨域处理、编码、错误信息等- pathvar // path 参数解析- router // 路由- token // token解析验证,目前主要是jwttoken的解析
初始化
在api服务的入口文件中,我们很容易了解到go-zero的加载过程,先加载配置,再创建对应的服务、再注册路由并启动服务。
func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}
在初始化中主要是加载对api服务引擎,然后再加载路由处理对象,所处理的事情不复杂,同时go-zero提供了RunOption操作用于对Server进行设置
func NewServer(c RestConf, opts ...RunOption) (*Server, error) {if err := c.SetUp(); err != nil {return nil, err}server := &Server{ngin: newEngine(c),router: router.NewRouter(),}opts = append([]RunOption{WithNotFoundHandler(nil)}, opts...)for _, opt := range opts {opt(server)}return server, nil
}
需注意:在此时注册的router对象是/go-zero/rest/router/patrouter
启动流程
路由的加载主要驱动是goctl生成的handler.RegisterHandlers
函数方法, 该方法中会加载需注册的路由、绑定中间件。
func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {server.AddRoutes([]rest.Route{{Method: http.MethodPost,Path: "/register",Handler: user.RegisterHandler(serverCtx),},},// ..)
}
实际上在上面的处理是将路由与引擎绑定,而非和真正处理请求的实例对象router关联。
func (s *Server) AddRoutes(rs []Route, opts ...RouteOption) {r := featuredRoutes{routes: rs,}for _, opt := range opts {opt(&r)}s.ngin.addRoutes(r)
}
具体的绑定和启动主要是在server.Start()
启动的时候触发
func (ng *engine) start(router httpx.Router, opts ...StartOption) error {if err := ng.bindRoutes(router); err != nil {return err}// make sure user defined options overwrite default optionsopts = append([]StartOption{ng.withTimeout()}, opts...)if len(ng.conf.CertFile) == 0 && len(ng.conf.KeyFile) == 0 {return internal.StartHttp(ng.conf.Host, ng.conf.Port, router, opts...)}// make sure user defined options overwrite default optionsopts = append([]StartOption{func(svr *http.Server) {if ng.tlsConfig != nil {svr.TLSConfig = ng.tlsConfig}},}, opts...)return internal.StartHttps(ng.conf.Host, ng.conf.Port, ng.conf.CertFile,ng.conf.KeyFile, router, opts...)
}func StartHttp(host string, port int, handler http.Handler, opts ...StartOption) error {return start(host, port, handler, func(svr *http.Server) error {return svr.ListenAndServe()}, opts...)
}
通过对代码的跟踪可以看到在engine中默认会给api服务增加中间件的设置,
func (ng *engine) appendAuthHandler(fr featuredRoutes, chn chain.Chain,verifier func(chain.Chain) chain.Chain) chain.Chain {if fr.jwt.enabled {if len(fr.jwt.prevSecret) == 0 {chn = chn.Append(handler.Authorize(fr.jwt.secret,handler.WithUnauthorizedCallback(ng.unauthorizedCallback)))} else {chn = chn.Append(handler.Authorize(fr.jwt.secret,handler.WithPrevSecret(fr.jwt.prevSecret),handler.WithUnauthorizedCallback(ng.unauthorizedCallback)))}}return verifier(chn)
}func (ng *engine) bindRoute(fr featuredRoutes, router httpx.Router, metrics *stat.Metrics,route Route, verifier func(chain.Chain) chain.Chain) error {chn := ng.chainif chn == nil {chn = ng.buildChainWithNativeMiddlewares(fr, route, metrics)}chn = ng.appendAuthHandler(fr, chn, verifier)for _, middleware := range ng.middlewares {chn = chn.Append(convertMiddleware(middleware))}handle := chn.ThenFunc(route.Handler)return router.Handle(route.Method, route.Path, handle)
}
加载的中间件就有jwt、trace、日志、监听、熔断器、超时等相关中间件,再通过chn.ThenFunc加载所有的中间件, 在实现的方式中,是使中间件从最后一个开始往前嵌套。
func (c chain) Then(h http.Handler) http.Handler {if h == nil {h = http.DefaultServeMux}for i := range c.middlewares {h = c.middlewares[len(c.middlewares)-1-i](h)}return h
}
在程序的代码往后的跟踪中,在/go-zero/rest/router/patrouter
中就即可看到路由的存储,及最终服务的驱动
type (node struct {item anychildren [2]map[string]*node}// A Tree is a search tree.Tree struct {root *node}
)func (pr *patRouter) Handle(method, reqPath string, handler http.Handler) error {if !validMethod(method) {return ErrInvalidMethod}if len(reqPath) == 0 || reqPath[0] != '/' {return ErrInvalidPath}cleanPath := path.Clean(reqPath)tree, ok := pr.trees[method]if ok {return tree.Add(cleanPath, handler)}tree = search.NewTree()pr.trees[method] = treereturn tree.Add(cleanPath, handler)
}
go-zero的路由存储上是采用了树的方式存储,在业界中对路由的存储方式主要有两种
- 是基于树
- 是基于字典
两种方式相对而言字典速度是最快的,但是它会存储较多重复的内容;如
Post /v1/user/login
Post /v1/user/register
上面两个路由,路由的前缀实际上是相同的,go-zero的方式非传统二叉树,而是树和字典的结合如下是存储的示意图
在存储的时候子集存储的存储方式采用的是map[string]*node方式存储,在性能上是o(log2)同比与字段会稍慢,但可以较少较多的存储空间。综合居中。
如果你对go-zero具体的存储有兴趣,可以在代码中增加如下代码打印路由树看看
/go-zero/core/search/tree.go
func (t *Tree) Iteration() {t.root.iteration("root")
}func (n *node) iteration(path string) {if n == nil {return}fmt.Printf("%s : %v \n", path, n.item)for i, child := range n.children {path1 := fmt.Sprintf("%s - %v", path, i)for s, n2 := range child {path2 := fmt.Sprintf("%s - %v", path1, s)n2.iteration(path2)}}
}
/go-zero/rest/router/patrouter
func (pr *patRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {reqPath := path.Clean(r.URL.Path)for s, tree := range pr.trees {fmt.Println("----- s ----------- ", s)tree.Iteration()}if tree, ok := pr.trees[r.Method]; ok {if result, ok := tree.Search(reqPath); ok {if len(result.Params) > 0 {r = pathvar.WithVars(r, result.Params)}result.Item.(http.Handler).ServeHTTP(w, r)return}}allows, ok := pr.methodsAllowed(r.Method, reqPath)if !ok {pr.handleNotFound(w, r)return}if pr.notAllowed != nil {pr.notAllowed.ServeHTTP(w, r)} else {w.Header().Set(allowHeader, allows)w.WriteHeader(http.StatusMethodNotAllowed)}
}
总结
如下就是具体api服务与中间件及路由的加载流程