目录
- ★ RPC模型(远程过程调用通信模型)
- ▲ 完整过程:
- 代码演示
- 总体流程解释:
- ConstantUtil 常量工具类
- ConnectionUtil RabbitMQ连接工具类
- Server 服务端
- Client 客户端
- 测试结果
- 服务端
- 客户端
- 完整代码
- ConstantUtil 常量工具类
- ConnectionUtil RabbitMQ连接工具类
- Server 服务端
- Client 客户端
- pom.xml
★ RPC模型(远程过程调用通信模型)
PRC 模型相当于一对一,跟调用方法一样,能拿到方法返回的结果,这就是典型的RPC模型(不仅要传参数,还需要拿到返回值)
reply:回答、答复
correlation :关联、相互关联
▲ 通过使用两个独享队列,可以让RabbitMQ实现RPC(远程过程调用)通信模型,
其通信过程其实也很简单:客户端向服务器消费的独享队列发送一条消息,服务器收到该消息后,对该消息进行处理,然后将处理结果发送给客户端消费的独享队列。
▲ 服务器端消费的独享队列负责保存调用参数,客户端消费的独享队列负责保存调用的返回值。
▲ 使用独享队列可以避免其他连接来读取队列的消息、只有当前连接才能读取该队列的消息,这样才能保证服务器能读到客户端发送的每条消息,客户端也能读到服务器返回的每条消息。
▲ 为了让服务器知道客户端所消费的独享队列,客户端发送消息时,应该将自己监听的队列名以 reply_to属性 发送给服务器
▲为了能准确识别服务器应答消息(返回值)与客户端请求消息(调用参数)之间的对应关系,
还需要为每条消息都增加一个 correlation_id 属性,两条具有相同 correlation_id 属性值的消息可认为是配对的两条消息。
【备注】:客户端送出的消息要包含2个属性:
-
reply_to:该属性指定了服务器要将返回的消息送回到哪个队列。
-
correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性。
▲ 完整过程:
(1)服务器启动时,它会创建一个名为“rpc_queue”的独享队列(名称可以随意),并使用服务器端的消费者监听该独享队列的消息。(所有的RPC 调用,一定都是先从服务器端的启动开始的。)(2)客户端启动时,它会创建一个匿名(默认)(由RabbitMQ命名)的 独享队列,并使用客户端的消费者监听该独享队列的消息。(这个独享队列的名字也是 reply_to 属性的属性值)(3)客户端发送带有两个属性的消息:一个是代表应答队列名的 reply_to属性(该属性值就是第2步客户端所创建的独享队列名),另一个是代表消息标识的 correlation_id 属性。(4)将消息发送到服务器监听的rpc_queue队列中。(5)服务器从rpc_queue队列中读取消息,服务器调用处理程序对该消息进行计算,将计算结果以消息发送给 reply_to属性 指定的队列,并为消息添加相同的 correlation_id属性。(6)客户端从 reply_to 对应的队列中读取消息,当消息出现时,它会检查消息的 correlation_id属性。如果此属性的值与请求消息的 correlation_id 属性值匹配,将它返回给应用。————上面过程,其实就是对P2P模型的应用,因此无需使用自己的Exchange,而是使用系统自动创建的默认Exchange即可。
代码演示
需求:客户端发送个消息到服务端,服务端处理完再返回结果给客户端。
如图:需要有两个消息队列,一个是服务端
总体流程解释:
(仅作为自己梳理代码流程的记录,大佬请直接忽略)
Server 类是服务端 , Client 是客户端。
rpc_queue 是自己在服务端声明创建的消息队列,服务端监听着这个消息队列
amq.gen-3Nl6GNjR5BzPJ4N-By4p1g 是客户端声明创建的一个默认生成的消息队列。
(就是调用 Channel 的 queueDeclare() 方法声明队列时,不指定具体的消息队列的参数,全凭默认生成),客户端监听着这个默认的消息队列。
replyTo 的值是 amq.gen-3xxx 这个默认消息队列,作用是指定了服务器要将返回的消息送回到这个默认队列
correlationId 只是一个单纯的消息标识,可以给个1、2、3、4…作为消息标识
上面这些就是涉及到的一些点,下面就是流程:
首先,客户端会发送几个消息,Exchange时默认的,路由key 是 rpc_queue , 每个消息都携带者 replyTo 和 correlationId 这两个属性值;
(解释:如果消息发布者指定默认的Exchange,那么Exchange就会根据消息发布者发来的消息中携带的路由key(假如路由key叫 aaa) ,去找是否有同样名字叫aaa的消息队列,有的话就把消息分发给消息队列,没有的话该消息就会被丢弃);
这些消息会被默认的Exchange分发给 rpc_queue 这个消息队列。
服务端在声明 rpc_queue 这个消息队列的时候,把这个消息队列设置为独享类型(exclusive:true),那么 rpc_queue 这个消息队列里面的消息就只能被这个服务端消费,不能被其他消费者获取到消息。
因为服务端监听这个 rpc_queue 这个消息队列,所以服务端拿到这个消息队列的消息之后,就会把消息中的 replyTo 和 correlationId 先拿出来,然后同时对消息进行业务逻辑处理。
业务逻辑处理完消息后,服务端需要把这些处理后的消息返回给客户端。
重点就是,每个消息在客户端发来之后,都有一个 correlationId 标识,所以服务端在返回回去时,需要把处理好的消息的原本的correlationId 标识对应的设置回去。
(比如:客户端发来消息 A , A 携带的 correlationId 为 1 ,那么服务端在处理完 A 消息后,需要把 correlationId = 1 再设置回 这个消息 A (就是拿出来,处理完消息,再放回去),这样客户端在接收服务端返回来的处理过后的A消息时,才能根据 correlationId = 1 这个标识,得到想要的被处理过的A消息数据。
因为客户端发的消息可能有成千上万条,需要有这个 correlationId 作为消息的标识,才能准确拿到被处理过的想要的那条A消息)
服务端返回处理过的消息给客户端,也是一个发送消息的过程,所以发送消息指定的消息队列就是这个 replyTo(就是amq.gen-3xxx 这个默认消息队列),这个replyTo 也是从客户端发送来的消息中获取获取一个属性,作用是指定了服务端要将返回的消息送回到这个默认队列。
服务端把处理后的消息返回到 amq.gen-3xxx 这个默认消息队列,因为 客户端就是在监听amq.gen-3xxx 这个默认消息队列,所以客户端就能得到自己一开始发送给服务端,然后服务端处理完成后返回来的消息。
然后客户端就能根据 correlationId 这个标识,准确找到每个被处理修改过后的消息,而不至于找混。再根据需求去对处理过的消息进行业务操作。
(以上仅作为自己梳理代码流程的记录,大佬请直接忽略)
更简单点来说,就是
客户端声明并监听着默认队列 amq.gen-3xxx,然后发送消息到客户端,消息携带有correlationId 和 replyto 两个属性,路由key是rpc_queue,exchange是默认的。
服务端声明并监听 rpc_queue 消息队列,从该队列得到消息(messageA)后,从每个消息中获取该消息对应的 correlationId 和 replyto 两个属性的属性值,然后处理消息,对于处理完的消息(resultMessageA),需要把correlationId 和 replyto 两个属性的属性值重新设置回给resultMessageA,在通过Exchange分发回给 replyto 属性值中指定的消息队列(amq.gen-3xxx)。
然后客户端再从 amq.gen-3xxx 默认的消息队列中获取服务端处理并返回回来的消息,进行对应的消费。
ConstantUtil 常量工具类
ConnectionUtil RabbitMQ连接工具类
Server 服务端
Client 客户端
测试结果
启动测试的时候,一定要先启动服务端,再启动客户端
服务端
客户端
QUEUE
完整代码
ConstantUtil 常量工具类
package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{//消息队列实现 RPC(远程过程调用)模型 之 服务器端----------------//消息队列public final static String RPC_QUEUE = "rpc_queue";// ------------topic类型的Exchange,需要的相关常量----------public final static String QUEUET01 = "qt_01";public final static String QUEUET02 = "qt_02";// topic 通配符类型的 Exchangepublic static final String EXCHANGE_NAME_TOPIC = "myex03.topic";// Exchange 绑定 Queue 队列的路由key ,通配符类型 *:匹配一个单词。#:匹配零个或多个单词。public static final String[] ROUTING_TOPIC_PATTERNS = {"*.crazyit.*", "*.org", "edu.#"};// 生产者发送消息给Excahnge携带的路由keypublic static final String[] ROUTING_TOPIC_KEYS = { "www.crazyit.org", "www.crazyit.cn","edu.crazyit.org", "crazyit.org", "fkjava.org", "edu.fkjava.org", "edu.fkjava", "edu.org"};//-------------------------------------------------------// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}
ConnectionUtil RabbitMQ连接工具类
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory = new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}
Server 服务端
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 服务器端
public class Server
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、使用系统自动创建的默认Exchange,无需声明Exchange//消息队列设置为独占(exclusive:true)--------------------------------------------------------------------------------------声明消息队列channel.queueDeclare(ConstantUtil.RPC_QUEUE,true, /* 是否持久化 */true, /* 是否只允许只有这个消息队列的消息消费者才可以消费这个消息队列的消息 */false, /* 是否自动删除 */null); /* 指定这个消息队列的额外参数属性 *///不需要关闭资源,因为它也要监听自己消费消息的队列//4、调用Channel 的 basicConsume()方法开始消费消息----------------------------------------------------------------------------1、服务端监听并消费消息channel.basicConsume(ConstantUtil.RPC_QUEUE, /* 消费这个名字的消费队列里面的消息 */true,new DefaultConsumer(channel){//处理消息:当这个 ConstantUtil.RPC_QUEUE 消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /* 消息的那些属性 */,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来,此处读取到的消息,就相当于调用参数-------------------------------------------------------2、服务端获取消息队列的消息String param = new String(body, StandardCharsets.UTF_8);//之前只需要用到消息,现在需要额外读取消息里面携带的两个属性:reply_to 和 correlation_id//消息的属性都存放在 AMQP.BasicProperties 这个属性里面,从这个属性获取 reply_to 和 correlation_idString replyTo = properties.getReplyTo();System.err.println("replyTo: " + replyTo);String correlationId = properties.getCorrelationId();System.err.println("correlationId: " + correlationId);//调用服务器的处理消息的方法,最终得到处理后的结果。该方法可以是任意的业务处理,该方法的返回值result是要被送回客户端的。------3、服务端处理消费消息String result = format(param);//printf:格式化输出函数 %s:输出字符串 %n:换行System.err.printf("服务端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), param);//发送消息的方法,需要把返回值result发送回客户端-------------------------------------------------------4、服务端处理消费完的消息返回客户端的操作channel.basicPublish("", /* 使用默认的Exchange */replyTo,/* 此处的routing key 应该填 reply_to 属性; reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 *///把从客户端的 AMQP.BasicProperties 属性获取到的correlationId,再作为参数传回去,用于客户端和服务器的匹配。new AMQP.BasicProperties().builder().correlationId(correlationId) /* 也需要返回额外的 correlation_id,要与从客户端消息中读取的 correlation_id 完全一样 */.deliveryMode(2) /* 设置这个消息是持久化类型的 */.build(), /*这个.build()的作用就是构建得到这个 BasicProperties 对象,这个对象就包含了 correlationId 属性因为服务器端返回的消息一定要有这个correlationId。 */result.getBytes(StandardCharsets.UTF_8));}});}//模拟服务器端消费消息要做的处理业务逻辑操作public static String format(String name){//此处模拟让服务器处理这里的业务有快有慢的情况,看correlation_id 能不能还是把数据对应上int rand = (new Random().nextInt(40) + 20) * 30;try{Thread.sleep(rand);} catch (InterruptedException e){e.printStackTrace();}return "《" + name + "》";}}
Client 客户端
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 客户端
public class Client
{// paramMap 保存了 correlationid 与 参数(消息)之间的对应关系public static Map<String, String> paramMap = new ConcurrentHashMap<>();//客户端发送的消息(参数)public static String[] params = new String[]{"火影忍者", "七龙珠", "哆啦A梦", "蜡笔小新"};public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,声明一个有 RabbitMQ 自动创建的、自动命名的、持久化的、独享的、会自动删除的【默认队列】AMQP.Queue.DeclareOk declareOk = channel.queueDeclare();System.out.println("declareOk "+declareOk);//.getQueue() 用于得到默认队列的返回值,也就是默认队列的名字,之前声明是我们自己设置队列名,这里用默认的队列,就用.getQueue() 得到队列名。String queueName = declareOk.getQueue();System.out.println("queueName: "+queueName);//4、调用Channel 的 basicConsume()方法开始处理消费消息-----------------------------------------------------------------2、客户端监听服务端处理完消息后返回来的消息channel.basicConsume(queueName /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String resultMessage = new String(body, StandardCharsets.UTF_8);//此处,需要指定每个返回值对应的是哪个参数,靠的就是correlation_idString correlationId = properties.getCorrelationId();//根据服务器端返回的消息中的correlation_id 获取对应的参数String param = paramMap.get(correlationId);System.err.println("客户端发出去的消息内容:"+param +" , 服务端处理后返回来的消息内容:"+resultMessage);//printf:格式化输出函数 %s:输出字符串 %n:换行System.out.printf("客户端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), resultMessage);//得到服务器的返回值之后,整个调用过程就完成了,此时就应该从 Map 中删除这组 key-value对了( correlationId 与 参数的对应关系 )。paramMap.remove(correlationId);}});//客户端发送消息---------------------------------------------------------------------------代码运行后先执行这段--------------------1、客户端发送消息for (int i = 0 ; i < params.length ; i++){paramMap.put( i + "" , params[i] );channel.basicPublish("", /* 使用默认的Exchange */ConstantUtil.RPC_QUEUE, /* 客户端发送消息携带的路由key是服务端监听的消息队列的名字,且使用了默认的Exchange,这就意味着消息会被发送给服务器监听的那个消息队列 */new AMQP.BasicProperties().builder().correlationId(i + "") /* 设置 correlation_id 属性; correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性*/.replyTo(queueName) /* reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 , 设置 reply_to 属性 */.deliveryMode(2) /* 持久化消息 */.build(), /* 构建这个BasicProperties对象,这个对象主要存这个correlationId属性 */params[i].getBytes(StandardCharsets.UTF_8));}}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_rpc</artifactId><version>1.0.0</version><name>rabbitmq_rpc</name><!-- 属性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依赖 --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>