做的视频发到哪个网站好/网站分析报告范文

做的视频发到哪个网站好,网站分析报告范文,有哪些做封面的网站,万网网站如何建设目录 工作模式 Simple(简单模式) Work Queue(工作队列) Publish/Subscribe(发布/订阅) Exchange(交换机)? Routing(路由模式) Topics(通配…

目录

工作模式

Simple(简单模式)

Work Queue(工作队列)

Publish/Subscribe(发布/订阅)

Exchange(交换机)?

Routing(路由模式)

Topics(通配符模式)

RPC(RPC通信)

Publisher Confirms(发布确认)

代码实现

Simple(简单模式)

生产者代码

消费者代码

Work Queues(工作队列)

生产者代码

消费者代码

Publish/Subscribe(发布/订阅)

生产者代码

消费者代码

Routing(路由模式)

生产者代码

消费者代码

Topics(通配符模式)

生产者代码

消费者代码

RPC(RPC通信)

客户端代码

服务端代码

Publisher Confirms(发布确认)

Publishing Messages Individually(单独确认)

Publishing Messages in Batches(批量确认)

Handling Publisher Confirms Asynchronously(异步确认)


RabbitMQ 共提供了 7 种工作模式进行消息传递:

在本篇文章中,我们就来学习 RabbitMQ 的工作模式,我们首先来了解这 7 种工作模式分别是怎样的

工作模式

Simple(简单模式)

P 表示生产者,是消息的发送方

C 表示消费者,是消息的接收者

Queue:表示消息队列,用于缓存消息,生产者生产的消息发送到队列中,消费者从队列中取出消息

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列中后,都由这个消费者消费

特点:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为 **点对点(Point-to-Point)**模式

适用场景:消息只能被单个消费者处理

在 RabbitMQ 入门中的入门代码的工作模式就是简单模式

Work Queue(工作队列)

此时有一个生产者和多个消费者

当生产者向队列中发送多条消息后,Work Queue 会将消息分配给不同的消费者,每个消费者接收到的消息不同,由多个消费者共同消费生产者生产的消息

例如:

由 A (生产者)发送不同消息,消息存储到 RabbitMQ 中,接着,由 B(消费者1) 和 C(消费者2) 共同消息A 发送的消息,此时,RabbitMQ 选择将第一条消息分配给 B,B 消费第一条消息,RabbitMQ 将第二条消息分配给 C,C 消费第二条消息…

B 和 C 接收到的消息是不同的,这两个消费者共同消费 A 发送的所有消息

特点:消息不会重复,分配给不同的消费者

适用场景:集群环境中实现异步处理

Publish/Subscribe(发布/订阅)

其中,X 表示的是交换机,在 发布/订阅 模式中,多了 Exchange 角色,因此,我们先来学习交换机相关知识

Exchange(交换机)

Exchange(交换机)的作用是接收生产者发送的消息,并将消息按照一定的规则路由到一个或多个队列中

生产者的消息都会先发送到交换机,然后再由交换机将消息路由到队列中

在前面 简单模式工作队列模式 下,图中都没有出现交换机,但实际上,生产者生产的消息都是先发送到交换机,然后再路由到队列中的。在前两种模式下,直接使用 RabbitMQ 提供的内置交换机就可以实现,因此,并没有突出交换机的存在,但实际上生产者生产的消息不会直接投递到队列中

在 RabbitMQ 中,交换机有 4 种类型:FanoutDirectTopicHeaders,不同的类型有着不同的路由策略

AMQP 协议中还有两种类型,System自定义,在这里,我们并不重点关注

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish / Subscribe 模式

Direct:定向,将消息交给符合指定 routing key 的队列(Routing 模式

Topic:通配符,将消息交给符合 routing patterm(路由模式)的队列(Topics 模式

Headers:Headers 类型的交换机通过消息头部的属性来路由消息,而不依赖路由键的匹配规则来路由消息。根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换机性能会较差,因此也不太实用,基本上也不会进行使用

Exchange(交换机)只负责转发消息,并不具备存储消息的能力,因此,若是没有任何队列与 Exchange 绑定,或是没有符合路由规则的队列,消息就会丢失

接下来,我们来看 RoutingKeyBindingKey

RoutingKey:路由键,当生产者将消息发送给交换机时,会指定一个字符串,用于告诉交换机如何处理这个消息

BindingKey:绑定,RabbitMQ 中通过 Binding(绑定)将交换机与队列关联起来,在绑定时会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确地将消息路由到队列

即,绑定时,需要的路由键是 BindingKey;发送消息时,需要的路由键是 RoutingKey

例如:

使用 BindingKey1 将交换机与 队列1 进行绑定,使用 BindingKey2 将交换机与 队列2 进行绑定

若在发送消息时,若设置 Routing Key 设置为 BindingKey1,交换机就会将消息路由到 队列1

即,当消息的 RoutingKey 与队列绑定的 BindingKey 相匹配时,消息才会被路由到这个队列中

其实,BindingKey 也属于路由键的一种,即,在绑定时使用的路由键,有时,也会使用 RoutingKey 表示 BindingKey,即使用 RoutingKey 表示 BindingKey 和 RoutingKey,因此,我们需要根据其使用场景进行区分

在了解了相关概念之后,我们继续看 Publish/ Subscribe 模式

上述有一个生产者 P,多个消费者 C1、C2,X 表示交换机,交换机将消息复制多份每个消费者接收到相同的消息

也就是说,生产者发送一条消息,经过交换机转发到不同的队列,不同的消费者从不同的队列中取出消息进行消费

特点:不同的消费者接收到的消息是相同的

适用场景:消息需要被多个消费者同时接收,如:实时通信或广播消息

Publish/Subscribe(发布/订阅)模式 与Work Queue(工作队列)模式 最大的区别就是:发布/订阅 模式下,不同消费者接收到的消息是相同的;而 工作队列 模式下,不同消费者接收到的消息是不同的

Routing(路由模式)

路由模式可以看做是 发布订阅模式 的变种,其在发布订阅模式的基础上,增加了路由 key

发布订阅模式会无条件的将所有消息发送给所有消费者,而路由模式下,交换机会根据 RoutingKey 的规则,将数据筛选后发送给对应的消费者队列

也就是说,只有满足条件的队列才会收到消息

如上图所示,Q1 通过 a 与交换机进行绑定,Q2 通过 a、b 和 c 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 a,则此时 Q1 和 Q2 的BindingKey 都与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置Routing Key 设置为 b,此时,只有 Q2 的 BindingKey 与其相匹配,消息也就只会被路由到 Q2 中

适用场景:需要根据特定规则分发消息

Topics(通配符模式)

通配符模式,则是 路由模式 的变种,在 RoutingKey 的基础上,增加了通配符的功能,使得匹配更加灵活

Topics 和 Routing 的基本原理相同,即:生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列

而不同的是,Routing 模式下,需要RoutingKey 和 BingingKey 完全匹配;而 Topics 模式下,则是通配符匹配

在 BindingKey 中,存在两种特殊的字符串,用于模糊匹配

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

Q1 通过 *.a.* 与交换机进行绑定,Q2 通过 *.*.b 和 c.# 与交换机进行绑定

当 P (生产者)在发送消息时,若设置 Routing Key 设置为 work.a.b,则此时 Q1 和 Q2 的BindingKey 都能够与其相匹配,消息就会被路由到 Q1 和 Q2 中

而当 P 发送消息时,设置Routing Key 设置为 a.a.a,此时,只有 Q1的 BindingKey 与其相匹配,消息也就只会被路由到 Q1中

适用场景:需要灵活匹配和过滤消息的场景

RPC(RPC通信)

RPC 通信过程中,没有生产者和消费者,而是通过两个队列实现了一个可回调的过程

例如:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2…),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2…)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是所期望的响应

简而言之,客户端将请求消息发送到 队列Q1 中,服务器从 Q1 中取出请求消息进行处理,然后将响应消息发送到 队列Q2 中,客户端从 Q2 中读取响应消息

从而实现了客户端向服务器发送请求,服务器返回对应的响应的功能

Publisher Confirms(发布确认)

Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理

其过程为:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失

**适用场景:**对数据安全性要求较高,如金融交易,订单处理等

在基本了解了 RabbitMQ 的 7 种工作模式后,我们就来通过代码简单实现一下这 7 种工作模式

代码实现

Simple(简单模式)

简单模式下,只有一个生产者和一个消费者,生产者生产的消息存储到队列后,都由这个消费者消费

在 RabbitMQ入门-CSDN博客中的入门代码的工作模式就是简单模式,因此,在这里就不再进行过多解释了

首先引入依赖:

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 通过 channel 发送消息到队列中String message = "test...";channel.basicPublish("", "simple.test", null, message.getBytes());System.out.println("消息:" + message + " 发送成功");// 7. 释放资源channel.close();connection.close();}
}
消费者代码
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("49.232.238.62"); // ip 的默认值为 localhostfactory.setPort(5672); // 默认值为 5672factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /// 账号factory.setUsername("admin"); // 用户名,默认为 guestfactory.setPassword("123456"); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare("simple.test", true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume("simple.test", true, consumer);// 7. 释放资源channel.close();connection.close();}
}

