grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站

了解proto3后,接下来看官方Demo作为训练,这里建议看一遍之后自己动手搭建出来,一方面巩固之前的知识,一方面是对整个流程更加熟悉.

官方Demo地址: https://github.com/grpc/grpc-java

例子是一个简单的路由映射的应用,它允许客户端获取路由特性的信息,生成路由的总结,以及交互路由信息,如服务器和其他客户端的流量更新.

1.1定义服务

也就是写proto文件

//指定proto3格式

syntax = "proto3";

//一些生成代码的设置

option java_multiple_files = true;//以外部类模式生成

option java_package = "cn.mrdear.route";//所在包名

option java_outer_classname = "RouteProto";//最外层类名称

//定义服务

service RouteGuide{

//得到指定点的feature

//一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。

rpc GetFeature(Point) returns (Feature) {}

//获取一个矩形内的点

//一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,

//直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。

rpc ListFeatures(Rectangle) returns (stream Feature){}

//记录该点

//一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,

//它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。

rpc RecordRoute(stream Point) returns (RouteSummary){}

//路由交流

//一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器

//可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,

//或者其他读写的组合。每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。

rpc RouteChat(stream RouteNote) returns (stream RouteNote){}

}

//代表经纬度

message Point {

int32 latitude = 1;

int32 longitude = 2;

}

//由两个点确定的一个方块

message Rectangle{

Point lo = 1;

Point hi = 2;

}

//某一位置的名称

message Feature {

string name = 1;

Point location = 2;

}

// Not used in the RPC. Instead, this is here for the form serialized to disk.

message FeatureDatabase {

repeated Feature feature = 1;

}

//给某一点发送消息

message RouteNote{

Point location = 1;

string message = 2;

}

//记录收到的信息

message RouteSummary{

int32 point_count = 1;

int32 feture_count = 2;

int32 distance = 3;

int32 elapsed_time = 4;

}

执行mvn compile生成如下代码:

27e9b1205d5318a236cef4e657ed9f9c.png

1.2编写RouteGuideService

该类就是这个项目所提供给外部的功能.该类需要继承RouteGuideGrpc.RouteGuideImplBase,这个类提供了我们所定义分服务接口,继承后覆盖需要实现的自定义方法.

简单 RPC

简单RPC和普通方法调用形式差不多,客户端传来一个实体,服务端返回一个实体.

@Override

public void getFeature(Point request, StreamObserver responseObserver) {

System.out.println("getFeature得到的请求参数: " + request.toString());

// responseObserver.onError(); 代表请求出错

responseObserver.onNext(checkFeature(request));//包装返回信息

responseObserver.onCompleted();//结束一次请求

}

//找到复核的feature

private Feature checkFeature(Point location) {

for (Feature feature : features) {

if (feature.getLocation().getLatitude() == location.getLatitude()

&& feature.getLocation().getLongitude() == location.getLongitude()) {

return feature;

}

}

// No feature was found, return an unnamed feature.

return Feature.newBuilder().setName("").setLocation(location).build();

}

其中StreamObserver是一个应答观察者,用于封装返回的信息,服务器把该信息传给客户端.请求结束要调用onCompleted()方法.

服务器端流式 RPC

在proto文件中声明了stream,但是从接口上看不出来和简单RPC的区别,代码中最主要的区别是多次调用responseObserver.onNext()的方法,最后完成时写回数据.

@Override

public void listFeatures(Rectangle request, StreamObserver responseObserver) {

int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());

int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());

int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());

int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

for (Feature feature : features) {

//如果不存在则继续

if (!RouteGuideUtil.exists(feature)) {

continue;

}

int lat = feature.getLocation().getLatitude();

int lon = feature.getLocation().getLongitude();

if (lon >= left && lon <= right && lat >= bottom && lat <= top) {

//找到符合的就写入

responseObserver.onNext(feature);

}

}

//最后标识完成

responseObserver.onCompleted();

}

客户端流式 RPC

服务端就需要一直监控客户端写入情况,因此需要一个StreamObserver接口,其中onNext方法会在客户端每次写入时调用,当写入完毕时调用onCompleted()方法.具体还要到后面客户端调用分析.

