RabbitMQ消息可靠性保证机制4--消费端限流

7.7 消费端限流

在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?

当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了

7.7.1 资源限制限流

在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。

/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiBk, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)

可以通过两种来设置生效

  1. 临时生效

    此配制仅当前生效在重启后将失效。

# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4

样例:

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
  1. 长期生效

在rabbitmq.conf的配制文件中加入

# 硬盘限制 
disk_free_limit.absolute=68455178240# 内存限制
vm_memory_high_watermark.relative = 0.4

样例:

[root@nullnull-os rabbitmq]# vi  /etc/rabbitmq/rabbitmq.conf 
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf 
disk_free_limit.absolute=68455178240[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]# 

注意,此需要重启rabbitMQ才能生效。

磁盘限制配制参考

Configuring Disk Free Space Limit

The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:

disk_free_limit.absolute = 1000000000

Or you can use memory units (KB, MB GB etc.) like this:

disk_free_limit.absolute = 1GB

It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:

disk_free_limit.relative = 1.0

The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.

The corresponding configuration setting should also be changed when the effects should survive a node restart.

来自:https://www.rabbitmq.com/disk-alarms.html

内存配制限制参考

https://www.rabbitmq.com/memory.html

Configuring the Memory Threshold

The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.

The example below sets the threshold to the default value of 0.4:

\# new style config format, recommended
vm_memory_high_watermark.relative = 0.4

The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).

Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):

vm_memory_high_watermark.absolute = 1073741824

Same example, but using memory units:

vm_memory_high_watermark.absolute = 1024MiB

If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.

The memory limit is appended to the log file when the RabbitMQ node starts:

2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total

The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.

The threshold can be changed while the broker is running using the

rabbitmqctl set_vm_memory_high_watermark <fraction>

command or

rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>

For example:

rabbitmqctl set_vm_memory_high_watermark 0.6

and

rabbitmqctl set_vm_memory_high_watermark absolute "4G"

When using the absolute mode, it is possible to use one of the following memory units:

  • M, MiB for mebibytes (2^20 bytes)
  • MB for megabytes (10^6 bytes)
  • G, GiB for gibibytes (2^30 bytes)
  • GB for gigabytes (10^9 bytes)

中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html

更多配制可参见:https://www.rabbitmq.com/configure.html#config-file

样例程序:

maven导入

            <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

生产程序:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;public class ResourceLimitProduct {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel(); ) {// 定义交换器、队列和绑定channel.exchangeDeclare("res.limit.ex", BuiltinExchangeType.DIRECT, true, false, null);channel.queueDeclare("res.limit.qu", true, false, false, null);channel.queueBind("res.limit.qu", "res.limit.ex", "res.limit.rk");// 开启发送方确认机制AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();ConfirmCallback confirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条确认】:等于" + deliveryTag + "已经确认");}}};ConfirmCallback nackConfirm =new ConfirmCallback() {@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("【批量不确认】:小于" + deliveryTag + "已经确认");} else {System.out.println("【单条不确认】:等于" + deliveryTag + "已经确认");}}};channel.addConfirmListener(confirm, nackConfirm);for (int i = 0; i < 100000000; i++) {String msg = getKbMessage(i);long sequence = channel.getNextPublishSeqNo();System.out.println("【发送】成功了序列消息:" + sequence);AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.contentType("text/plain");// 发送的消息持久化builder.deliveryMode(2);AMQP.BasicProperties properties = builder.build();channel.basicPublish("res.limit.ex", "res.limit.rk", properties, msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(ThreadLocalRandom.current().nextInt(5, 100));}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}private static String getKbMessage(int i) {StringBuilder msg = new StringBuilder("发送确认消息:" + i + "--");for (int j = 0; j < 102400; j++) {msg.append(j);}return msg.toString();}
}

设置硬盘资源限制

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...

运行生产者的应用程序,查看控制台的输出

【发送】成功了序列消息:1
【单条确认】:等于1已经确认
【发送】成功了序列消息:2
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【发送】成功了序列消息:4
【单条确认】:等于3已经确认
【发送】成功了序列消息:5
......
【单条确认】:等于702已经确认
【单条确认】:等于703已经确认
【发送】成功了序列消息:704
【发送】成功了序列消息:705
【发送】成功了序列消息:706
【发送】成功了序列消息:707
【发送】成功了序列消息:708
【发送】成功了序列消息:709
【发送】成功了序列消息:710
【发送】成功了序列消息:711

到此使用硬盘空间限制的测试完成。

内存资源限制

编辑配制文件rabbitmq.conf