Work Queues(工作队列)

生产者代码

工作队列模式下,由一个生产者生产消息,多个消费者共同接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收

由于我们每次连接时都要使用 IP、端口号、虚拟主机名等,因此,我们可以将它们提取出来,放到 Constants 类中:

public class Constants {public static final String HOST = "49.232.238.62";public static final int PORT = 5672;public static final String VIRTUAL_HOST = "test01";public static final String USER_NAME = "admin";public static final String USER_PASSWORD = "123456";
}

声明 工作队列 模式下使用的队列:

    // 工作模式public static final String WORK_QUEUE = "work.queue";

接下来,我们就来实现生产者的代码:

工作队列模式与简单模式的区别在于工作模式下有多个消费者,因此生产者的消费代码与简单模式下差别不大,但在发送消息时,我们一次发送 20 条消息:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 通过 channel 发送消息到队列中for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish("", Constants.WORK_QUEUE, null, message.getBytes());}System.out.println("消息发送成功!");// 7. 释放资源channel.close();connection.close();}
}

运行代码,可以看到 work.queue 队列被创建,且存储了 20 条消息

接下来,我们继续编写消费者代码

消费者代码

消费者的代码也与简单模式下的代码差别不大,但在最后,我们并不进行资源的释放:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

启动 Consumer1 和 Consumer2:

