RPC的语义是远程过程调用(Remote Procedure Call,RPC)就是将一个服务调用封装在一个本地方法中,让调用者像使用本地方法一样调用服务,对其屏蔽实现细节。
- RPC 会给对应的服务接口名生成一个代理类,即客户端 Stub。使用代理类可以屏蔽掉 RPC 调用的具体底层细节,使得用户无感知的调用远程服务。
- 客户端 Stub 会将当前调用的方法的方法名、参数类型、实参数等根据协议组装成网络传输的消息体,将其序列化成二进制流后,通过 Sockect 发送给 RPC 服务端。
- 服务端收到二进制数据流后,根据约定的协议解析出请求数据,然后反序列化得到参数,通过内部路由找到具体调用的方法,调用该方法拿到执行结果,将其序列化二进制流后,通过 Socket 返回给 RPC 客户端。
- Unary 模式:即请求响应模式
- Client Streaming 模式:Client 发送 多次,Server 回复一次
- Server Streaming 模式:Client 发送一次,Server 发送多次
- 双向 Streaming 模式:Client/Server 都发送多次
● ServerBuilder:这是gRPC暴露给业务层的启动入口,通过设置端口号和对外提供服务的实现类,可以构造一个gRPC Server实例并启动。
● Server:gRPC 服务端中最顶层的服务抽象,有start启动和shutdown关闭两个核心动作。实现类为ServerImpl,实现了服务、方法与方法处理器的绑定、端口监听、不同类型的 Server 实现的调用、Server 生命周期管理等。
● ServerCall:服务调用抽象,在收到Body请求以后真正被触发,发起本地服务调用。
● InternalServer:gRPC真正完成通信动作的内部服务抽象,实现类为NettyServer。
● ServerTransport:InternalServer 内部依赖的通信窗口。
● NettyServerHandler:向Netty注册的处理器,是真正的核心消息接收逻辑的处理者。
● TransportState:通信状态标识,用来标识信道流的处理情况,承担实际的请求接收,解码分发工作。
gRPC 服务端创建采用 Build 模式,对底层服务绑定、NettyServer和gRPC Server 的创建和实例化做了封装和屏蔽,让服务调用者不用关心 RPC 调用细节。
int port = 50052;
Server server = ServerBuilder.forPort(port) // 1.绑定端口.addService(new MyGrpcServiceImpl()) // 2.添加服务实现类.build() // 3.创建NettyServer、Sever实例.start(); // 4.启动NettyServer实例,开始监听
将需要调用的服务端接口实现类注册到内部的注册表中,RPC 调用时可以根据 RPC 请求消息中的服务定义信息(ServerServiceDefinition)查询到服务接口实现类。
定义 PB 文件
//第一行指定了正在使用 proto3 语法:若未指定编译器会使用 proto2。这个指定语法行必须是文件的非空非注释的第一个行。
syntax = "proto3";// 为每个 message 单独生成一个类文件
option java_multiple_files = true;
option java_package = "com.kuaishou.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";//指定 .proto 文件包名,防止不同项目之间的命名冲突。在java中,包名称用作Java包。
package helloworld;// The greeting service definition.
service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
在 .pb文件 中定义好 service 后,通过借助 gRPC 的 protocol buffers 插件,生成 gRPC 服务端代码。在 Java 中可使用自动构建工具 maven 的 mvn compile 命令即可生成服务端基类代码以及客户端桩代码。
重点关注生成的类 xxxxGrpc(如:MyGrpcServiceGrpc),是 .proto 文件编译后生成的 service 类。xxxxGrpc 中有一个以 ImplBase 结尾的静态抽象内部类(如:MyGrpcServiceImplBase),服务端需要重写该类中的 rpc 方法,编写自己的业务逻辑。客户端通过 xxxxGrpc 中的静态内部类 xxxxStub 实例(如:MyGrpcServiceBlockingStub)可以调用相应的 rpc 方法。
package com.kuaishou.helloworld;import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;/*** Server that manages startup/shutdown of a {@code Greeter} server.*/
public class MyServer {private static final Logger logger = Logger.getLogger(MyServer.class.getName());private Server server;private void start() throws IOException {/* The port on which the server should run */int port = 50051;server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();logger.info("Server started, listening on " + port);Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");try {MyServer.this.stop();} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}});}private void stop() throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}/*** Await termination on the main thread since the grpc library uses daemon threads.*/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}/*** Main launches the server from the command line.*/public static void main(String[] args) throws IOException, InterruptedException {final MyServer server = new MyServer();server.start();server.blockUntilShutdown();}/*** 该类实现proto文件中对应的service,类中的方法实现service中指定的rpc*/static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}}
package com.kuaishou.helloworld;import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;/*** A simple client that requests a greeting from the {@link MyServer}.*/
public class MyClient {private static final Logger logger = Logger.getLogger(MyClient.class.getName());private final GreeterGrpc.GreeterBlockingStub blockingStub;private final GreeterGrpc.GreeterFutureStub futureStub;/** Construct client for accessing HelloWorld server using the existing channel. */public MyClient(Channel channel) {// 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to// shut it down.// Passing Channels to code makes code easier to test and makes it easier to reuse Channels.blockingStub = GreeterGrpc.newBlockingStub(channel);futureStub = GreeterGrpc.newFutureStub(channel);}/** Say hello to server. */public void greet(String name) {logger.info("Will try to greet " + name + " ...");HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;ListenableFuture<HelloReply> futureResponse;try {response = blockingStub.sayHello(request);futureResponse = futureStub.sayHello(request);} catch (StatusRuntimeException e) {logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());return;}logger.info("Greeting: " + response.getMessage());try {logger.info("Future Greeting: " + futureResponse.get().getMessage());} catch (Exception e) {e.printStackTrace();}}/*** Greet server. If provided, the first element of {@code args} is the name to use in the* greeting. The second argument is the target server.*/public static void main(String[] args) throws Exception {String user = "Tom";// Access a service running on the local machine on port 50051String target = "localhost:50051";// Create a communication channel to the server, known as a Channel. Channels are thread-safe// and reusable. It is common to create channels at the beginning of your application and reuse// them until the application shuts down.ManagedChannel channel = ManagedChannelBuilder.forTarget(target)// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid// needing certificates..usePlaintext().build();try {MyClient client = new MyClient(channel);client.greet(user);} finally {// ManagedChannels use resources like threads and TCP connections. To prevent leaking these// resources the channel should be shut down when it will no longer be used. If it may be used// again leave it running.channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}
- ListenableFuture
- Netty
- Protocol Buffers Document:PB 使用
- Protocol Buffers 编码原理
- grpc-java demo:grpc 代码示例
- RPC框架原理简述:从实现一个简易RPCFramework说起:5 个 Java 类(1 个接口类 + 1 个实现类 + 1 个 RPC 框架类 + 1 个服务注册类 + 1 个服务应用类)讲清楚 RPC 本质
- gRPC 官方文档中文版