RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/*** RPC服务端** @author yuanzhihao* @since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义一个rpc的队列String queueName = "test_rpc";channel.queueDeclare(queueName, false, false, false, null);Object monitor = new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 提供一个大小写转换的方法String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("toUpperCase(" + message + ")");response = toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听"test_rpc"队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端:

/*** RPC客户端** @author yuanzhihao* @since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");try ( Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列String queueName = "test_rpc";// 请求消息中需要带一个唯一标识ID String corrId = UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName = channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg = "hello rpc";// 发送请求消息channel.basicPublish("",queueName,properties,msg.getBytes());// 设置一个阻塞队列  等待服务端的响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag -> {});// 获取响应结果String take = response.take();System.out.println("rpc result is "+ take);channel.basicCancel(ctag);}}
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/*** 主配置类** @author yuanzhihao* @since 2021/1/9*/
@Configuration
public class RabbitMQConfig {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.1.108:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}// 声明队列@Beanpublic Queue rpcQueue() {return new Queue("test_rpc",false);}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener = message -> {String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Receive a message message is {}", receiveMsg);// 执行对应逻辑String responseMsg = toUpperCase(receiveMsg);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端我采用test单元测试的形式

/*** spring amqp rpc 测试类** @author yuanzhihao* @since 2021/1/9*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端@Testpublic void testRpcClient() {// 设置correlationIdString corrId = UUID.randomUUID().toString();String msg = "hello rpc";MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 "amq.rabbitmq.reply-to" 前缀Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));}
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/698175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

思维模型整合

思维模型整合 4P--- 4C思考模型能力圈模型 4P— 4C思考模型 在竞争激烈的今天&#xff0c;每个赛道都有众多可以为客户提供相同价值的对手&#xff0c;而赛道中的佼佼者之所以能打败大部分人&#xff0c;可能并不是他们能比别人更能讨好大众&#xff0c;而是因为在这个赛道它有…

Jmeter学习系列之六:阶梯加压线程组Stepping Thread Group详解

性能测试中,有时需要模拟一种实际生产中经常出现的情况,即:从某个值开始不断增加压力,直至达到某个值,然后持续运行一段时间。 在jmeter中,有这样一个插件,可以帮我们实现这个功能,这个插件就是:Stepping Thread Group 1、下载配置方法 1.1.下载配置 插件下载地址:…

Selenium定位不到元素怎么办?一定要这么做

在使用Selenium进行自动化测试时&#xff0c;碰到无法定位元素该怎么办&#xff1f;这里总结了9种情况下的元素定位方法&#xff1a; 1、frame/iframe表单嵌套 WebDriver只能在一个页面上对元素识别与定位&#xff0c;对于frame/iframe表单内嵌的页面元素无法直接定位。 解决…

Jenkins持续集成Python项目

一、前言   之前学习了很多自动化测试框架&#xff0c;但是写的脚本都是本地执行&#xff0c;多数用来造数据。最近公司掀起一股自动化测试的风&#xff0c;所以就想研究下如何集成jenkins&#xff0c;本次采用pytest&#xff0c;用的是阿里云服务器centos7。 二、服务器环境…

石头剪刀布游戏(C语言)

题目描述 石头剪刀布游戏有 3 种出拳形状&#xff1a;石头、剪刀、布。分别用字母 A , B , C 表示。 游戏规则: 出拳形状之间的胜负规则如下&#xff1a; A > B&#xff1b;B > C&#xff1b;C > A&#xff1b;">"左边一个字母&#xff0c;表示相对优…

bat 查找文件所在

脚本 在批处理文件&#xff08;.bat&#xff09;中查找文件所在的目录&#xff0c;你可以使用dir命令结合循环和条件语句来实现。以下是一个简单的示例&#xff0c;演示如何在批处理文件中查找指定文件并输出其所在目录&#xff1a; echo off setlocal enabledelayedexpansio…

Vue 封装的 axios 类的使用(小bug 改进)

http类 import { baseUrl } from "./config"; //引入config.js中的配置 import axios from "axios"; //引入axios import qs from "querystringify"; //form-Data请求时的工具类class Http{axios null;lastRequestIntercept null…

开源LLMs导览:工作原理、顶级LLM列表对比

目录 一、开源 LLM 是什么意思&#xff1f;二、开源LLM如何工作&#xff1f;2.1 预训练2.2 代币化2.3 开源LLM的微调2.4 输入编码2.5 训练与优化2.6 推理 三、开源LLM对组织的好处3.1 增强的数据安全和隐私3.2 节约成本3.3 减少供应商依赖性3.4 代码透明度 四、哪种LLM模式最好…

数据可视化在商业领域有哪些重要性?

数据可视化在商业领域的重要性体现在多个方面&#xff0c;它通过将复杂的数据集转化为直观、易于理解的图形和图表&#xff0c;帮助企业和组织做出更明智的决策。以下是数据可视化对商业的一些关键重要性&#xff1a; 提高决策效率&#xff1a;通过直观的图表和图形&#xff0c…

漫漫数学之旅031

文章目录 经典格言数学习题古今评注名人小传 - 经典格言 如果没有数学知识&#xff0c;这个世界的事物是无法搞清楚的。——罗杰培根&#xff08;Roger Bacon&#xff09; 好的&#xff0c;各位看官&#xff0c;让我们来听听罗杰培根这位中世纪的“科学老顽童”是怎么说的&…

罗克韦尔AB的PLC实现ModbusTCP和ModbusRTU协议标签方式通讯

本文是通过IGT-DSER智能网关读写AB罗克韦尔Compact、Control系列PLC的标签数据缓存并转为Modbus从站协议&#xff0c;与上位机通讯的案例。 打开智能网关的参数软件(下载地址)&#xff0c;通过功能->数据转发与平台对接&#xff0c;再选择数据转发与缓存’&#xff0c;进入以…

基于java+springboot+vue实现的城市垃圾分类管理系统(文末源码+Lw)23-191

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本城市垃圾分类管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数…

基于Java+Selenium的WebUI自动化测试框架(一)---页面元素定位器

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

使用python查看官网是否发布新的内容

目录 前言 第一章、python介绍和使用pip install下载包 1.python介绍 2.使用vscode编写python 3.pip install的使用 第二章、查看官网是否发布新的内容 第三章、代码实现 目录结构 代码实现 check_new_news.py files.py news.py main.py file.txt 运行演示 前言 也…

【Azure 架构师学习笔记】- Azure Databricks (7) --Unity Catalog(UC) 基本概念和组件

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Databricks】系列。 接上文 【Azure 架构师学习笔记】- Azure Databricks (6) - 配置Unity Catalog 前言 在以前的Databricks中&#xff0c;主要由Workspace和集群、SQL Warehouse组成&#xff0c; 这两年Databricks公…

我们在SqlSugar开发框架中,用到的一些设计模式

我们在《SqlSugar开发框架》中&#xff0c;有时候都会根据一些需要引入一些设计模式&#xff0c;主要的目的是为了解决问题提供便利和代码重用等目的。而不是为用而用&#xff0c;我们的目的是解决问题&#xff0c;并在一定的场景下以水到渠成的方式处理。不过引入任何的设计模…

【YOLOX-nano】YOLOX-nano的推理实践

目录 1 pt文件和pth文件的区别 2 安装依赖库 3 模型下载 4 实践 YOLOX-nano.pth模型转换为YOLOX-nano.onnx模型</

SpringBoot对于SpringMVC的支持

创建项目 版本说明这里使用的 SpringBoot 2.0.0.Release SpringBoot对于SpringMVC的支持 在之前的开发中很多场景下使用的是基于xml配置文件或者是Java配置类的方式来进行SpringMVC的配置。一般来讲&#xff0c;初始的步骤如下所示 1、初始化SpringMVC的DispatcherServlet2、…

pytest结合Allure生成测试报告

文章目录 1.Allure配置安装2.使用基本命令报告美化1.**前置条件**2.**用例步骤****3.标题和描述****4.用例优先级**3.进阶用法allure+parametrize参数化parametrize+idsparametrize+@allure.title()4.动态化参数5.环境信息**方式一****方式二**6.用例失败截图1.Allure配置安装 …

Jmeter基础(2) 目录介绍

目录 Jmeter目录介绍bin目录docsextrasliblicensesprintable_docs Jmeter目录介绍 在学习Jmeter之前&#xff0c;需要先对工具的目录有些了解&#xff0c;也会方便后续的学习 bin目录 examplesCSV目录中有CSV样例jmeter.batwindow 启动文件jmeter.shMac/linux的启动文件jmete…