由于我们之前先启动了生产者,此时再启动消费者,由于消息较少,因此,先启动的 Consumer1 会瞬间将 20 条消息消费掉

因此,再次启动 Producer,观察结果:

可以看到两个消费者分别消费了 10 条消息

Publish/Subscribe(发布/订阅)

生产者代码

发布/订阅 模式中,多了 Exchange 角色

Exchange 常见有三种类型,分别代表不同的路由规则:

Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

Direct:定向,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

Topics:通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

此时,在 发布/订阅 模式下,我们就需要声明交换机,并绑定队列和交换机

我们首先来看声明交换机

使用 channel.exchangeDeclare 方法来创建交换机,我们来看exchangeDeclare 方法:

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

参数:

exchange:交换机名称

type:交换机类型

durable:是否持久化,当为 true 时,会将交换机存盘,在服务器重启时不会丢失相关信息

autoDelete:是否自动删除,自动删除的前提是至少有一个队列或交换机与这个交换机绑定,之后与这个交换机绑定的队列或交换机都会与此解绑

internal:是否为内部使用,若设置为 true,则表示内部使用,客户端无法直接发送消息到这个交换机中,只能通过交换机路由到交换机这种方式

arguments:相关参数

其中,type 表示交换机类型,其类型为BuiltinExchangeType,也可以为 String

我们来看BuiltinExchangeType,它是一个枚举类型

DIRECT(“direct”):定向,直连,将消息交给符合指定 RoutingKey 的队列(Routing 模式)

FANOUT(“fanout”):扇形,广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

TOPIC(“topic”):通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)

**HEADERS(“headers”):**参数模式(较少使用)

返回值:

Exchange.DeclareOk:声明确认方法,用于指示已成功声明交换

Constants 类中定义 发布/订阅 模式下使用的交换机和两个队列:

    // 广播模式public static final String PUBLISH_CHANGE = "fanout";public static final String PUBLISH_QUEUE_1 = "publish.queue.1";public static final String PUBLISH_QUEUE_2 = "publish.queue.2";

建立连接,并声明交换机和两个队列:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);}
}

交换机的类型为BuiltinExchangeType.FANOUT 广播模式

接着,我们使用 channel.queueBind 方法将队列和交换机进行绑定:

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

queue:要绑定的队列名称

exchange:要绑定的交换机名称

routingKey:路由 key,路由规则

arguments:相关参数

在这里的 routingKey,其实就是 BindingKey将交换机与队列关联起来,从而让 RabbitMQ 知道如何正确地将消息路由到队列

在发布/订阅模式下,交换机类型为 fanoutroutingKey 设置为 “”,表示每个消费者都可以收到全部信息

        // 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);

接下来,就可以发送消息了:

        // 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);// 8. 发送消息for (int i = 0; i < 20; i++) {String message = "work test... " + i;channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());}System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

运行代码,并观察结果:

可以看到,publish.queue.1 和 publish.queue.2 中都已经存储了 20 条消息

查看 fanout 的绑定关系:

成功绑定publish.queue.1 和 publish.queue.2:

接下来,我们继续编写消费者代码

消费者代码

交换机和队列的绑定关系已经在生产者中实现了,因此,消费者代码中可以不必再写

其实现与 工作队列模式 下是基本相同的,只需要修改读取的队列即可

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.PUBLISH_QUEUE_2, true, consumer);}
}

运行 Consumer1 和 Consumer2:

Consumer1 和 Consumer2 都接收到了这 20 条消息

Routing(路由模式)

生产者代码

Routing 模式下,队列与交换机之间的绑定,不再是任意的绑定了,而是需要指定一个 BindingKey