@Override

public StreamObserver recordRoute(StreamObserver responseObserver) {

return new StreamObserver() {

int pointCount;

int featureCount;

int distance;

Point previous;

long startTime = System.nanoTime();

//客户端每写入一个Point,服务端就会调用该方法

@Override

public void onNext(Point point) {

System.out.println("recordRoute得到的请求参数: " + point.toString());

pointCount++;

if (RouteGuideUtil.exists(checkFeature(point))) {

featureCount++;

}

if (previous != null) {

distance += calcDistance(previous, point);

}

previous = point;

}

@Override

public void onError(Throwable throwable) {

throwable.printStackTrace();

System.err.println("Encountered error in recordRoute");

}

//客户端写入结束时调用

@Override

public void onCompleted() {

long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);

responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)

.setFetureCount(featureCount)

.setDistance(distance)

.setElapsedTime((int) seconds).build());

responseObserver.onCompleted();

}

};

}

双向流式 RPC

和客户端流式RPC差不多.

@Override

public StreamObserver routeChat(StreamObserver responseObserver) {

return new StreamObserver() {

@Override

public void onNext(RouteNote note) {

List notes = getOrCreateNotes(note.getLocation());

for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {

responseObserver.onNext(prevNote);

}

notes.add(note);

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

System.err.println("Encountered error in routeChat");

}

@Override

public void onCompleted() {

responseObserver.onCompleted();

}

};

}

1.3创建服务端

和Helloworld一样的形式,最主要的是addService(new RouteGuideService(features)),这里把需要注册的服务给注册上.

public class RouteGuideServer {

private final int port;//服务端端口

private final Server server;//服务器

public RouteGuideServer(int port) throws IOException {

this.port = port;

//获取初始化数据

List features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());

//初始化Server参数

server = ServerBuilder.forPort(port)

//添加指定服务

.addService(new RouteGuideService(features))

.build();

}

/**

* 启动服务

*/

public void start() throws IOException {

server.start();

System.out.println("Server started, listening on " + port);

//程序退出时关闭资源

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

System.err.println("*** shutting down gRPC server since JVM is shutting down");

RouteGuideServer.this.stop();

System.err.println("*** server shut down");

}));

}

/**

* 关闭服务

*/

public void stop() {

if (server != null) {

server.shutdown();

}

}

/**

* 使得server一直处于运行状态

*/

private void blockUntilShutdown() throws InterruptedException {

if (server != null) {

server.awaitTermination();

}

}

public static void main(String[] args) throws IOException, InterruptedException {

RouteGuideServer server = new RouteGuideServer(50051);

server.start();

server.blockUntilShutdown();

}

}

1.4编写客户端

客户端需要一个channel和一个存根blockingStub或者asyncStub根据业务需要选择同步或者异步.

private final ManagedChannel channel;//grpc信道,需要指定端口和地址

private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;//阻塞/同步存根

private final RouteGuideGrpc.RouteGuideStub asyncStub;//非阻塞,异步存根

public RouteGuideClient(String host,int port) {

//创建信道

channel = ManagedChannelBuilder.forAddress(host, port)

.usePlaintext(true)

.build();

//创建存根

blockingStub = RouteGuideGrpc.newBlockingStub(channel);

asyncStub = RouteGuideGrpc.newStub(channel);

}

/**

* 关闭方法

*/

public void shutdown() throws InterruptedException {

channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);

}

简单grpc

和调用普通方法形式差不多.

public void getFeature(int lat,int lon){

System.out.println("start getFeature");

Point request = Point.newBuilder()

.setLatitude(lat)

.setLongitude(lon)

.build();

Feature feature;

try {

//同步阻塞调用

feature = blockingStub.getFeature(request);

System.out.println("getFeature服务端返回 :" + feature);

} catch (StatusRuntimeException e) {

System.out.println("RPC failed " +e.getStatus());

}

}

调用代码:

public static void main(String[] args) throws InterruptedException {

RouteGuideClient client = new RouteGuideClient("localhost", 50051);

try {

client.getFeature(409146138, -746188906);//成功案例

client.getFeature(0, 0);//失败案例

} finally {

client.shutdown();

}

}

