syntax = "proto3"; // 这是个proto3的文件message HelloRequest{ // 创建数据对象string name = 1; // name表示名称,编号是1
}
生成python文件
安装grpcio和grpcio-tools库
pip install grpcio #安装grpc
pip install grpcio-tools #安装grpc tools
生成proto的python文件
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto
python -m grpc_tools.protoc
:使用grpc_tools包中的protoc命令进行代码生成。--python_out=.
:指定生成的Python代码的存放位置为当前目录。--grpc_python_out=.
:指定生成的gRPC代码的存放位置为当前目录。-I.
:指定搜索.proto文件的路径为当前目录
总结起来,该命令的作用是将当前目录下的helloworld.proto文件生成对应的Python代码,并将生成的代码存放在当前目录中
python使用grpc
syntax = "proto3";// The greeting service definition.
service Greeter {// Sends a greeting 下面就是暴露出来的一些方法rpc SayHello (HelloRequest) returns (HelloReply) {} // 定义返回什么类型就要返回什么类型rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
生成出来的文件就直接使用他
go使用grpc
生成指令:
#好像是版本不兼容了 protoc -I . helloworld.proto --go_out=plugins=grpc:.
protoc -I . --go_out=. --go-grpc_out=. ./hello.proto
样例:
syntax = "proto3";
option go_package = ".;proto"; // 这个是必须加的// The greeting service definition.
service Greeter {// Sends a greeting 下面就是暴露出来的一些方法rpc SayHello (HelloRequest) returns (HelloReply) {} // 定义返回什么类型就要返回什么类型
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
https://blog.csdn.net/neve_give_up_dan/article/details/126920398
https://github.com/grpc/grpc-go/issues/3794
https://blog.csdn.net/Mirale/article/details/122736894
UnimplementedXxxServer的作用:https://blog.csdn.net/Canon_in_D_Major/article/details/108135724
https://zhuanlan.zhihu.com/p/660634947
go_package
option go_package = "common/stream/proto/v1";
表示生成的Go代码文件应该位于common/stream/proto/v1
这个包路径下。换句话说,生成的Go代码文件将被放置在common/stream/proto/v1
目录下,并且其package声明会是v1
流模式
grpc的流模式主要有三种:
- 客户端流模式
- 服务端流模式
- 双向流模式
proto文件:stream_proto.proto
syntax = "proto3";option go_package = ".;proto";service Greeter {// 服务端流模式:客户端是流,服务端不是流rpc GetStream(StreamReqData) returns (stream StreamResData); // 服务端流模式rpc PostStream(stream StreamReqData) returns (StreamResData); // 客户端流模式rpc AllStream(stream StreamReqData) returns (stream StreamResData); // 双向流模式
}message StreamReqData{string data = 1;
}message StreamResData{string data = 1;
}
server.go
package mainimport ("GoRpc_quick/stream_grpc_test/proto""fmt""google.golang.org/grpc""net""sync""time"
)const PORT = ":8080"type server struct {proto.UnimplementedGreeterServer
}// 服务端流模式
func (s *server) GetStream(req *proto.StreamReqData, streamServer proto.Greeter_GetStreamServer) error {i := 0for true {streamServer.Send(&proto.StreamResData{Data: fmt.Sprintf("%v\n + %v", time.Now().Unix(), req.Data),})time.Sleep(time.Second)if i++; i > 10 {break}}return nil
}// 客户端流模式
func (s *server) PostStream(streamServer proto.Greeter_PostStreamServer) error {for {recv, err := streamServer.Recv()if err != nil {fmt.Println(err)break}fmt.Println(recv.Data)}return nil
}// 双向流模式
func (s *server) AllStream(streamServer proto.Greeter_AllStreamServer) error {wg := sync.WaitGroup{}wg.Add(2)go func() { // 负责receivedefer wg.Done()for {recv, _ := streamServer.Recv()fmt.Println("收到客户端消息:", recv.Data)}}()go func() {defer wg.Done()for i := 0; i < 10; i++ {streamServer.Send(&proto.StreamResData{Data: "我是服务器"})time.Sleep(time.Second)}}()wg.Wait()return nil
}func main() {listener, err := net.Listen("tcp", PORT)if err != nil {panic(err)}s := grpc.NewServer()proto.RegisterGreeterServer(s, &server{})err = s.Serve(listener)if err != nil {panic("failed to start grpc")}
}
client.go
package mainimport ("GoRpc_quick/stream_grpc_test/proto""context""fmt""google.golang.org/grpc""sync""time"
)func main() {conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())if err != nil {panic(err)}defer conn.Close()// 服务端流模式(客户端接收流)c := proto.NewGreeterClient(conn)stream, _ := c.GetStream(context.Background(), &proto.StreamReqData{Data: "alice"})for {recv, err := stream.Recv() // 实际上就是socket编程if err != nil {fmt.Println(err.Error())break}fmt.Println(recv)}// 客户端流模式,客户端发送流postStream, err := c.PostStream(context.Background())for i := 0; i < 10; i++ {_ = postStream.Send(&proto.StreamReqData{Data: fmt.Sprintf("客户端流模式 + %d", i)})time.Sleep(time.Second)}// 双向流模式allStream, _ := c.AllStream(context.Background())wg := sync.WaitGroup{}wg.Add(2)go func() { // 负责receivedefer wg.Done()for {recv, _ := allStream.Recv()fmt.Println("收到服务器消息:", recv.Data)}}()go func() {defer wg.Done()for i := 0; i < 10; i++ {allStream.Send(&proto.StreamReqData{Data: "我是客户端"})time.Sleep(time.Second)}}()wg.Wait()}
protobuf知识汇总
基本类型会有对应
一个标量消息字段可以含有一个如下的类型——该表格展示了定义于.proto文件中的类型,以及与之对应的、在自动生成的访问类中定义的类型:
.proto Type | Notes | Python Type | Go Type |
---|---|---|---|
double | float | float64 | |
float | float | float32 | |
int32 | 使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代 | int | int32 |
uint32 | 使用变长编码 | int | uint32 |
uint64 | 使用变长编码 | int | uint64 |
sint32 | 使用变长编码,这些编码在负值时比int32高效的多 | int | int32 |
sint64 | 使用变长编码,有符号的整型值。编码时比通常的int64高效。 | int | int64 |
fixed32 | 总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。 | int | uint32 |
fixed64 | 总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。 | int | uint64 |
sfixed32 | 总是4个字节 | int | int32 |
sfixed64 | 总是8个字节 | int | int64 |
bool | bool | bool | |
string | 一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。 | str | string |
bytes | 可能包含任意顺序的字节数据。 | str | []byte |
protobuf是会有一个默认值,即使不传,也会有默认值
当一个消息被解析的时候,如果被编码的信息不包含一个特定的singular元素,被解析的对象锁对应的域被设置位一个默认值,对于不同类型指定如下:
- 对于strings,默认是一个空string
- 对于bytes,默认是一个空的bytes
- 对于bools,默认是false
- 对于数值类型,默认是0
- 对于枚举,默认是第一个定义的枚举值,必须为0;
- 对于消息类型(message),域没有被设置,确切的消息是根据语言确定的,详见generated code guide
使用repeated创建数组并使用python传递
client.py
# 方式1
with grpc.insecure_channel("localhost:8080") as channel:stub = helloword_pb2_grpc.GreeterStub(channel)hello_request = helloword_pb2.HelloRequest(id=[1, 2, 3,]) # 先实例化不赋值hello_request.name = "bob"res: helloword_pb2.HelloReply = stub.SayHello(hello_request)print(res.message)# 方式2
with grpc.insecure_channel("localhost:8080") as channel:stub = helloword_pb2_grpc.GreeterStub(channel)hello_request = helloword_pb2.HelloRequest() # 先实例化不赋值hello_request.name = "bob"hello_request.id.extend([1, 2]) # 这个对象已经有默认值创建好了,我们把他当做一个list来进行操作hello_request.id.append(4)res: helloword_pb2.HelloReply = stub.SayHello(hello_request)print(res.message)
嵌套message定义的使用
// The response message containing the greetings
message HelloReply {string message = 1;message Result { // 在里面定义messagestring name = 1;string url = 2;}repeated Result data = 2; // 定义一个message数组
}
py中的嵌套调用:
from grpc_hello.proto.helloword_pb2 import HelloReply # 使用嵌套里面的数据res = HelloReply.Result()
# 从哪嵌套的从哪调用
go中的嵌套调用:
import "GoRpc_quick/grpc_test/proto"proto.Pong{} // 直接调用
map与enum枚举类型等其他类型
// 使用枚举类型,使得类型只能是下面的某一个
enum Gender{MALE = 0;FEMALE = 1;
}// The request message containing the user's name.
message HelloRequest {string name = 1;string url = 2;Gender g = 3;map<string, string> mp = 4;google.protobuf.Timestamp addTime = 5; // 时间戳
}
python grpc配合asynio使用
现在已经有官方的api了https://grpc.github.io/grpc/python/grpc_asyncio.html
这里使用grpclib库来实现:
安装依赖包
pip install grpclib# 生成对应文件
python -m grpc_tools.protoc -I. --python_out=. --grpclib_python_out=. helloworld.proto
go-metadata
go中使用metadata
metadata使得client和server能够为对方提供关于本次调用的一些信息,就像一次http请求的RequestHeader和ResponseHeader一样。http中header的生命周周期是一次http请求,那么metadata的生命周期就是一次RPC调用
1. 新建metadata
MD 类型实际上是map,key是string,value是string类型的slice。
type MD map[string][]string
创建的时候可以像创建普通的map类型一样使用new关键字进行创建:
//第一种方式
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
//第二种方式 key不区分大小写,会被统一转成小写。
md := metadata.Pairs("key1", "val1","key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}"key2", "val2", // 使用逗号
)
2. 发送metadata
md := metadata.Pairs("key", "val")// 新建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)// 单向 RPC
response, err := client.SomeRPC(ctx, someRequest)
3. 接收metadata
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {md, ok := metadata.FromIncomingContext(ctx)// do something with metadata
}
2. grpc中使用metadata
proro
syntax = "proto3";
option go_package = ".;proto";// The greeting service definition.
service Greeter {// Sends a greeting 下面就是暴露出来的一些方法rpc SayHello (HelloRequest) returns (HelloReply) {} // 定义返回什么类型就要返回什么类型
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
server.go
type Server struct {*proto.UnimplementedGreeterServer
}// 参数设置是硬性规定的
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {md, ok := metadata.FromIncomingContext(ctx)if !ok {fmt.Println("get metadata error")}// 遍历一下这个string的mapfor k, v := range md {fmt.Println(k, v)}if nameSlice, ok := md["name"]; ok { // 判断map中是否含有元素fmt.Println(nameSlice)for i, e := range nameSlice {fmt.Println(i, e)}}return &proto.HelloReply{Message: "Go hello " + req.Name,}, nil
}func main() {// 一样的新建serverg := grpc.NewServer()// 注册服务proto.RegisterGreeterServer(g, &Server{})listen, err := net.Listen("tcp", "0.0.0.0:8080")if err != nil {panic("failed to listen: " + err.Error())}err = g.Serve(listen)if err != nil {panic("failed to start grpc: " + err.Error())}
}
client.go
func main() {conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())if err != nil {panic(err)}defer conn.Close()c := proto_bak.NewGreeterClient(conn)//md := metadata.Pairs("timestamp", time.Now().Format(times))md := metadata.New(map[string]string{"name": "alice","pwd": "12345",})ctx := metadata.NewOutgoingContext(context.Background(), md)r, err := c.SayHello(ctx, &proto_bak.HelloRequest{Name: "bob",})if err != nil {panic(err)}fmt.Println(r.Message)}
python-meta
server.py
import grpcfrom grpc_metadata_test.proto import helloword_pb2, helloword_pb2_grpcif __name__ == '__main__':with grpc.insecure_channel("localhost:50051") as channel:stub = helloword_pb2_grpc.GreeterStub(channel)hello_request = helloword_pb2.HelloRequest() # 先实例化不赋值hello_request.name = "bob"rsp, call = stub.SayHello.with_call(hello_request,metadata=(('name', 'bobby'),('pwd', '123456')))# call 能拿到服务器发回来的东西print(rsp.message)
client.py
import grpc
from concurrent import futures
from grpc_metadata_test.proto import helloword_pb2, helloword_pb2_grpcclass Greeter(helloword_pb2_grpc.GreeterServicer):def SayHello(self, request, context):# 一样的遍历metadata的mapfor k, v in context.invocation_metadata():print(k, v)return helloword_pb2.HelloReply(message=f'Python 你好{request.name}')if __name__ == '__main__':# 1. 实例化serverserver = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 设置十个线程# 2. 注册逻辑到server中helloword_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)# 3. 启动serverserver.add_insecure_port('[::]:50051')server.start()server.wait_for_termination()
拦截器
go拦截器
server.go
type Server struct {*proto.UnimplementedGreeterServer
}// 参数设置是硬性规定的
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {return &proto.HelloReply{Message: "Go hello " + req.Name,}, nil
}func main() {// 定义拦截器的函数interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {fmt.Println("接收到了一个新的清秀")res, err := handler(ctx, req) // handler是原本的调用逻辑fmt.Println("请求已经完成")return res, err}opt := grpc.UnaryInterceptor(interceptor) // 参数是一个函数,所以参数必须一致(go认参数来区分函数// 一样的新建serverg := grpc.NewServer(opt) // 放进去// 注册服务proto.RegisterGreeterServer(g, &Server{})listen, err := net.Listen("tcp", "0.0.0.0:8080")if err != nil {panic("failed to listen: " + err.Error())}err = g.Serve(listen)if err != nil {panic("failed to start grpc: " + err.Error())}
}
client.go
func main() {interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {start := time.Now()err := invoker(ctx, method, req, reply, cc, opts...)fmt.Printf("耗时: %s\n", time.Since(start))return err} // 拦截器逻辑opt := grpc.WithUnaryInterceptor(interceptor) // 需要在dial(拨号)的时候传入参数conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure(), opt)if err != nil {panic(err)}defer conn.Close()c := proto.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name: "bob",})if err != nil {panic(err)}fmt.Println(r.Message)}
python拦截器
server.py
class Greeter(helloword_pb2_grpc.GreeterServicer):def SayHello(self, request, context):for k, v in context.invocation_metadata():print(k, v)return helloword_pb2.HelloReply(message=f'Python 你好{request.name}')# 调用拦截器逻辑需要继承该抽象类,并且实现这个抽象类的方法
class LogInterceptors(grpc.ServerInterceptor):def intercept_service(self, continuation, handler_call_details):print("请求开始")print(type(handler_call_details))rsp = continuation(handler_call_details) # 相当于handlerprint("请求结束")return rspif __name__ == '__main__':# 实例化一个interinterceptor = LogInterceptors()# 1. 实例化serverserver = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=(interceptor,)) # 设置十个线程# 并传进去interceptor的tuple或者list, 最后一定要带个,# 2. 注册逻辑到server中helloword_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)# 3. 启动serverserver.add_insecure_port('[::]:50051')server.start()server.wait_for_termination()
client.py
客户端的拦截器
# 第二个参数需要继承重写
class DefaultInterceptor(grpc.UnaryUnaryClientInterceptor):def intercept_unary_unary(self, continuation, client_call_details, request):# 拦截器业务逻辑start = datetime.now()rsp = continuation(client_call_details, request)print((datetime.now() - start).microseconds / 1000, "ms")return rspif __name__ == '__main__':# 实例化对象default_intercept = DefaultInterceptor()with grpc.insecure_channel("localhost:50051") as channel:# 需要对channel进行操作intercept_channel = grpc.intercept_channel(channel, default_intercept)stub = helloword_pb2_grpc.GreeterStub(intercept_channel)hello_request = helloword_pb2.HelloRequest() # 先实例化不赋值hello_request.name = "bob"rsp, call = stub.SayHello.with_call(hello_request,metadata=(('name', 'bobby'),('pwd', '123456')))# call 能拿到服务器发回来的东西print(rsp.message)
验证器
docs:protoc-gen-validate/docs.md at main · bufbuild/protoc-gen-validate · GitHub
每个字段要满足一个验证规则
编译命令:
protoc \-I . \-I path/to/validate/ \--go_out=":../generated" \--validate_out="lang=go:../generated" \example.protoprotoc -I . --go_out=. --go-grpc_out=. --validate_out="lang=go:." ./hello.proto
需要去拷贝一份validate.proto
放到文件中:protoc-gen-validate/validate/validate.proto at main · bufbuild/protoc-gen-validate · GitHub
proto
syntax = "proto3";import "validate.proto";
option go_package=".;proto";service Greeter {rpc SayHello (Person) returns (Person);
}message Person {uint64 id = 1 [(validate.rules).uint64.gt = 999];string email = 2 [(validate.rules).string.email = true];string Mobile = 3 [(validate.rules).string = {pattern: "^(?:(?:\\+|00)86)?1[3-9]\\d{9}$",max_bytes: 256,}];}
server.go
package mainimport ("context""google.golang.org/grpc/codes""google.golang.org/grpc/status""net""google.golang.org/grpc""GoRpc_quick/grpc_validate_test/proto"
)type Server struct {*proto.UnimplementedGreeterServer
}func (s *Server) SayHello(ctx context.Context, request *proto.Person) (*proto.Person,error) {return &proto.Person{Id: 32,}, nil
}type Validator interface {Validate() error
}func main() {//p := new(proto.Person)//p.Id = 1000//err := p.Validate() // 会返回一个error来判断合法性//if err != nil {// panic(err)//}var interceptor grpc.UnaryServerInterceptorinterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {// 在拦截器中判断完整性,继续处理请求if r, ok := req.(Validator); ok { // 实例化为Validator,用多态的方法调用Validate方法if err := r.Validate(); err != nil {return nil, status.Error(codes.InvalidArgument, err.Error())}}return handler(ctx, req)}var opts []grpc.ServerOptionopts = append(opts, grpc.UnaryInterceptor(interceptor))g := grpc.NewServer(opts...)proto.RegisterGreeterServer(g, &Server{})lis, err := net.Listen("tcp", "0.0.0.0:50051")if err != nil {panic("failed to listen:" + err.Error())}err = g.Serve(lis)if err != nil {panic("failed to start grpc:" + err.Error())}
}
client.go
package mainimport ("GoRpc_quick/grpc_validate_test/proto""context""fmt""google.golang.org/grpc"
)// 本质上封装了metadata的封装
type customCredential struct {
}func main() {var opts []grpc.DialOptionopts = append(opts, grpc.WithInsecure())conn, err := grpc.Dial("127.0.0.1:50051", opts...)if err != nil {panic(err)}defer conn.Close()c := proto.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &proto.Person{Id: 1000,Email: "1234abc@qq.com",Mobile: "18888888888",})if err != nil {panic(err)}fmt.Println(r.Id)}
错误处理与超时设定
python
客户端处理:
import grpc
from grpc_error_test.proto import helloword_pb2, helloword_pb2_grpcif __name__ == '__main__':with grpc.insecure_channel("localhost:8080") as channel:stub = helloword_pb2_grpc.GreeterStub(channel)hello_request = helloword_pb2.HelloRequest() # 先实例化不赋值hello_request.name = "bob"try:res: helloword_pb2.HelloReply = stub.SayHello(hello_request, timeout=3)except grpc.RpcError as e:d = e.details()print(d)status_code = e.code() # 获取codeprint(status_code.name)print(status_code.value)print(res.message)
服务端处理
import grpc
from concurrent import futures
from grpc_error_test.proto import helloword_pb2, helloword_pb2_grpcclass Greeter(helloword_pb2_grpc.GreeterServicer):def SayHello(self, request, context):context.set_code(grpc.StatusCode.NOT_FOUND)context.set_details("记录不存在")return helloword_pb2.HelloReply(message=f'Python 你好{request.name}')if __name__ == '__main__':# 1. 实例化serverserver = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 设置十个线程# 2. 注册逻辑到server中helloword_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)# 3. 启动serverserver.add_insecure_port('[::]:8080')server.start()server.wait_for_termination()
Go
客户端处理
func main() {conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())if err != nil {panic(err)}defer conn.Close()c := proto.NewGreeterClient(conn)// 设置3超时ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)defer cancel()_, err = c.SayHello(ctx, &proto.HelloRequest{Name: "bob",})if err != nil { // 捕捉并解析错误st, ok := status.FromError(err)if !ok {panic("error解析失败")}fmt.Println(st.Message())fmt.Println(st.Code())}//fmt.Println(r.Message)}
服务端处理:
package mainimport ("GoRpc_quick/grpc_error_test/proto""context""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/status""net"
)type Server struct {*proto.UnimplementedGreeterServer
}// 参数设置是硬性规定的
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {return &proto.HelloReply{Message: "Go hello " + req.Name,}, status.Error(codes.InvalidArgument, "未实现") // 返回错误码和描述
}func main() {// 一样的新建serverg := grpc.NewServer()// 注册服务proto.RegisterGreeterServer(g, &Server{})listen, err := net.Listen("tcp", "0.0.0.0:8080")if err != nil {panic("failed to listen: " + err.Error())}err = g.Serve(listen)if err != nil {panic("failed to start grpc: " + err.Error())}
}