生产者在向 交换机 发送消息时,也需要指定消息的 RoutingKey

交换机不会将消息发送给每一个绑定的 key,而是会根据消息的 RoutingKey 进行判断,只有队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致时,才会接收消息

先在 Constants 类中定义 路由模式下使用的交换机和队列:

    // 路由模式public static final String ROUTING_CHANGE = "routing";public static final String ROUTINT_QUEUE_1 = "routing.queue.1";public static final String ROUTINT_QUEUE_2 = "routing.queue.2";

路由模式下,生产者的代码与 发布/订阅模式 下的区别在于:交换机的类型不同 以及 绑定队列的 BindKey 不同

(1)交换机类型不同

在声明交换机时,交换机的类型为BuiltinExchangeType.DIRECT

        // 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);

(2)声明队列

        // 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);// 8. 发送消息String messageA = "test a...";channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());String messageB = "test b...";channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());String messageC = "test c... ";channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

运行代码,并观察结果:

routing.queue.1 中有 1 条消息,routing.queue.2 中有两条消息

查看 routing 交换机与 队列的绑定关系:

接下来,我们继续编写消费者的代码

消费者代码

消费者代码与 发布/订阅 模式下基本相同,只需要修改队列名称即可:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.ROUTINT_QUEUE_2, true, consumer);}
}

运行结果:

Topics(通配符模式)

生产者代码

相比于 routing 模式,topics 类型的交换机在匹配规则上进行了扩展,BindingKey 支持通配符匹配

其中,RoutingKey 是一系列由 . 分割的单词,如 user.name、work.abc等

BindingKey 也和 RoutingKey 一样,由 . 分割的字符串

在 BindingKey 中可以存在两种特殊的字符串,用于模糊匹配:

* :表示能够匹配任意一个单词

#:表示能够匹配任意多个单词(可以为 0 个)

例如:

交换机 与 队列1(Q1)的 BindingKey 为 *.a.*

交换机 与 队列2(Q2)的 BindingKey 为 *.*.b

交换机 与 队列2(Q2)的 BindingKey 为 c.#

则:

若生产者的 RoutingKey 为 work.a.b,则消息会被路由到 Q1 和 Q2

若生产者的 RoutingKey 为 a.a.a,则消息会被路由到 Q1

若生产者的 RoutingKey 为 c.work.a,则消息会被路由到 Q2

若生产者的 RoutingKey 为 b.c.g,则消息会被丢弃,或是返回给生产者(需要设置 mandatory 参数)

接下来,我们就来实现 通配符模式下 的生产者:

先在 Constants 类中定义通配符模式下使用的交换机和队列:

    // 通配符模式public static final String TOPICS_CHANGE = "topics";public static final String TOPICS_QUEUE_1 = "topics.queue.1";public static final String TOPICS_QUEUE_2 = "topics.queue.2";

与 路由模式相比,发布订阅模式与其区别为:交换机类型不同 以及 绑定队列的 RoutingKey 不同

(1)交换机类型不同

交换机的类型为BuiltinExchangeType.TOPIC

        // 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);

(2)声明队列

        // 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);

(3)交换机与队列的绑定方式不同

        // 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);

此时,我们就可以发送消息了

在发送消息时,需要指定 RoutingKey

        // 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");

完整代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);// 6. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 7. 绑定交换机和队列channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);// 8. 发送消息String message1 = "test work.a.b";channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());String message2 = "test a.a.a";channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());String message3 = "test c.work.a";channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());System.out.println("消息发送成功!");// 9. 释放资源channel.close();connection.close();}
}

运行并观察结果:

topics.queue.1 和 topics.queue.2 中都已经存储了两条消息

我们来看topics.queue.1 中的消息:

我们继续实现消费者代码

消费者代码

Topics 模式的消费者代码与 Routing 模式下相同,只需要修改消费的队列名称即可:

Consumer1:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_1, true, consumer);}
}

Consumer2:

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);// 6. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 回调方法,当接收到消息后,自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 成功接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE_2, true, consumer);}
}

运行 Consumer1 和 Consumer2,并观察结果:

RPC(RPC通信)

RPC(Remote Procedure Call),远程过程调用,是一种通过网络从远程计算机上请求服务,不需要了解底层网络的技术,类似于 Http 远程调用

RabbitMQ 实现 RPC 通信,是通过两个队列实现一个可回调的过程:

其过程为:

客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个回调队列(amq.gen-Xa2…),这个回调队列用于接收服务端的响应消息

服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2…)

客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是否是所期望的响应

接下来,我们就来实现 RPC 的客户端:

客户端代码

客户端主要实现的功能有:

1. 发送请求消息到队列中

2. 从回调队列中读取响应消息