vi /etc/rabbitmqrabbitmq.conf # 添加配制
vm_memory_high_watermark.absolute=120M

重启让其生效

systemctl restart rabbitmq-server

检查配制生效情况

[root@nullnull-os rabbitmq]# rabbitmqctl environment......{trace_vhosts,[]},{vhost_restart_strategy,continue},{vm_memory_calculation_strategy,rss},{vm_memory_high_watermark,{absolute,"120MB"}},{vm_memory_high_watermark_paging_ratio,0.5},{writer_gc_threshold,1000000000}]},{rabbit_common,[]},
......

查看到如下配制说明生效。

运行生产者

观察客户端输出

【发送】成功了序列消息:1
【发送】成功了序列消息:2
【单条确认】:等于1已经确认
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【单条确认】:等于3已经确认
【发送】成功了序列消息:4
【发送】成功了序列消息:5
【发送】成功了序列消息:6
【单条确认】:等于4已经确认
【单条确认】:等于5已经确认
【单条确认】:等于6已经确认
【发送】成功了序列消息:7
【单条确认】:等于7已经确认
......
【发送】成功了序列消息:174
【单条确认】:等于174已经确认
【发送】成功了序列消息:175
【单条确认】:等于175已经确认
【发送】成功了序列消息:176
【单条确认】:等于176已经确认
【发送】成功了序列消息:177
【发送】成功了序列消息:178
【发送】成功了序列消息:179
【发送】成功了序列消息:180
【发送】成功了序列消息:181
【发送】成功了序列消息:182
【发送】成功了序列消息:183
【发送】成功了序列消息:184
【发送】成功了序列消息:185
【发送】成功了序列消息:186
【发送】成功了序列消息:187

观察网页端的情况

在这里插入图片描述

到此内存资源限制而导致的限流测试完成。

7.7.2 默认的credit flow流控

RabbitMQ Credit Flow Mechanism (信用流控制机制) 是 RabbitMQ 使用的一种流量控制机制,旨在确保生产者(publishers)不会发送太多的消息给消费者(consumers),从而导致系统超载或资源耗尽。这个机制主要是为了保护消费者免受生产者发送太多消息的影响。

以下是 RabbitMQ Credit Flow 机制的基本工作原理:

  1. 信用计数器(Credit Counter):对于每个消费者,RabbitMQ 维护一个称为信用计数器的值。这个计数器表示消费者当前可以接收多少条消息。
  2. 初始信用额度(Initial Credit):当一个消费者连接到队列并开始消费消息时,RabbitMQ 为该消费者分配一个初始信用额度。这个额度通常与队列中的未确认消息数量有关。
  3. 消费者确认(Consumer Acknowledgments):当消费者成功处理一条消息并确认它时,它将会恢复一定数量的信用,这允许 RabbitMQ 将更多的消息发送给消费者。
  4. 信用降低(Decreasing Credit):当消费者未确认消息超出其信用额度时,其信用额度将降低。这会导致生产者无法继续发送消息给该消费者,直到其信用额度恢复。
  5. 自动降低的消费者(Auto-decrease Consumers):RabbitMQ 还可以配置为自动降低某些消费者的信用,以避免某个消费者占用太多资源。这通常用于处理慢速或长时间处理的消费者。

这个机制有助于平衡生产者和消费者之间的消息流量,防止生产者发送大量消息导致队列爆满,从而提高系统的稳定性和可靠性。

要注意的是,RabbitMQ 的信用流控制机制是可配置的,您可以根据您的需求来调整信用额度和其他参数,以满足特定的应用场景。此外,RabbitMQ 还提供了一些工具和插件,用于监控和管理流量控制,以确保系统的正常运行。

可以通过查看队列的状态信息来了解 Credit Flow 机制的当前状态。以下是一些常见的方式来查看 Credit Flow 状态:

  1. RabbitMQ Management UI:RabbitMQ 提供了一个基于 Web 的管理界面,您可以通过该界面查看队列的状态和统计信息,包括队列的消息数量、未确认消息数量以及消费者的状态。要访问管理界面,请确保已启用 RabbitMQ Management 插件。默认情况下,它通常在 http://localhost:15672/ 上运行。

    在管理界面中,您可以选择特定的队列,然后查看其状态和相关的统计信息,包括未确认消息数量。这可以帮助您了解 Credit Flow 是否生效,是否有消费者的信用已用尽。

  2. 命令行工具:您还可以使用 RabbitMQ 的命令行工具来查看队列的状态。以下是一个示例命令,用于查看队列的状态:

    rabbitmqctl list_queues name messages consumers messages_unacknowledged
    

    这将显示队列的名称、消息数量、消费者数量以及未确认消息数量。未确认消息数量表示消费者尚未确认的消息数量,这可以用于判断 Credit Flow 是否生效。

  3. 监控工具:您可以使用监控工具(如Prometheus和Grafana)来设置自定义监控和警报,以便实时跟踪队列的状态和信用流控制情况。通过这些工具,您可以创建仪表板来显示队列的各种指标,包括未确认消息数量和消费者的信用。

