基于消息中间件RabbitMQ实现简单的RPC服务

转载自  基于消息中间件RabbitMQ实现简单的RPC服务

RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议。对于两台机器而言,就是A服务器上的应用程序调用B服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。

1. RPC框架

我们首先通过一张图理解RPC的工作流程:

因此,实现一个最简单的RPC服务,只需要Client、Server和Network,本文就是利用消息中间件RabbitMQ作为Network载体传输信息,实现简单的RPC服务。简单原理可如下图所示:

即:当Client发送RPC请求时,Client端是消息生产者,Server端是消息消费者;当Server返回结果时,Server端是消息生产者,Client是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的RPC服务。

2. RPCServer实现

2.1 Server初始化

/*** 队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** Server的构造函数*/
private RPCServer() {try {//创建链接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();//创建信道channel = connection.createChannel();//设置AMQP的通信结构channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//设置消费者consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);} catch (Exception e) {LOG.error("build connection failed!", e);}
}

初始化就是声明RabbitMQ的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了AMQP的通信结构。

2.2 监听队列并反馈

/*** 开启server*/
private void startServer() {try {LOG.info("Waiting for RPC calls.....");while (true) {//获得文本消息QueueingConsumer.Delivery delivery = consumer.nextDelivery();BasicProperties props = delivery.getProperties();//返回消息的属性BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();long receiveTime = System.currentTimeMillis();JSONObject json = new JSONObject();try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);LOG.info("Got a request: fib(" + message + ")");json.put("status", "success");json.put("result", fib(n));} catch (Exception e) {json.put("status", "fail");json.put("reason", "Not a Number!");LOG.error("receive message failed!", e);} finally {long responseTime = System.currentTimeMillis();json.put("calculateTime", (responseTime - receiveTime));channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}} catch  (Exception e) {LOG.error("server failed!", e);} finally {if (connection != null) {try {connection.close();} catch (Exception e) {LOG.error("close failed!", e);}}}
}

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的nextDelivery方法来获得RabbitMQ队列的最新一条消息。同时通过getProperties获取到消息中的反馈信息属性,用于标记客户端Client的属性。然后计算斐波那契数列的结果。最后通过basicAck使用消息信封向RabbitMQ确认了该消息。

到这里就实现了计算斐波那契数列RPC服务的Server端。

3. RPCClient实现

3.1 初始化CLient

/*** 消息请求的队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";/*** 消息返回的队列名、交换机名、路由键*/
private static final String RESPONSE_QUEUE = "response_rpc_queue";
private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";/*** RabbitMQ的实体*/
private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** 构造客户端* @throws Exception*/
private RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);consumer = new QueueingConsumer(channel);channel.basicConsume(RESPONSE_QUEUE, true, consumer);
}

这里声明AMQP结构体的方式和Server端类似,只不过Client端需要多声明一个队列,用于RPC的response。

3.2 发送/接收消息

/*** 请求server* @param message* @return* @throws Exception*/
private String requestMessage(String message) throws Exception {String response = null;String corrId = UUID.randomUUID().toString();BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response;
}

BasicProperties用于存储你请求消息的属性,这里我设置了correlationId和replyTo属性,用于Server端的返回识别。

4. 运行测试

Client端发送:

Server端接收并处理:

Client收到计算结果:

由于我运行RabbitMQ的服务器是租用的阿里云的,差不多传输时延在60ms左右,如果把RPC服务和消息中间件同机房部署的话延时基本上就在ms级别。

5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

JDK1.6以上 + Maven + RabbitMQ

5.2 源代码

完整代码代码请戳:github

其中Server的代码在:

rpc.RPCServer

Client端的代码位置:

rpc.RPCClient

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

laravel关闭crsf

在中间件VerifyCsrfToken.php 加入 自己想要关闭的crsf protected $except [user/*,article/*,article,api/*,];

开源纯C#工控网关+组态软件(七)数据采集与归档

一、 引子在当前自动化、信息化、智能化的时代背景下,数据的作用日渐凸显。而工业发展到如今,科技含量和自动化水平均显著提高,但对数据的采集、利用才开始起步。对工业企业而言,数据采集日益受到重视,主要应用场景包…

nssl1167-桐人的约会【最短路】

正题 题目大意 去掉一条边使得最短路最长。 解题思路 这条边一定在最短路上而最短路最多只有n−1n-1n−1条边&#xff0c;所以直接枚举最短路上的边。复杂度O(nmK)O(nmK)O(nmK) codecodecode #include<cstdio> #include<algorithm> #include<queue> #incl…

实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例

转载自 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例 Eureka是Spring Cloud Eureka的简称&#xff0c;是Netflix提供的组件之一。通过Eureka可以提供服务注册、发现、负载均衡、降级、熔断等功能。本篇主要介绍Eureka作为服务注册中心&#xff0c;以及实现…

从零开发一个laravel项目的增删改查、详情

环境要求&#xff1a; wampcomposer 创建laravel项目&#xff1a; composer create-project --prefer-dist laravel/laravel person快速完成person注册登录开发 1、migration php artisan make:migration create_people_table$table->increments(id);$table->string…

使用Api分析器与Windows兼容包来编写智能的跨平台.NET Core应用

本文翻译自Scott Hanselman博客&#xff1a;https://www.hanselman.com/blog/WritingSmarterCrossplatformNETCoreAppsWithTheAPIAnalyzerAndWindowsCompatibilityPack.aspx正文&#xff1a;这是最近这几周你应该知道的一对.Net Core界的优秀工具。我们在编写或者移植跨平台代码…

P4562-[JXOI2018]游戏【数论,组合数学】

正题 题目链接:https://www.luogu.org/problemnew/show/P4562 题目大意 l∼rl\sim rl∼r的变化&#xff0c;每次访问第iii个那么iii的倍数就不用访问了。对于一个顺序sss&#xff0c;定义t(s)t(s)t(s)表示按这个顺序访问玩前t(s)t(s)t(s)个就都不用访问了。求所有顺序的t(s)t(…

Redis RDB文件格式全解析

转载自 Redis RDB文件格式全解析 点评 这篇文章作为对RDB理解的教程文章&#xff0c;对RDB文件的原理理解有助于进行Redis高阶应用的设计与开发。 文章转自&#xff1a;http://blog.nosqlfan.com/html/3734.html 作者&#xff1a;nosqlfan RDB文件是Redis持久化的一种方式…

实验进行中:.NET WebAssembly支持

目前四大主流浏览器都默认支持WebAssembly&#xff0c;而.NET社区也在继续推动为.NET开发者提供相关能力&#xff0c;来将他们的代码编译成WebAssembly&#xff0c;然后在浏览器上运行。WebAssembly是一种二进制web格式&#xff0c;旨在以接近原生的性能运行不是用JavaScript语…

Js对象如何添加方法、查看Api

js万物皆对象&#xff0c;要带着观察对象的眼观去看待每一个函数、变量。 为什么要用到原型&#xff1f; Es6以前&#xff0c;js中没有如ooa编程当中的class&#xff0c;但是要用到类&#xff0c;怎么办呢&#xff0c;构造函数就应运而生&#xff0c;但是构造函数里面添加方法…

Java web文件下载断点续传

一、下载文件请求 RequestMapping(value "/file/download")ResponseBodypublic Res download(HttpServletRequest request, HttpServletResponse response) {File file new File(request.getParameter("fileName"));if (file.exists()) {String range …

ajax面试技术回答模板

ajax是什么&#xff1f; 缩写、核心 1.ajax就是异步的 JS 和 XML 的缩写&#xff0c;目前我们一般用 JSON 代替 XML。 2.该技术最核心概念是 XMLHttpRequest 对象&#xff0c;该对象可发起 HTTP 请求&#xff0c;我们可以监听其 readystate 的变化获得响应。 怎么用&#xff…

微软人工智能和对话平台--知识商城体验

前言微软最新发布 知识商城了&#xff01;这是一个人工智能和对话平台应用的场景。他可以让开发者带着想法 出做天马行空的创造性工作&#xff01;你只需要稍微动动手&#xff0c;如&#xff1a;拖拽板块&#xff0c;就可以做到极致对答、代码自动生成&#xff01;想象一下&…

P1375-小猫【卡特兰数】

正题 题目链接:https://www.luogu.org/problemnew/show/P1375 题目大意 东西两两绑在一起&#xff0c;要求绳子不能交叉&#xff0c;求方案数。 解题思路 0表示压入第i只猫&#xff0c;1表示弹出栈顶的猫并且和第i只猫绑在一起&#xff0c;这样就能保证不会交叉。 也就是卡特…

Spring @Import注解配置类方法内部调用没有注入属性值的坑

一、场景复现 application.yaml spring:application:name: config-testprofiles:active: devconfig:config-01:name: zhansancode: 001config-02:name: lisicode: 002导入配置类 Configuration Import(ImportConfig.class) public class Config {BeanConfigurationPropertie…

使用Xamarin开发手机聊天程序 -- 基础篇(大量图文讲解 step by step,附源码下载)

如果是.NET开发人员&#xff0c;想学习手机应用开发&#xff08;Android和iOS&#xff09;&#xff0c;Xamarin 无疑是最好的选择&#xff0c;编写一次&#xff0c;即可发布到Android和iOS平台&#xff0c;真是利器中的利器啊&#xff01;而且&#xff0c;Xamarin已经被微软收购…

P3441-[POI2006]MET-Subway【图论,贪心】

正题 题目链接:https://www.luogu.org/problemnew/show/P3441 题目大意 求III条路径最多可以覆盖树上多少个点。 解题思路 我们先只考虑叶子节点&#xff0c;显然可以覆盖min{num叶,I∗2}min\{num_叶,I*2\}min{num叶​,I∗2}。 然后网上递推&#xff0c;发现依旧是min{numi,…

ssm创建一个查询接口

注解&#xff1a; controller Autowiredprivate UserService userService;service实体类 Service("userService")Autowiredprivate UserMapper userMapper;mapper Repositorycontroller 接收数据 > service 逻辑中转 > dao 数据库查询 > domain bean类映…

Spring Boot 数据库连接池入门

转载自 芋道 Spring Boot 数据库连接池入门 本文在提供完整代码示例&#xff0c;可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-19 目录。 原创不易&#xff0c;给点个 Star 嘿&#xff0c;一起冲鸭&#xff01; 1. 概述 在我们的项目中&#xff0c;数据库连接池基…

.net core 实现简单爬虫—抓取博客园的博文列表

一.介绍一个Http请求框架HttpCode.CoreHttpCode.Core 源自于HttpCode&#xff08;传送门&#xff09;&#xff0c;不同的是 HttpCode.Core是基于.net standard 2.0实现的&#xff0c;移除了HttpCode与windows相耦合的api&#xff0c;且修改了异步实现&#xff0c;其余特性完全与…