我们先来看发送请求消息到队列的过程:

(1)声明两个队列:消息发送到的队列(Queue) 和 回调队列(replayQueue),并声明本次请求的唯一标志 corrId

(2)将 replayQueue 和 corrId 配置到 Queue 中

接下来,需要从回调队列中读取响应消息,若我们直接从回调队列中读取响应消息,此时,可能服务端还没有处理完请求,也就未将响应消息发送到回调队列中,就读取不到响应

因此,我们可以使用阻塞队列来监听回调队列中的消息

1. 使用阻塞队列阻塞当前进程,监听回调队列中的消息,回调队列中有消息时,将响应消息放到阻塞队列中

2. 阻塞队列中有消息后,主线程被唤醒,处理返回内容

先在 Constants 类中声明 RPC 模式下使用的两个队列:

    // RPC 模式public static final String RPC_QUEUE_1 = "rpc.queue1";public static final String RPC_QUEUE_2 = "rpc.queue2";

在这里,我们就不再声明交换机了,直接使用默认的交换机

声明 消息发送的队列 和 回调队列:

        // 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);

使用 UUID 生成本次请求的唯一标志,并配置消息属性:

        // 本次请求的唯一标识String corrId = UUID.randomUUID().toString();

消息相关配置的类型为 BasicProperties,位于 com.rabbitmq.client.AMQP 下:

AMQP.BasicProperties 提供了一个构造器,可以通过builder() 来设置一些属性:

使用correlationId 方法设置唯一标识,replyTo 方法设置回调队列:

        // 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();

最后调用 build 方法创建实例

使用内置交换机发送消息:

        // 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());

接着,使用阻塞队列存储回调结果:

        // 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

从回调队列中接收响应消息:

        // 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);

最后,从阻塞队列中获取响应消息:

        // 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);

完整代码:

public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置消息属性// 本次请求的唯一标识String corrId = UUID.randomUUID().toString();// 消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(corrId) // 唯一ID.replyTo(Constants.RPC_QUEUE_2) // 回调队列.build();// 7. 发送消息String message = "test rpc...";channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());// 阻塞队列,存放回调结果,一次获取一条消息BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);// 8. 接收服务器的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息: " + new String(body));// 判断标识是否正确if(corrId.equals(properties.getCorrelationId())) {queue.offer(new String(body, "UTF-8"));}}};channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);// 9. 获取响应消息String result = queue.take();System.out.println("result: " + result);}
}

我们继续编写服务端代码

服务端代码

服务端要实现的功能为:

1. 从队列中接收请求消息

2. 根据消息内容处理请求消息,并将响应消息返回到回调队列中

我们先来实现接收消息:

建立连接、声明队列等过程都与 客户端代码相同

但需要注意的是,我们需要设置服务端同时最多只能获取一条消息

        // 6. 设置同时最多只能获取一条消息channel.basicQos(1);

若不设置 basicQos,RabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0,当prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前网络状况等因素,可能同时发送多条消息给消费者。这也就意味着,在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有波动

而在 RPC 模式下,通常希望是一对一的消息处理,即,一个请求对应一个响应。服务端在处理完一个消息并确认后,才会接收到下一条消息

接收消息后,就可以对请求消息进行处理并返回响应结果了:

        // 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);

需要注意的是,在这里我们需要手动对消息进行应答,而不是自动确认:

在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否会自动向消息队列确认消息

当设置为 true 时,消息队列会在将消息发送给消费者后,认为消息已经被成功消费,立即删除该条消息,这也就意味着,若消费者处理消息失败,消息就会丢失

当设置为 false 时,消息队列在将消息发送给消费者后,需要消费者显示地调用 basicAck 方式来确认消息,手动确认提供了更高的可靠性,保证消息不会被意外丢失,适用于消息处理重要且需要确保每个消息被正确处理的场景

完整代码:

public class Service {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理// 4. 创建 ChannelChannel channel = connection.createChannel();// 使用默认的交换机// 5. 声明队列channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);// 6. 设置同时最多只能获取一条消息channel.basicQos(1);// 7. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 将消息发到队列2中AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();// 返回String message = new String(body);System.out.println("接收到消息: " + message);// 响应消息String response = "request: " + message + " 接收成功";channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);}
}

运行代码,观察结果:

Publisher Confirms(发布确认)

消息中间件,都会面临消息丢失的问题

消息丢失大概分为三种情况:

1. 生产者的问题:由于应用程序故障、网络抖动等各种原因,生产者没有成功向 broker 发送消息

2. 消息中间件的问题:生产者成功将消息发送给了 broker,但 broker 未能将消息保存好,导致消息丢失

3. 消费者的问题:broker 将消息发送给了消费者,消费者在消费消息时,未处理好,导致 broker 将消费失败的消息从队列中删除了

