文章目录
- 消息格式
- RPC流
- 长度前缀的消息分帧
- 请求消息
- 响应信息
- 三种流
- 一元RPC
- 服务流RPC
- 客户流RPC
- 双工流RPC
- 代码仓库
消息格式
RPC流
服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。
- 调用存根方法
- 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置
content-type
为application/grpc
- 到达服务端,会先检查请求头是不是gRPC请求,否则返回415
长度前缀的消息分帧
在写入消息前,先写入长度消息表明每条消息的大小。
每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB
帧首中还有单字节无符号整数,用来表明数据是否进行了压缩
为1表示使用 message-encoding
中的编码机制进行了压缩
请求消息
客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记
1、对于gRPC 都是POST
2、协议:Http/Https
3、/服务名/方法名
4、目标URI的主机名
5、对不兼容代理的检测,gRPC下这个值必须为 trailers
6、超时时间
7、媒体类型
8、压缩类型
当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧
响应信息
服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers
END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer
三种流
一元RPC
通信时始终只有一个请求和一个响应
protocol buffer
syntax = "proto3";
package hello;
// 第一个分割参数,输出路径;第二个设置生成类的包路径option go_package = "./proto/hello";// 设置服务名称
service Greeter {// 设置方法rpc SayHello (HelloRequest) returns (HelloReply) {}
}// 请求信息用户名.
message HelloRequest {string name = 1;
}// 响应信息
message HelloReply {string message = 1;
}
服务端
package mainimport ("context""flag""fmt""log""net""google.golang.org/grpc"pb "mygrpc/proto/hello"
)var (port = flag.Int("port", 50051, "The server port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("server listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
客户端
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}
服务流RPC
通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。
在protobuf中的 service添加以下代码
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
服务端代码
package mainimport ("context""flag""fmt""google.golang.org/grpc""io""log"pb "mygrpc/proto/hello""net"
)var (port = flag.Int("port", 50051, "The service port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具体返回多少个response根据业务逻辑调整for i := 0; i < 10; i++ {// 通过 send 方法不断推送数据err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}
func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error {for {log.Println("开始接受客户端的流")// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.Send(&pb.HelloReply{Message: "接受客户流结束"})}if err != nil {return err}// 更新数据log.Printf("Order ID : %s - %s", order.GetName(), "Updated")}
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
客户端代码
package mainimport ("context""flag""io""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})for {// 客户端 Recv 方法接收服务端发送的流searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}
客户流RPC
客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果
在protobuf中的 service添加以下代码
rpc updateOrders(stream HelloRequest) returns (stream HelloReply);
服务端代码
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {ordersStr := "Updated Order IDs : "for {// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})}if err != nil {return err}// 更新数据orderMap[order.Id] = *orderlog.Printf("Order ID : %s - %s", order.Id, "Updated")ordersStr += order.Id + ", "}
}
客户端代码
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法updateStream, err := c.UpdateOrders(ctx)if err != nil {log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)}// Updating order 1if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)}// Updating order 2if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)}// 发送关闭信号并接收服务端响应err = updateStream.CloseSend()if err != nil {log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)}log.Printf("客户端流传输结束")
}
双工流RPC
对应的业务就比如实时的消息流
protobuf
// 设置双工rpcrpc processOrders(stream HelloRequest) returns (stream HelloReply);
服务端
package mainimport ("flag""fmt""google.golang.org/grpc""io""log"pb "mygrpc/proto/hello""net""sync"
)var (port = flag.Int("port", 50051, "The service port")
)type server struct {pb.UnimplementedGreeterServer
}func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {var (waitGroup sync.WaitGroup // 一组 goroutine 的结束// 设置通道msgCh = make(chan *pb.HelloReply))// 计数器加1waitGroup.Add(1)// 消费队列中的内容go func() {// 计数器减一defer waitGroup.Done()for {v := <-msgChfmt.Println(v)err := stream.Send(v)if err != nil {fmt.Println("Send error:", err)break}}}()waitGroup.Add(1)// 向队列中添加内容go func() {defer waitGroup.Done()for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {log.Fatalf("recv error:%v", err)}fmt.Printf("Recved :%v \n", req.GetName())msgCh <- &pb.HelloReply{Message: "服务端传输数据"}}close(msgCh)}()// 等待 计数器问0 推出waitGroup.Wait()// 返回nil表示已经完成响应return nil
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer()// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
客户端
package mainimport ("context""flag""fmt""io""log""sync""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer func(conn *grpc.ClientConn) {err := conn.Close()if err != nil {}}(conn)// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 设置var wg sync.WaitGroup// 调用指定方法stream, _ := c.ProcessOrders(ctx)if err != nil {panic(err)}// 3.开两个goroutine 分别用于Recv()和Send()wg.Add(1)go func() {defer wg.Done()for {req, err := stream.Recv()if err == io.EOF {fmt.Println("Server Closed")break}if err != nil {continue}fmt.Printf("Recv Data:%v \n", req.GetMessage())}}()wg.Add(1)go func() {defer wg.Done()for i := 0; i < 2; i++ {err := stream.Send(&pb.HelloRequest{Name: "hello world"})if err != nil {log.Printf("send error:%v\n", err)}}// 4. 发送完毕关闭streamerr := stream.CloseSend()if err != nil {log.Printf("Send error:%v\n", err)return}}()wg.Wait()log.Printf("服务端流传输结束")
}
代码仓库
https://github.com/onenewcode/mygrpc.git
也可以直接下载绑定的资源。