通过以上方法,您可以监视 RabbitMQ 中队列的状态和 Credit Flow 机制的工作情况,以确保系统的稳定性和可靠性。

在这里插入图片描述

7.7.3 Qos机制

RabbitMQ中有一种Qos保证机制,可以限制channel上接收到的未被Ack的消息数量,如果过这个数量限制RabbitMQ将不会再往消费端推送消息。是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)需要注意的是Qos机制仅对消费端推模式有效,对拉模式无效。而且不支持NONE-ACK模式。

执行channel.basicConsume方法之前通过channel.basicQos方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费慢的时候可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个.broker就发送一个,确认两个就发送两个,换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快县城超过了下游的消费速度时就容易出现消息积压、堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等手段,避免超过broker端的极限承载能力或者压垮下游消费者。

再讲消费者,我们期望消费者能够尽快的消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端能够处理速度是最快、最稳定而且还相对均匀(比较理想化)

提供应用吞吐量和缩短消费过程的耗时,主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间
  2. 增加消费节点实例。
  3. 调整并发消费的线程数。

测试

maven导入:

            <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class QosProduct {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("qos.ex",BuiltinExchangeType.DIRECT,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 100; i++) {String msg = "这是发送的消息:" + i;channel.basicPublish("qos.ex", "qos.rk", null, msg.getBytes(StandardCharsets.UTF_8));}}
}

消费者 :

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;public class QosConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置Qos为5,未被确认ACK的为5,还有一个参数,即是否为全局,true为全局channel.basicQos(5);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();System.out.println("[消费]" + time + "+收到的消息:" + new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}

测试

先启动消费都,再启动生产者,查看控制台输出

[消费]2023-08-25T12:08:13.143+收到的消息:这是发送的消息:0
[消费]2023-08-25T12:08:13.765+收到的消息:这是发送的消息:1
[消费]2023-08-25T12:08:14.127+收到的消息:这是发送的消息:2
[消费]2023-08-25T12:08:14.892+收到的消息:这是发送的消息:3
......
[消费]2023-08-25T12:08:57.437+收到的消息:这是发送的消息:96
[消费]2023-08-25T12:08:57.530+收到的消息:这是发送的消息:97
[消费]2023-08-25T12:08:57.566+收到的消息:这是发送的消息:98
[消费]2023-08-25T12:08:57.649+收到的消息:这是发送的消息:99

查看队列的情况:

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:59116 -> 10.0.4.16:5672 (1) │ 5              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

网页端查看

在这里插入图片描述