客户端日志

webp

服务端日志(参数都为0的时候,这边并没拿到参数)

034ea790b505c8bfe4ec71c0db6d7304.png

服务器端流式 RPC

和简单RPC差不多,只不过返回的是一个集合类.

//2.服务端流式RPC

public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon){

System.out.println("start listFeatures");

Rectangle request =

Rectangle.newBuilder()

.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())

.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator features;

try {

features = blockingStub.listFeatures(request);

for (int i = 1; features.hasNext(); i++) {

Feature feature = features.next();

System.out.println("getFeature服务端返回 :" + feature);

}

} catch (Exception e) {

System.out.println("RPC failed " +e.getMessage());

}

}

客户端日志:

1a0f0077826d636566b1f7ad55cb5326.png

服务端日志:

43ee552bd1cd90e291b2a7666a5952dd.png

客户端流式 RPC

该种方式两遍都是异步操作,所以需要互相监听,也因此需要使用阻塞存根.服务端监听Point的写入,客户端监听RouteSummary的写回.

public void recordRoute(List features, int numPoints) throws InterruptedException {

System.out.println("start recordRoute");

final CountDownLatch finishLatch = new CountDownLatch(1);

//建一个应答者接受返回数据

StreamObserver responseObserver = new StreamObserver() {

@Override

public void onNext(RouteSummary summary) {

System.out.println("recordRoute服务端返回 :" + summary);

}

@Override

public void onError(Throwable t) {

System.out.println("RecordRoute Failed");

finishLatch.countDown();

}

@Override

public void onCompleted() {

System.out.println("RecordRoute finish");

finishLatch.countDown();

}

};

//客户端写入操作

StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);

Random random = new Random();

try {

for (int i = 0; i < numPoints; ++i) {

int index = random.nextInt(features.size());

Point point = features.get(index).getLocation();

System.out.println("客户端写入point:" + point);

requestObserver.onNext(point);

Thread.sleep(random.nextInt(1000) + 500);

if (finishLatch.getCount() == 0) {

return;

}

}

} catch (RuntimeException e) {

requestObserver.onError(e);

throw e;

}

//标识已经写完

requestObserver.onCompleted();

// Receiving happens asynchronously

if (!finishLatch.await(1, TimeUnit.MINUTES)) {

System.out.println("recordRoute can not finish within 1 minutes");

}

}

客户端日志:

86db6d9171c2fe0e13ae405c7a8b5464.png

服务端日志:

fa096bd9fd0f09d821052191cdbb09e7.png

双向流式 RPC

和客户端流式RPC比较接近,同样都需要双方监控.

