基于消息中间件RabbitMQ实现简单的RPC服务

转载自  基于消息中间件RabbitMQ实现简单的RPC服务

RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议。对于两台机器而言,就是A服务器上的应用程序调用B服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。

1. RPC框架

我们首先通过一张图理解RPC的工作流程:

因此,实现一个最简单的RPC服务,只需要Client、Server和Network,本文就是利用消息中间件RabbitMQ作为Network载体传输信息,实现简单的RPC服务。简单原理可如下图所示:

即:当Client发送RPC请求时,Client端是消息生产者,Server端是消息消费者;当Server返回结果时,Server端是消息生产者,Client是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的RPC服务。

2. RPCServer实现

2.1 Server初始化

/*** 队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** Server的构造函数*/
private RPCServer() {try {//创建链接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();//创建信道channel = connection.createChannel();//设置AMQP的通信结构channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//设置消费者consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);} catch (Exception e) {LOG.error("build connection failed!", e);}
}

初始化就是声明RabbitMQ的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了AMQP的通信结构。

2.2 监听队列并反馈

/*** 开启server*/
private void startServer() {try {LOG.info("Waiting for RPC calls.....");while (true) {//获得文本消息QueueingConsumer.Delivery delivery = consumer.nextDelivery();BasicProperties props = delivery.getProperties();//返回消息的属性BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();long receiveTime = System.currentTimeMillis();JSONObject json = new JSONObject();try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);LOG.info("Got a request: fib(" + message + ")");json.put("status", "success");json.put("result", fib(n));} catch (Exception e) {json.put("status", "fail");json.put("reason", "Not a Number!");LOG.error("receive message failed!", e);} finally {long responseTime = System.currentTimeMillis();json.put("calculateTime", (responseTime - receiveTime));channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}} catch  (Exception e) {LOG.error("server failed!", e);} finally {if (connection != null) {try {connection.close();} catch (Exception e) {LOG.error("close failed!", e);}}}
}

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的nextDelivery方法来获得RabbitMQ队列的最新一条消息。同时通过getProperties获取到消息中的反馈信息属性,用于标记客户端Client的属性。然后计算斐波那契数列的结果。最后通过basicAck使用消息信封向RabbitMQ确认了该消息。

到这里就实现了计算斐波那契数列RPC服务的Server端。

3. RPCClient实现

3.1 初始化CLient

/*** 消息请求的队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";/*** 消息返回的队列名、交换机名、路由键*/
private static final String RESPONSE_QUEUE = "response_rpc_queue";
private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";/*** RabbitMQ的实体*/
private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** 构造客户端* @throws Exception*/
private RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);consumer = new QueueingConsumer(channel);channel.basicConsume(RESPONSE_QUEUE, true, consumer);
}

这里声明AMQP结构体的方式和Server端类似,只不过Client端需要多声明一个队列,用于RPC的response。

3.2 发送/接收消息

/*** 请求server* @param message* @return* @throws Exception*/
private String requestMessage(String message) throws Exception {String response = null;String corrId = UUID.randomUUID().toString();BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response;
}

BasicProperties用于存储你请求消息的属性,这里我设置了correlationId和replyTo属性,用于Server端的返回识别。

4. 运行测试

Client端发送:

Server端接收并处理:

Client收到计算结果:

由于我运行RabbitMQ的服务器是租用的阿里云的,差不多传输时延在60ms左右,如果把RPC服务和消息中间件同机房部署的话延时基本上就在ms级别。

5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

JDK1.6以上 + Maven + RabbitMQ

5.2 源代码

完整代码代码请戳:github

其中Server的代码在:

rpc.RPCServer

Client端的代码位置:

rpc.RPCClient

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源纯C#工控网关+组态软件(七)数据采集与归档

一、 引子在当前自动化、信息化、智能化的时代背景下,数据的作用日渐凸显。而工业发展到如今,科技含量和自动化水平均显著提高,但对数据的采集、利用才开始起步。对工业企业而言,数据采集日益受到重视,主要应用场景包…

实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例

转载自 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例 Eureka是Spring Cloud Eureka的简称,是Netflix提供的组件之一。通过Eureka可以提供服务注册、发现、负载均衡、降级、熔断等功能。本篇主要介绍Eureka作为服务注册中心,以及实现…

从零开发一个laravel项目的增删改查、详情

环境要求: wampcomposer 创建laravel项目: composer create-project --prefer-dist laravel/laravel person快速完成person注册登录开发 1、migration php artisan make:migration create_people_table$table->increments(id);$table->string…

使用Api分析器与Windows兼容包来编写智能的跨平台.NET Core应用

本文翻译自Scott Hanselman博客:https://www.hanselman.com/blog/WritingSmarterCrossplatformNETCoreAppsWithTheAPIAnalyzerAndWindowsCompatibilityPack.aspx正文:这是最近这几周你应该知道的一对.Net Core界的优秀工具。我们在编写或者移植跨平台代码…

实验进行中:.NET WebAssembly支持

目前四大主流浏览器都默认支持WebAssembly,而.NET社区也在继续推动为.NET开发者提供相关能力,来将他们的代码编译成WebAssembly,然后在浏览器上运行。WebAssembly是一种二进制web格式,旨在以接近原生的性能运行不是用JavaScript语…

Js对象如何添加方法、查看Api

js万物皆对象,要带着观察对象的眼观去看待每一个函数、变量。 为什么要用到原型? Es6以前,js中没有如ooa编程当中的class,但是要用到类,怎么办呢,构造函数就应运而生,但是构造函数里面添加方法…

微软人工智能和对话平台--知识商城体验

前言微软最新发布 知识商城了!这是一个人工智能和对话平台应用的场景。他可以让开发者带着想法 出做天马行空的创造性工作!你只需要稍微动动手,如:拖拽板块,就可以做到极致对答、代码自动生成!想象一下&…

Spring @Import注解配置类方法内部调用没有注入属性值的坑

一、场景复现 application.yaml spring:application:name: config-testprofiles:active: devconfig:config-01:name: zhansancode: 001config-02:name: lisicode: 002导入配置类 Configuration Import(ImportConfig.class) public class Config {BeanConfigurationPropertie…

使用Xamarin开发手机聊天程序 -- 基础篇(大量图文讲解 step by step,附源码下载)

如果是.NET开发人员,想学习手机应用开发(Android和iOS),Xamarin 无疑是最好的选择,编写一次,即可发布到Android和iOS平台,真是利器中的利器啊!而且,Xamarin已经被微软收购…

Spring Boot 数据库连接池入门

转载自 芋道 Spring Boot 数据库连接池入门 本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-19 目录。 原创不易,给点个 Star 嘿,一起冲鸭! 1. 概述 在我们的项目中,数据库连接池基…

.net core 实现简单爬虫—抓取博客园的博文列表

一.介绍一个Http请求框架HttpCode.CoreHttpCode.Core 源自于HttpCode(传送门),不同的是 HttpCode.Core是基于.net standard 2.0实现的,移除了HttpCode与windows相耦合的api,且修改了异步实现,其余特性完全与…

SpringBoot2.1.9 Mybatis由于@Mapper注解多数据源配置不生效问题

一、场景复现 (1)项目 目录 配置文件 spring:application:name: multi-datasourceprofiles:active: dev1datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/base?…

使用Windows兼容包简化向.NET Core的迁移

从.NET迁移到.NET Core的一个主要原因,在于后者具备在Linux上运行的能力。但是对于大型企业应用,不可能实现一步迁移到位。由此,Microsoft推荐采用一种逐步迁移做法:第一步,迁移到ASP.NET Core(依然使用.NE…

springboot使用xml配置mybatis

前面用注解配置了mybatis&#xff0c;非常的简单&#xff0c;但是在写动态sql语句的时候会非常的麻烦&#xff0c;所以这边我们用xml来重新配置一下 在resource目录下新建 SqlMapConfig.xml 主配置文件 <?xml version"1.0" encoding"UTF-8" ?> &…

SpringBoot2.1.9 Mybatis多数据源配置

一、配置文件 目录 application.yaml spring:application:name: multi-datasourceprofiles:active: devdatasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/base?autoReconnecttrue&ze…

自动类型安全的REST .NET标准库refit

在SCOTT HANSELMAN 博客上看到一个好东西《Exploring refit, an automatic type-safe REST library for .NET Standard》&#xff0c;他推荐了一个.NET标准1.4 的自动类型安全的REST库refit。 refit 类似于Java的Retrofit&#xff0c;是一套RESTful架构的.NET客户端实现&#x…

Visual Studio的语言服务器协议

语言服务器协议&#xff08;LSP&#xff09;是Visual Studio Code的一个重要组件。语言服务器实际上是单独运行的编译器或分析器&#xff0c;它负责处理各种任务&#xff0c;如编译器错误报告、文本悬浮、代码自动完成&#xff08;也就是IntelliSense&#xff09;等。语言服务器…

C#和NewSQL更配 —— TiDB入门

一、背景在上一篇尝试CockroachDB&#xff08;传送门在此&#xff1a;http://www.cnblogs.com/Zachary-Fan/p/cockroachdb_net_csharp.html&#xff09;的过程中&#xff0c;发现如果从常规的RDBMS迁移过去几乎是不太可能的事情&#xff0c;所以掉头开始调研一下也是这2年比较火…

Mysql调优你不知道这几点,就太可惜了

转载自 Mysql调优你不知道这几点&#xff0c;就太可惜了 一、Mysql的逻辑分层 Mysql分为&#xff1a;连接层、服务层、引擎层、存储层。 当客户端向服务端发起操作请求的时候&#xff0c;执行过程是这样的&#xff1a; 1、客户端端与Mysql服务端的连接层建立连接&#xff…

Unity/DotNetty中集成Lidgren实现可靠UDP

lidgren有几个优点&#xff1a;分channel&#xff0c;每个channel都有单独的消息队列&#xff0c;不互相影响。每个消息可以单独选择使用可靠/不可靠传输。支持内网穿透自带加密算法。前端Unity&#xff1a;先贴一张前端使用的网络框架图&#xff1a;Lidgren的Github地址&#…