Rabbit 针对上述问题给出了相应的解决方案:

针对问题1,可以采用**发布确认(Publisher Confirms)**机制实现

针对问题2,可以通过持久化机制

针对问题3,可以采用消息应答机制

接下来,我们就来进一步学习 发布确认机制

发布确认的过程:

(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态

(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会向生产者发送一个确认(ACK),其中包含消息的唯一 ID,表示消息已经送达

其中,deliveryTag 包含了确认消息的序号,此外,broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示这个序号之前的所有消息都已经被处理

发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息

当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息

若 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 **nack(Basic.Nack)**命令,生产者同样可以在回调方法中处理该 nack 命令

使用发布确认机制,需要将信道设置为 **confirm(确认)**模式:

            // 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();

发布模式有 3 种确认策略,我们分别来进行学习

由于使用每种策略时都需要建立连接,因此,我们将建立连接抽取出来:

    public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}
Publishing Messages Individually(单独确认)

单独确认模式下,每发送一条消息,RabbitMQ 会在消息被成功持久化到队列或者被消费者成功接收后,发回一个确认(acknowledgment)。生产者可以收到关于每条消息的确认信息

也就是说,生产者发送消息后会等待每条消息的确认信号。如果消息发送成功,RabbitMQ 会返回一个确认信号;如果消息失败,RabbitMQ 会返回一个负确认信号(nack)

我们先在 Constans 类中声明会使用的队列:

    // 发布确认模式public static final String PUBLISH_CONFIRMS_QUEUE_1 = "publish.confims.queue1";public static final String PUBLISH_CONFIRMS_QUEUE_2 = "publish.confims.queue2";public static final String PUBLISH_CONFIRMS_QUEUE_3 = "publish.confims.queue3";

我们仍使用默认的交换机进行路由

每次都发送 200 条消息:

public class Producer {public static int MESSAGE_COUNT = 200;
}

每当发送一条消息,就使用 channel.waitForConfirms() 方法等待确认消息

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

等待确认消息,当消息被确认,方法就会返回,若消息超时,就会抛出TimeoutException 异常,若消息丢失,就会抛出IOException

此外,我们记录 单独确认策略 发送消息的耗时:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirms(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}
}

完整代码:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();}
}

运行结果:

可以看到,发送 200 条消息的耗时较长

且,单独确认策略是每发送一条消息后,就调用channel.waitForConfirmsOrDie 方法,等待服务端的确认,也就是一种串行同步等待的方式,尤其是对于持久化的消息而言,需要等待消息确认存储在磁盘之后才会返回

但发布确认机制支持异步确认,即,可以一边发送消息,一边等待消息确认

我们接着看另外两种策略

Publishing Messages in Batches(批量确认)

批量确认会在每发送一批消息后,调用waitForConfirms 方法,等待服务器的确认返回

我们每发送 50 条消息,就调用waitForConfirms 方法进行确认:

    public static int BATCH_SIZE = 50;// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}

需要注意的是,若我们发送的消息为 210 条,此时最后的十条消息未被确认,因此,我们在消息发送完成后,进行最后的确认

调用publishMessageInBatches 方法,并观察结果:

我们可以看到,相比于单独确认策略,批量确认极大的提高了 confirm 的效率,但当出现了 Basic.Nack 或超时时,我们无法确定是哪一条消息出现了问题,客户端需要将这一批消息都进行重发,这也就重复发送了很多消息,当消息经常丢失时,批量确认的性能会不升反降

最后,我们来看 异步确认

Handling Publisher Confirms Asynchronously(异步确认)

异步确认提供了一个回调方法,服务端确认了一条或多条消息后,客户端会调用这个方法进行处理

Channel 接口提供了 addConfirmListener 方法,可以添加ConfirmListener 回调接口

ConfirmListener 接口中包含两个重要方法:

handleAckhandleNack,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack

deliveryTag 表示发送消息的序号

multiple 表示是否批量确认,开启批量确认后,若 RabbitMQ 返回的消息序号为 20,则表明 20 条消息都已经接收成功;当不开启批量确认时,若 RabbitMQ 返回的消息序号为 20 ,则表明 20 条消息被成功接收

在使用异步确认策略时,我们需要为每个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除掉对应消息

Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。我们可以使用 SortedSet 的有序性来维护这个已发送消息的集合

实现步骤:

1. 使用有序集合存储未确认的消息序号

            // 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

2. 当收到 ack 时,从集合中删除消息序号,若为批量确认,则删除小于等于当前消息序号的所有序号

            // 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});

3. 当接收到 nack 时,需要根据具体情况进行消息重发等操作

在这里,我们就不对其进行处理了,直接将消息清除:

                @Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}