并行消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;public class QosThreadConsumer {public static void main(String[] args) throws Exception {// 资源限制ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");// 设置channel并发请求最大数factory.setRequestedChannelMax(5);// 自定义线程池工厂ThreadFactory thsFactory = Executors.privilegedThreadFactory();factory.setThreadFactory(thsFactory);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换器、队列和绑定channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);channel.queueDeclare("qos.qu", false, false, false, null);channel.queueBind("qos.qu", "qos.ex", "qos.rk");// 设置每秒处理2个channel.basicQos(5, true);channel.basicConsume("qos.qu",false,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {LocalDateTime time = LocalDateTime.now();long threadId = Thread.currentThread().getId();System.out.println("[消费]"+ time+ ",线程:"+ threadId+ ",收到的消息:"+ new String(body, StandardCharsets.UTF_8));int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);try {Thread.sleep(randomSleep);} catch (InterruptedException e) {e.printStackTrace();}if (envelope.getDeliveryTag() % 3 == 0) {// 进行消息确认channel.basicAck(envelope.getDeliveryTag(), true);}}});}
}

控制台输出:

[消费]2023-08-26T09:37:21.430,线程:24,收到的消息:这是发送的消息:0
[消费]2023-08-26T09:37:21.866,线程:25,收到的消息:这是发送的消息:1
[消费]2023-08-26T09:37:22.434,线程:25,收到的消息:这是发送的消息:2
[消费]2023-08-26T09:37:22.847,线程:25,收到的消息:这是发送的消息:3
[消费]2023-08-26T09:37:23.685,线程:25,收到的消息:这是发送的消息:4
[消费]2023-08-26T09:37:23.847,线程:26,收到的消息:这是发送的消息:5
......
[消费]2023-08-26T09:39:10.684,线程:28,收到的消息:这是发送的消息:526
[消费]2023-08-26T09:39:10.695,线程:32,收到的消息:这是发送的消息:527
[消费]2023-08-26T09:39:10.767,线程:32,收到的消息:这是发送的消息:528
......
[消费]2023-08-26T09:39:58.270,线程:27,收到的消息:这是发送的消息:996
[消费]2023-08-26T09:39:58.405,线程:27,收到的消息:这是发送的消息:997
[消费]2023-08-26T09:39:58.575,线程:27,收到的消息:这是发送的消息:998
[消费]2023-08-26T09:39:58.671,线程:27,收到的消息:这是发送的消息:999

如果Qos设置为全局,则可以看到到

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60591 -> 10.0.4.16:5672 (1) │ 0              │ 5                     │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60610 -> 10.0.4.16:5672 (1) │ 0              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61268.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

labview中的调用链

在有些项目中会用到调用链&#xff0c;用我自己的理解来说就像是递归函数那样层层调用&#xff0c;然后结果回退到第一次开始调用的main函数哪里&#xff0c;这里对于传值的时候还是非常好用&#xff0c;通过更改子VI然后来改变主VI的里面函数的值来实现这个效果。 我们可以看…

MySQL初学之旅(3)约束

目录 1.前言 2.正文 2.1约束类型 2.2NULL约束 2.3UNIQUE约束 2.4DEFAULT约束 2.5PRIMARY KEY主键约束 2.6FOREIGN KEY外键约束 2.7CHECK约束 3.小结 1.前言 哈喽大家好啊&#xff0c;今儿来继续给大家分享最近学习的MySQL和约束相关的知识点&#xff0c;希望大家一起…

Ubuntu Linux使用前准备动作 配置SSH

在 Ubuntu 系统中配置 SSH 服务可以通过以下步骤进行&#xff1a; 1、安装ssh服务 1&#xff09;打开终端&#xff08;可以使用快捷键 Ctrl Alt T&#xff09;。 2&#xff09;运行以下命令安装 OpenSSH 服务器&#xff1a; sudo apt-get update&#xff1a;这一步是更新…

在MATLAB中实现自适应滤波算法

自适应滤波算法是一种根据信号特性自动调整滤波参数的数字信号处理方法&#xff0c;其可以有效处理噪声干扰和信号畸变问题。在许多实时数据处理系统中&#xff0c;自适应滤波算法得到了广泛应用。在MATLAB中&#xff0c;可以使用多种方法实现自适应滤波算法。本文将介绍自适应…

【Vue笔记】基于vue3 + element-plus + el-dialog封装一个自定义的dialog弹出窗口组件

这篇文章,介绍一下如何使用vue3+element-plus中的el-dialog组件,自己封装一个通用的弹出窗口组件。运行效果如下所示: 目录 1.1、父子组件通信 1.2、自定义VDialog组件(【v-model】模式) 1.2.1、编写VDialog组件代码 1.2.2、使用VDialog组件 1.2.3、运行效果 1.3、自…

学习笔记024——Ubuntu 安装 Redis遇到相关问题

目录 1、更新APT存储库缓存&#xff1a; 2、apt安装Redis&#xff1a; 3、如何查看检查 Redis版本&#xff1a; 4、配置文件相关设置&#xff1a; 5、重启服务&#xff0c;配置生效&#xff1a; 6、查看服务状态&#xff1a; 1、更新APT存储库缓存&#xff1a; sudo apt…

学习记录:js算法(九十九):冗余连接

文章目录 冗余连接思路一 冗余连接 树可以看成是一个连通且 无环 的 无向 图。 给定往一棵 n 个节点 (节点值 1&#xff5e;n) 的树中添加一条边后的图。添加的边的两个顶点包含在 1 到 n 中间&#xff0c;且这条附加的边不属于树中已存在的边。图的信息记录于长度为 n 的二维数…

记录———封装uni-app+vant(u-upload)上传图片组件

上传图片回显&#xff0c;自定义图片回显样式 这段代码是一个Vue组件&#xff0c;主要实现了图片上传和预览的功能。组件接收了父组件传递的图片列表、最大图片数量和上传状态等属性。在模板中&#xff0c;使用了uni-easyinput组件和u-upload组件来实现图片上传和预览功能。在…

【图像处理识别】数据集合集!

本文将为您介绍经典、热门的数据集&#xff0c;希望对您在选择适合的数据集时有所帮助。 1 CNN-ImageProc-Robotics 机器人 更新时间&#xff1a;2024-07-29 访问地址: GitHub 描述&#xff1a; 通过 CNN 和图像处理进行机器人对象识别项目侧重于集成最先进的深度学习技术和…

高亮变色显示文本中的关键字

效果 第一步&#xff1a;按如下所示代码创建一个用来高亮显示文本的工具类&#xff1a; public class KeywordUtil {/*** 单个关键字高亮变色* param color 变化的色值* param text 文字* param keyword 文字中的关键字* return*/public static SpannableString highLigh…

[javascript]js的五子棋让红蓝双方自己跟自己下棋

运行效果&#xff08;这是未分出胜负&#xff09;&#xff1a; 这是分出胜负&#xff1a; 源代码&#xff0c;把下边的代码放到1.html&#xff0c;然后用浏览器打开&#xff0c;就可以&#xff0c;然后刷新网页&#xff1a; <!DOCTYPE html> <html><body>&l…

【list的模拟实现】—— 我与C++的模拟实现(十四)

一、list节点 ​ list是一个双向循环带头的链表&#xff0c;所以链表节点结构如下&#xff1a; template<class T>struct ListNode{T val;ListNode* next;ListNode* prve;ListNode(int x){val x;next prve this;}};二、list迭代器 2.1、list迭代器与vector迭代器区别…

ssh隧道代理访问内网应用

目录 场景 ssh配置 .ssh目录结构 常见文件及用途 config id_rsa 和 id_rsa.pub authorized_keys known_hosts&#xff1a; known_hosts.old&#xff1a; environment&#xff1a; ssh_config&#xff1a; 配置隧道访问内网应用流程 1.生成密钥对 2.将公钥添加到远…

从0开始学习机器学习--Day26--聚类算法

无监督学习(Unsupervised learning and introduction) 监督学习问题的样本 无监督学习样本 如图&#xff0c;可以看到两者的区别在于无监督学习的样本是没有标签的&#xff0c;换言之就是无监督学习不会赋予主观上的判断&#xff0c;需要算法自己去探寻区别&#xff0c;第二张…

基于YOLOv8深度学习的智慧农业猪行为检测系统研究与实现(PyQt5界面+数据集+训练代码)

随着智慧农业的快速发展&#xff0c;畜牧业的智能化管理已逐渐成为提高生产效率、提升动物福利、降低运营成本的关键手段之一。在此背景下&#xff0c;畜牧场对动物行为的自动化监测需求日益增长&#xff0c;尤其是在大型养猪场&#xff0c;猪群的日常行为检测对于疾病预防、饲…

C++:指针和引用

指针的基础 数据在内存当中是怎么样被存储的 数据在内存中的存储方式取决于数据的类型和计算机的体系结构 基本数据类型 整数类型&#xff1a;整数在内存中以二进制补码的形式存储。对于有符号整数&#xff0c;最高位为符号位&#xff0c;0 表示正数&#xff0c;1 表示负数。…

使用esp32c3开发板通过wifi连网络web服务器

实验基本拓扑就是&#xff1a; esp32c3开发板通过Wifi模块连上局域网&#xff0c;局域网一台服务器通过FastAPI提供8000端口的web服务&#xff0c;在esp32c3开发板中烧录micropython固件&#xff0c;在python交互模式下&#xff0c;连上Wifi模块&#xff0c;并使用socket模块获…

自动化运维-检测Linux服务器CPU、内存、负载、IO读写、机房带宽和服务器类型等信息脚本

前言&#xff1a;以上脚本为今年8月1号发布的&#xff0c;当时是没有任何问题&#xff0c;但现在脚本里网络速度测试py文件获取不了了&#xff0c;测速这块功能目前无法实现&#xff0c;后面我会抽时间来研究&#xff0c;大家如果有建议也可以分享下。 脚本内容&#xff1a; #…

网络安全:我们的安全防线

在数字化时代&#xff0c;网络安全已成为国家安全、经济发展和社会稳定的重要组成部分。网络安全不仅仅是技术问题&#xff0c;更是一个涉及政治、经济、文化、社会等多个层面的综合性问题。从宏观到微观&#xff0c;网络安全的重要性不言而喻。 宏观层面&#xff1a;国家安全与…

通威传媒:移动AI数字人OLED透明屏应用案例

在科技与创新不断交融的今天&#xff0c;尼伽OLED品牌与通威传媒携手合作&#xff0c;共同推出了移动AI数字人OLED透明屏显示设备。这款设备不仅融合了尼伽OLED品牌的卓越显示技术与通威传媒的深厚积累&#xff0c;更在定点介绍、手动讲解模式、中控控制以及数字人联动等方面实…