RabbitMQ的6种工作模式

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>rabbitmq-java</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies></project>

工具类

package com.example;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {// 连接rabbitmq服务,共享一个工厂对象private static ConnectionFactory factory;static {factory=new ConnectionFactory();//设置rabbitmq属性factory.setHost("127.0.0.1");factory.setUsername("zsx242030");factory.setPassword("zsx242030");factory.setVirtualHost("/");factory.setPort(5672);}public static Connection getConnection(){Connection connection=null;try {//获取连接对象connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}

消息提供者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息(消费的是队列,而不是交换机)channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("fanout_exchange", "fanout");//通过通道创建队列//channel.queueDeclare("queue1",false,false,false,null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue1", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue1", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue2", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue2", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("direct_exchange", "direct");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("direct_exchange",//设置路由键,符合路由键的队列,才能拿到消息"insert",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue1", "direct_exchange", "select");channel.queueBind("direct_queue1", "direct_exchange", "insert");//监听队列中的消息channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue2", "direct_exchange", "delete");channel.queueBind("direct_queue2", "direct_exchange", "select");//监听队列中的消息channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("topic_exchange", "topic");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("topic_exchange",// #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)"emp.hello world",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue1", "topic_exchange", "emp.#");//监听队列中的消息channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue2", "topic_exchange", "emp.*");//监听队列中的消息channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Client {public static void main(String[] argv) throws IOException, InterruptedException {String message = "Hello World!!!";// 建立一个连接和一个通道,并为回调声明一个唯一的回调队列Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定义一个临时变量的接受队列名String replyQueueName = channel.queueDeclare().getQueue();// 生成一个唯一的字符串作为回调队列的编号String corrId = UUID.randomUUID().toString();// 发送请求消息,消息使用了两个属性:replyTo和correlationId// 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发布一个消息,rpc_queue路由规则channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));// 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。// 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//检查它的correlationId是否是我们所要找的那个if (properties.getCorrelationId().equals(corrId)) {//如果是,则响应BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});System.out.println(" 客户端请求的结果:" + response.take());}
}