public CountDownLatch routeChat() {

System.out.println("start routeChat");

final CountDownLatch finishLatch = new CountDownLatch(1);

//写入监听

StreamObserver requestObserver =

//写回监听

asyncStub.routeChat(new StreamObserver() {

//服务端每写回一个操作就调用

@Override

public void onNext(RouteNote note) {

System.out.println("服务端写回: " + note);

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

System.out.println("RouteChat Failed:");

finishLatch.countDown();

}

@Override

public void onCompleted() {

System.out.println("Finished RouteChat");

finishLatch.countDown();

}

});

try {

RouteNote[] requests =

{newNote("First message", 0, 0), newNote("Second message", 0, 1),

newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {

System.out.println("客户端写入:" + request);

requestObserver.onNext(request);

}

} catch (RuntimeException e) {

requestObserver.onError(e);

throw e;

}

//标识写完

requestObserver.onCompleted();

return finishLatch;

}

这里调用需要特殊处理下;

CountDownLatch finishLatch = client.routeChat();

if (!finishLatch.await(1, TimeUnit.MINUTES)) {

System.out.println("routeChat can not finish within 1 minutes");

}

客户端日志:

a058cea870f54ce53da7f6e73ceebd64.png

服务端日志:

f7c18636ee8ac149cbc4bdaad8f3216d.png

官方Demo之后,入门算结束,接下来就要看详细的官方文档,然后在项目中使用,这个过程会遇到不少问题,解决这些问题就是对这个技术的熟练.

附录:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/485769.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

冯端:漫谈物理学的过去、现在与未来

来源&#xff1a; 算法与数学之美撰文&#xff1a; 冯端 (南京大学物理系)“物理学的过去、现在和未来”是一个非常大而且重要的题目&#xff0c;也是一个非常难讲的题目&#xff0c;特别是涉及物理学的未来&#xff0c;结果往往是贻笑大方。这里以历史的透视为主线&#xff0c…

第四次博客

第四次博客 一、测试与正确性论证的比较 测试具有针对性&#xff0c;能从一些方面完美的展现出代码的正确性&#xff0c;但是它的验证度取决于样例的质量。 优点是方便&#xff0c;快捷&#xff0c;结果明显&#xff1b;缺点是可能不会覆盖到方方面面。 正确性论证能从所有的方…

院士论坛|李德仁:测绘遥感能为智能驾驶做什么? ——论测绘遥感与智能驾驶

来源&#xff1a; 中国测绘学会未来智能实验室的主要工作包括&#xff1a;建立AI智能系统智商评测体系&#xff0c;开展世界人工智能智商评测&#xff1b;开展互联网&#xff08;城市&#xff09;云脑研究计划&#xff0c;构建互联网&#xff08;城市&#xff09;云脑技术和企业…

Spring Boot-springbootHelloword(一)

什么是springboot sprng家族一个全新的框架 简化我们应用程序的创建和开发的过程&#xff0c;使用默认配置简化了我们以前传统的配置 springboot的特性 能够快速创建spring程序能够使用java main方法启动内嵌的 tomcat 或者jetty服务器运行spring boot程序提供约定的starter p…

原创工作发表难之叶公好龙

来源&#xff1a;张志东科学网博客链接地址&#xff1a;http://blog.sciencenet.cn/blog-2344-1265601.html 最近&#xff0c;科学网上一个热点话题是原创工作发表难不难&#xff1f;以真傻为代表的认为原创工作发表难&#xff0c;以王立新为代表的认为原创工作发表不难。那么&…

java字符的输入流_Java:字节流和字符流(输入流和输出流)

InputStream是抽象基类&#xff0c;所以它不可以创建对象&#xff0c;但它可以用来“接口化编程”&#xff0c;因为大部分子类的函数基类都有定义&#xff0c;所以利用基类来调用函数。FileInputStream是用来读文件数据的流&#xff0c;所以它需要一个文件对象用来实例化&#…

任正非内部重磅发言:华为不可能简单学阿里、亚马逊

来源&#xff1a;券商中国辞旧迎新之际&#xff0c;华为创始人任正非此前在华为企业业务及云业务汇报会上的发言于华为心声社区曝光。任正非指出&#xff0c;华为企业业务要聚焦战略重点&#xff0c;继续做减法&#xff0c;收缩企业业务做战线&#xff0c;认真弄清楚做作战模型…

java8 迭代set集合_JavaSE(八)集合之Set

2.2、HashSet特点不能保证元素的排列顺序&#xff0c;顺序可能和添加的顺序不同&#xff0c;顺序也有可能发生变化。HashSetf不是同步的&#xff0c;如果多个线程同时来访问一个 HashSet&#xff0c;假设有两个或者两个以上线程同时修改了HashSet 集合时&#xff0c;则必须通过…

AI 发展方向大争论:混合AI ?强化学习 ?将实际知识和常识整合到AI中 ?

一个仿人机器人的延伸手。机器人常常使用强化学习来加以训练来源&#xff1a;云头条作者&#xff1a;Ben Dickson是一名软件工程师&#xff0c;还是探讨技术在如何解决和带来问题的TechTalks博客的创始人。2010年代对于AI界来说意义重大&#xff0c;这归功于深度学习领域取得了…

吴恩达:2020 年,这些 AI 大事件让我无法忘怀...

作者&#xff1a;吴恩达编译&#xff1a;陈大鑫、贝爽编辑&#xff1a;青暮转自&#xff1a;AI科技评论日前&#xff0c;吴恩达在圣诞节之际回顾了2020年AI的一些重大事件&#xff0c;包括AI应对新冠疫情、数据集存在种族偏见、对抗虚假信息算法、AlphaFold预测蛋白质三维结构、…

java工程师的一生_百看不厌之一张图诠释程序员的一生

原标题&#xff1a;百看不厌之一张图诠释程序员的一生01百看不厌系列之一张图诠释程序员的一生。02变得越来越强了...03不知道是干什么的&#xff0c;但又不敢删。。。04当前端的人抱怨Java 时&#xff0c;C程序员05给产品经理设计了一款趁手的宝贝06离职程序员交接项目07这位压…

好久没玩laravel了,5.6玩下(三)

好久没玩laravel了&#xff0c;5.6玩下&#xff08;三&#xff09; 好了&#xff0c;基础的测试通了&#xff0c;咱们开始增删改了 思路整理 先创建项目功能控制器 然后设置路由访问规则 然后开发项目的增删改功能 1 先创建项目的控制器 php artisan make:controller ProjectsC…

OpenAI祭出120亿参数魔法模型!从文本合成图像栩栩如生,仿佛拥有人类的语言想象力...

来源&#xff1a;AI科技评论作者&#xff1a;OpenAI编译&#xff1a;贝爽、陈大鑫前几个月GPT-3刚刚问世的时候&#xff0c;能够根据一段话就写出一个小说、一段哲学语录&#xff0c;就足以令AI圈为之感到兴奋。然而2020年刚刚开始没多久&#xff0c;OpenAI又实现一重大突破&am…

MySQL快速生成连续整数

很多时候需要用到连续的id进行数据对比&#xff0c;如判断是否连续等问题。那么&#xff0c;生成连续整数的方式有多种&#xff0c;首先容易想到的是逐步循环&#xff0c;如果想生成1kw条记录&#xff0c;则需要循环1kw次进行插入&#xff0c;那么有没有其他方式呢&#xff0c;…

MySQL小问题:The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents...

这是因为时区设置不对 问题背景&#xff1a; 在运行storm项目&#xff0c;进行页面显示的时候&#xff0c;报错&#xff1a; java.sql.SQLException: The server time zone value is unrecognized or represents more than one time zone. 这是时区的问题。 我采用的第一个…

中国电子信息工程科技发展十四大趋势(2021)

来源&#xff1a;科技日报、中国电子报&#xff08;转载请注明来源&#xff09;编辑&#xff1a;蒲蒲1月5日&#xff0c;中国工程院信息与电子工程学部、中国信息与电子工程科技发展战略研究中心发布“中国电子信息工程科技发展十四大趋势&#xff08;2021&#xff09;”&#…

rosserial_java_[学习笔记]Rosserial实现Windows-ROS交互操作(1)

安装sudo apt-get install ros-indigo-rosserial-windowssudo apt-get install ros-indigo-rosserial-server编译rosrun rosserial_windows make_libraries.py my_library运行后会产生一个my_library文件夹&#xff0c;在my_library这个文件夹下会生成ros_lib文件夹Windows下创…

Leetcode--122. 买卖股票的最佳时机Ⅱ

给定一个数组&#xff0c;它的第 i 个元素是一支给定股票第 i 天的价格。 设计一个算法来计算你所能获取的最大利润。你可以尽可能地完成更多的交易&#xff08;多次买卖一支股票&#xff09;。 注意&#xff1a;你不能同时参与多笔交易&#xff08;你必须在再次购买前出售掉…

语法树的画法(根据文法求字符串)

目录 1.语法树的画法 2.语法树的短语 3.直接短语&#xff08;直接到根部&#xff09; 4.素短语 5.句柄 6.算符优先分析句型 1.语法树的画法 文法G[E]:E->EE | E*E | (E) | i ,字符串 ii*i 推导方式有两种最左推导和最右推导&#xff08;推导的技巧就是逐步靠近字符串…

AI专家喋喋不休展开争论 为什么说预测是智能的本质

来源&#xff1a; 网易智能编译&#xff1a;网易智能 选自&#xff1a;medium参与:Rosie【网易智能讯 6月22日消息】机器学习和智能都植根于预测&#xff0c;这是巧合吗&#xff1f;当我们的技术体现了智能的本质时&#xff0c;我们正在接近一个紧要关头吗&#xff1f;或者说我…