接着,我们发送消息,每当发送一条消息,就将其序号存储到有序集合中:

            // 发送消息long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}

当有序集合为空时,消息确认完,因此,我们使用 while 循环等待消息确认完毕:

            // 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}

若循环体中什么也不写,while 循环执行的速度会非常快,因此,每当判断一次,我们让其等待 10 ms

完整代码:

    // 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}

运行结果:

可以看到,三种策略中,异步确认的表现更好

完整代码:

public class Producer {public static int MESSAGE_COUNT = 200;public static int WAIT_TIME = 5000;public static int BATCH_SIZE = 50;// 建立连接public static Connection createConnection() throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost(Constants.HOST); // ip 的默认值为 localhostfactory.setPort(Constants.PORT); // 默认值为 5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /// 账号factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guestfactory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest// 3. 创建连接 ConnectionConnection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理return connection;}// 单独确认模式public static void publishMessageIndividually() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 开启信道确认模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());// 等待确认channel.waitForConfirmsOrDie(WAIT_TIME);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 批量确认模式public static void publishMessageInBatches() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置为 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);// 发送消息int messageCount = 0;long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());messageCount++;// 批量确认if(messageCount == BATCH_SIZE) {channel.waitForConfirms(WAIT_TIME);messageCount = 0;}}// 消息发送完,若还有未确认消息,则进行最后的确认if (messageCount > 0) {channel.waitForConfirms(WAIT_TIME);}long endTime = System.currentTimeMillis();System.out.printf("publish %d messages in batch in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}// 异步确认模式public static void handlePublishConfirmAsynchronously() {try (Connection connection = createConnection()){// 开启信道Channel channel = connection.createChannel();// 设置 confirm 模式channel.confirmSelect();// 声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);// 使用有序集合来存储未确认的消息SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());// 进行确认channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理失败,消息重发...// 若为批量确认,则删除确认序号前所有元素if (multiple) {confirmSet.headSet(deliveryTag + 1).clear();} else {confirmSet.remove(deliveryTag);}}});// 记录开始时间long startTime = System.currentTimeMillis();// 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "message " + i;// 获取序号long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());// 存储序号confirmSet.add(nextPublishSeqNo);}// 消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}// 记录结束时间long endTime = System.currentTimeMillis();System.out.printf("publish %d messages and handled confirms asynchronously in %d ms
", MESSAGE_COUNT, endTime - startTime);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}public static void main(String[] args) {// 单独确认模式publishMessageIndividually();// 批量确认模式publishMessageInBatches();// 异步确认handlePublishConfirmAsynchronously();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70880.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

24.[前端开发-JavaScript基础]Day01-插件配置-变量-数据

一、邂逅JavaScript 1 认识编程语言 前端的三大核心 计算机语言 编程语言 常见的编程语言 2 编程语言发展历史 编程语言的发展历史 – 机器语言 编程语言的发展历史 – 汇编语言 编程语言的发展历史 – 高级语言 机器语言和高级语言 3 JavaScript的历史 认识JavaScript J…

uni-app集成sqlite

Sqlite SQLite 是一种轻量级的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;广泛应用于各种应用程序中&#xff0c;特别是那些需要嵌入式数据库解决方案的场景。它不需要单独的服务器进程或系统配置&#xff0c;所有数据都存储在一个单一的普通磁盘文件中&am…

多模态人物视频驱动技术回顾与业务应用

一种新的商品表现形态&#xff0c;内容几乎存在于手淘用户动线全流程&#xff0c;例如信息流种草内容、搜索消费决策内容、详情页种草内容等。通过低成本、高时效的AIGC内容生成能力&#xff0c;能够从供给端缓解内容生产成本高的问题&#xff0c;通过源源不断的低成本供给倒推…

navicat161_premium_cs_x64 安装与使用

navicat161_premium_cs_x64 安装与使用https://mp.weixin.qq.com/s/eE90x59hDVdk-shXSvICbA

mmdetection框架下使用yolov3训练Seaships数据集

之前复现的yolov3算法采用的是传统的coco数据集&#xff0c;这里我需要在新的数据集上跑&#xff0c;也就是船舶检测方向的SeaShips数据集&#xff0c;这里给出教程。 Seaships论文链接&#xff1a;https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber8438999 一、…

【多模态处理篇三】【DeepSeek语音合成:TTS音色克隆技术揭秘】

最近帮某明星工作室做AI语音助手时遇到魔幻需求——要求用5秒的咳嗽声克隆出完整音色!传统TTS系统直接翻车,生成的语音像得了重感冒的电音怪物。直到祭出DeepSeek的TTS音色克隆黑科技,才让AI语音从"机器朗读"进化到"声临其境"。今天我们就来扒开这个声音…

基于YOLO11深度学习的苹果叶片病害检测识别系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

一篇docker从入门到精通

Docker Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙盒机制&#xff0c;相互之间不会有任何接口&#xff08;类似 iP…

w803|联盛德|WM IoT SDK2.X测试|window11|TOML 文件|外设|TFT_LCD|测试任务|(5):TFT_LCD_LVGL示例

TFT_LCD_LVGL 功能概述 此应用程序是使用 WM IoT SDK 进行 LVGL 功能的示例。它演示了如何初始化 TFT LCD 设备&#xff0c;并创建 LVGL DEMO Task 进行 LVGL 模块的初始化&#xff0c;并展示 LVGL 原生的不同 Demo 场景, 例如&#xff1a; Widgets, Music Player, Benchmark…

Oracle Redo日志损坏挽救详细攻略

一 介绍 1.1 介绍 Oracle Redo损坏分四种情况:unused状态日志损坏 inactive状态日志损坏 active状态日志损坏 current状态日志损坏。针对不同状态的日志损坏&#xff0c;处理方式有所不同&#xff0c;下面将逐一介绍。 二 恢复 2.1 unused与inactive状态日志损坏 如果这个…

将VsCode变得顺手好用(1

目录 设置中文 配置调试功能 提效和增强相关插件 主题和图标相关插件 创建js文件 设置中文 打开【拓展】 输入【Chinese】 下载完成后重启Vs即可变为中文 配置调试功能 在随便一个位置新建一个文件夹&#xff0c;用于放置调试文件以及你未来写的代码&#xff0c;随便命名但…

1.1部署es:9200

安装es&#xff1a;root用户&#xff1a; 1.布署java环境 - 所有节点 wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm yum localinstall jdk-8u341-linux-x64.rpm -y java -version 2.下载安装elasticsearch - 所有节点 wget ftp://10.3.148.254/Note/Elk/…

java后端开发day20--面向对象进阶(一)--static继承

&#xff08;以下内容全部来自上述课程&#xff09; 1.static–静态–共享 static表示静态&#xff0c;是java中的一个修饰符&#xff0c;可以修饰成员方法&#xff0c;成员变量。 1.静态变量 被static修饰的成员变量&#xff0c;叫做静态变量。 特点&#xff1a; 被该类…

DeepSeek本地部署+自主开发对话Web应用

文章目录 引言前端部分核心页面DeepSeek.vueMyModal.vue 后端部分WebSocketConfig 配置类AbstractDeepSeekToolDeepSeekWebSocketHandler 数据库设计总结 引言 最近DeepSeep横空出世&#xff0c;在全球内掀起一股热潮&#xff0c;到处都是满血大模型接入的应用&#xff0c;但这…

使用DeepSeek/chatgpt等AI工具辅助网络协议流量数据包分析

随着deepseek,chatgpt等大模型的能力越来越强大&#xff0c;本文将介绍一下deepseek等LLM在分数流量数据包这方面的能力。为需要借助LLM等大模型辅助分析流量数据包的同学提供参考&#xff0c;也了解一下目前是否有必要继续学习wireshark工具以及复杂的协议知识。 pcap格式 目…

DeepSeek-R1:通过强化学习激发大语言模型的推理能力

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》&#xff08;人工智能科学与技术丛书&#xff09;【陈敬雷编著】【清华大学出版社】 文章目录 DeepSeek大模型技术系列三DeepSeek大模型技术系列三》DeepSeek-…

基于YOLO11深度学习的医学X光骨折检测与语音提示系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

数据结构:二叉树的数组结构以及堆的实现详解

目录 一.树与二叉树 1.树的概念与相关术语&#xff1a; 2.二叉树&#xff1a; &#xff08;1&#xff09;定义&#xff1a; &#xff08;2&#xff09;特殊的二叉树&#xff1a; &#xff08;3&#xff09;完全二叉树 &#xff08;4&#xff09;二叉树的存储结构&#x…

SQL笔记#复杂查询

一、视图 1、视图和表 使用试图时会执行SELECT语句并创建一张临时表。视图中保存的是SELECT语句;表中保存的是实际数据。 2、创建视图的方法 CREATE VIEW 视图名称(<视图列名1>,<视图列名2>,……) AS <SELECT语句> CREATE VIEW ProductSum (prod…

深度求索(DeepSeek)的AI革命:NLP、CV与智能应用的技术跃迁

Deepseek官网&#xff1a;DeepSeek 引言&#xff1a;AI技术浪潮中的深度求索 近年来&#xff0c;人工智能技术以指数级速度重塑全球产业格局。在这场技术革命中&#xff0c;深度求索&#xff08;DeepSeek&#xff09;凭借其前沿的算法研究、高效的工程化能力以及对垂直场景的…