Server端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Server {public static void main(String[] args) {Connection connection = null;try {connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("rpc_queue", false, false, false, null);channel.basicQos(1);System.out.println("Awaiting RPC requests:");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response = new String(body, "UTF-8");System.out.println("response (" + response + ")");} catch (RuntimeException e) {System.out.println("错误信息 " + e.toString());} finally {// 返回处理结果队列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));// 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};// 取消自动确认boolean autoAck = false;channel.basicConsume("rpc_queue", autoAck, consumer);// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/26311.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Raspberry Pi 4上安装Ubuntu 20.04 + ROS noetic(不带显示器)

在Raspberry Pi 4上安装Ubuntu 20.04 ROS noetic&#xff08;不带显示器&#xff09; 1. 所需设备 所需设备&#xff1a; 树莓派 4 B 型 wifi microSD 卡&#xff1a;最小 32GB MicroSD 转 SD 适配器 &#xff08;可选&#xff09;显示器&#xff0c;鼠标等 2. 树莓派…

机器学习---概述(二)

文章目录 1.模型评估1.1 分类模型评估1.2 回归模型评估 2. 拟合2.1 欠拟合2.2 过拟合2.3 适当拟合总结&#xff1a; 3.深度学习3.1层次&#xff08;Layers&#xff09;&#xff1a;3.2 神经元&#xff08;Neurons&#xff09;&#xff1a;3.3 总结 1.模型评估 模型评估是机器学…

【Linux操作系统】Vim:提升你的编辑效率

Vim是一款功能强大的文本编辑器&#xff0c;它具有高度可定制性和灵活性&#xff0c;可以帮助程序员和文本编辑者提高编辑效率。本文将介绍Vim的基本使用方法、常用功能和一些实用技巧。 文章目录 1. Vim的基本使用方法&#xff1a;2. 常用功能&#xff1a;2.1 文件操作&#…

Qt应用开发(基础篇)——时间类 QDateTime、QDate、QTime

一、前言 时间类QDateTime、QDate、QTime、QTimeZone保存了Qt的时间、日期、时区信息&#xff0c;常用的时间类部件都会用到这些数据结构&#xff0c;常用概念有年、月、日、时、分、秒、毫秒和时区&#xff0c;时间和时区就关系到时间戳和UTC的概念。 UTC时间&#xff0c;又称…

Baumer工业相机堡盟工业相机如何通过BGAPI SDK获取相机当前数据吞吐量(C#)

Baumer工业相机堡盟工业相机如何通过BGAPISDK里函数来获取相机当前数据吞吐量&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机的数据吞吐量的技术背景CameraExplorer如何查看相机吞吐量信息在BGAPI SDK里通过函数获取相机接口吞吐量 Baumer工业相机通过BGAPI SDK获取…

x光下危险物品/违禁物品目标识别的模型训练与推理代码

前言 1.安检在公共场合的重要性不言而喻&#xff0c;保障群众人身安全是其首要任务。在各种场合&#xff0c;安检都是不可或缺的环节。x光安检机作为安检的重要工具&#xff0c;尽管其具有人工监控判断成像的特性&#xff0c;但是其局限性也十分明显。 为了解决这一局限性为出…

React 核心开发者 Dan Abramov 宣布从 Meta 离职

导读React.js 核心开发者、Redux 作者 Dan Abramov 在社交平台发文宣布&#xff0c;将辞去在 Meta 的职务&#xff1a; “我感到苦乐参半&#xff0c;几周后我就要辞去 Meta 的工作了。在 Meta 的 React 组织工作是我的荣幸。感谢我过去和现在的同事接纳我&#xff0c;容忍我犯…

Java02-迭代器,数据结构,List,Set ,Map,Collections工具类

目录 什么是遍历&#xff1f; 一、Collection集合的遍历方式 1.迭代器遍历 方法 流程 案例 2. foreach&#xff08;增强for循环&#xff09;遍历 案例 3.Lamdba表达式遍历 案例 二、数据结构 数据结构介绍 常见数据结构 栈&#xff08;Stack&#xff09; 队列&a…

9.物联网操作系统之软件定时器

一。软件定时器概念及应用 1.软件定时器定义 就是软件实现定时器。 2.FreeRTOS软件定时器介绍 如上图所示&#xff0c;Times的左边为设置定时器时间&#xff0c;设置方式可以为任务设置或者中断设置&#xff1b;Times的右边为定时器的定时响应&#xff0c;使用CallBack响应。…

OLAP ModelKit Crack,ADO.NET和IList

OLAP ModelKit Crack,ADO.NET和IList OLAP ModelKit是一个多功能的.NET OLAP组件&#xff0c;用C#编写&#xff0c;只包含100%托管代码。它具有XP主题的外观&#xff0c;并能够使用任何.NET数据源(ADO.NET和IList)。借助任何第三方组件(尤其是图表组件)呈现数据的能力扩展了产品…

春秋云镜 CVE-2020-25540

春秋云镜 CVE-2020-25540 Thinkadmin v6任意文件读取漏洞 靶标介绍 ThinkAdmin 6版本存在路径查找漏洞&#xff0c;可利用该漏洞通过GET请求编码参数任意读取远程服务器上的文件。 启动场景 漏洞利用 1、未授权列目录poc 读取网站根目录Payload: http://think.admin/Think…

【LeetCode】105. 从前序与中序遍历序列构造二叉树 106. 从中序与后序遍历序列构造二叉树

105. 从前序与中序遍历序列构造二叉树 这道题也是经典的数据结构题了&#xff0c;有时候面试题也会遇到&#xff0c;已知前序与中序的遍历序列&#xff0c;由前序遍历我们可以知道第一个元素就是根节点&#xff0c;而中序遍历的特点就是根节点的左边全部为左子树&#xff0c;右…

4用opencv玩转图像2

opencv绘制文字和几何图形 黑色底图 显示是一张黑色图片 使用opencv画圆形 #画一个圆 cv2.circle(imgblack_img,center(400,400),radius100,color(0,0,255),thickness10) 画实心圆 只需要把thickness-1。 cv2.circle(imgblack_img,center(500,600),radius50,color(0,0,255),t…

K8s持久化存储(nfs网络存储)

数据卷 emptydir&#xff0c;是本地存储&#xff0c;pod重启&#xff0c;数据就不存在了&#xff0c;需要对数据持久化存储 1.nfs&#xff0c;网络存储 &#xff0c;pod重启&#xff0c;数据还存在的

【C语言学习——————预处理3000字讲解】

欢迎阅读新一期的c语言学习模块————预处理 ✒️个人主页&#xff1a;-_Joker_- &#x1f3f7;️专栏&#xff1a;C语言 &#x1f4dc;代码仓库&#xff1a;c_code &#x1f339;&#x1f339;欢迎大佬们的阅读和三连关注&#xff0c;顺着评论回访&#x1f339;&#x1f339…

SSM(Vue3+ElementPlus+Axios+SSM前后端分离)--功能实现[五]

文章目录 SSM--功能实现实现功能09-带条件查询分页显示列表需求分析/图解思路分析代码实现测试分页条件查询带条件分页查询显示效果 实现功能10-添加家居表单前端校验需求分析/图解思路分析代码实现完成测试测试页面效果 实现功能11-添加家居表单后端校验需求分析/图解思路分析…

Spring接口InitializingBean的作用和使用介绍

在Spring框架中&#xff0c;InitializingBean接口是一个回调接口&#xff0c;用于在Spring容器实例化Bean并设置Bean的属性之后&#xff0c;执行一些自定义的初始化逻辑。实现InitializingBean接口的Bean可以在初始化阶段进行一些必要的操作&#xff0c;比如数据的初始化、资源…

2023巅峰极客比赛web复现

<1> unserialize(反序列化字符串逃逸) 下载 www.zip得到源码&#xff1a; my.php 存在 pull_it恶意类 反序列化时会执行 $this-x 这里有一层过滤 $this-x不能为字母数字 可以取反、异或绕过 下面来找一找怎么去触发反序列化 index.php 会对我们登录框输入的参数先…

Markdown系列之Flowchat流程图

一.欢迎来到我的酒馆 介绍Markdown的Flowchart流程图语法。 目录 一.欢迎来到我的酒馆二.什么是Flowchart三.更进一步 二.什么是Flowchart 2.1 Flowchart是一款基于javascript的工具&#xff0c;使用它可以用代码创建简单的流程图。具体信息可以查看flowchart官网&#xff1a;…

【暑期每日一练】 Epilogue

目录 选择题&#xff08;1&#xff09;解析&#xff1a; &#xff08;2&#xff09;解析&#xff1a; &#xff08;3&#xff09;解析&#xff1a; &#xff08;4&#xff09;解析&#xff1a; &#xff08;5&#xff09;解析&#xff1a; 编程题题一描述输入描述&#xff1a;输…