RabbitMQ线程和连接模型详解

1. 线程、信道、连接、请求的概念

客户端(生产者)和服务端(服务端)之间建立连接。例如TCP连接,是一个长连接,也是较为稳定的连接,开销也较大。一般而言主客户端之间需要一个连接。但服务器需要连接多个客户端。

客户端的消息(请求)需要进过经过信道到达服务端。信道是一种逻辑上的连接通道,多个信道复用了同一个连接。(RabbitMQ默认的最大通道数2047)

2. RabbitMQ的线程模型

rabbitmq.client源码:

通过断点查找发现原来是 ConsumerWorkService MQ工作线程池这个类控制的,工作线程池和心跳线程会在消费者(服务端)启动时初始化。

这个类构造函数里有一个executor参数,当这个参数为空时,就会创建一个Executors.newFixedThreadPool,代码如下:

amqp-client的package com.rabbitmq.client.impl下有类

final public class ConsumerWorkService {private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerWorkService.class);private static final int MAX_RUNNABLE_BLOCK_SIZE = 256;private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.availableProcessors());private final ExecutorService executor;private final boolean privateExecutor;private final WorkPool<Channel, Runnable> workPool;private final int shutdownTimeout;public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {this.privateExecutor = (executor == null);if (executor == null) {LOGGER.debug("Creating executor service with {} thread(s) for consumer work service", DEFAULT_NUM_THREADS);this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory);} else {this.executor = executor;}this.workPool = new WorkPool<>(queueingTimeout);this.shutdownTimeout = shutdownTimeout;}

这个类定义RabbitMQ服务端(消费者)的线程模型,底层也是一个ExecutorService的线程池模型,默认的线程数量是可用的CPU核数(处理器数)。

3. 浅浅的看看连接模型

从一般性的客户端的rabbitmq的使用出发

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

建立连接时一般会用到ConnectionFactory的newConnection()

public Connection newConnection(Address[] addrs) throws IOException, TimeoutException {return newConnection(this.sharedExecutor, Arrays.asList(addrs), null);
}

连接会传入一个共享的线程池,broker服务端和客户端的连接共享这个连接池,这个属性表示内部使用共享的唯一一个ExecutorService
设置这个属性就可以一直传到ConsumerWorkService中。

Set the executor to use for consumer operation dispatch by default for newly created connections. All connections that use this executor share it. It’s developer’s responsibility to shut down the executor when it is no longer needed.

默认情况下,为新创建的连接设置用于消费者操作调度的线程池。所有使用此线程池的连接都共享它。当不再需要线程池时,关闭它是开发人员的责任。

通过设置shareExecutorService,无论多少个channel,都可以统一控制线程数量、队列数量,根据实际情况进行配置。

也可以传入其他的executor。

4. RabbitMQ连接的具体过程

rabbitmq采用的amqp协议,是一个高级的应用层协议。

  1. 将 AMQP 0-9-1 的连接头写入底层套接字,包含指定的版本信息(客户端告诉 broker 自己使用的协议及版本,底层使用 java 自带的 socket)。

  2. 客户端等待 broker 发送的 Connection.Start (broker 告诉客户端 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力)。

  3. 客户端接收后 发送 Connection.StartOk (客户端告诉 broker 连接使用的帐号和密码、认证机制、语言环境、客户的信息以及能力)。

  4. 客户端等待 broker 发送的 Connection.Tune (broker 与 客户端 进行参数协商)。

  5. 客户端接收后 发送 Connection.TuneOk (客户端 参数 [ChannelMax、FrameMax、Heartbeat] 协商完成后告诉 broker)。

  6. 客户端发送 Connection.Open (客户端 告诉 broker 打开一个连接,并请求设置_virtualHost [vhost])。

  7. broker 接收到后返回 Connection.OpenOk (client 对 vhost 进行验证,成功则返回如下此信息)。

  8. 客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)。

  9. 客户端发送 Confirm.Select,broker 接收到后返回 Confirm.SelectOk(客户端告诉 broker 消息需要使用 confirm的机制,broker收到并回复)。。

  10. 客户端发送消息 Basic.Publish,broker 应答返回 Basic.Ack。

  11. 期间 客户端和 broker 会相互检查彼此的心跳 heartbeat。

  12. 客户端 关闭通道 Channel.Close,broker 应答返回 Channel.CloseOk。

  13. 客户端 关闭连接 Connection.Close,broker 应答返回 Connection.CloseOk。

rabbitmq具体源码细节和spring中的是类似的,可以参考我的另一篇文章,分析地很详细。敲详细的springframework-amqp-rabbit源码解析

不同在于消息底层是用帧来包装,有分心跳帧和携带方法或信息的帧。Consumer也是带有queue的。

客户端单个connection对应的单个channel实际上是单线程的,每次收到Socket消息都触发处理逻辑,从任务队列里面取出一定的任务进行依次处理,如果一个channel订阅了多个topic的话也是单线程依次处理的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/48226.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Node.js基础03】利用http模块创建Web服务

一&#xff1a;使用步骤 1 加载http模块&#xff0c;并创建Web服务程序 2 利用Web服务程序监听request事件&#xff0c;设置响应头和响应体 3 配置端口号并启动Web服务 4 浏览器请求设置的端口号&#xff0c;进行Web服务程序测试 二&#xff1a;简单应用 const http requir…

基于多线程延迟排序的睡眠排序算法的创新与改进

基于多线程延迟排序的睡眠排序算法的创新与改进 摘要 本文在传统睡眠排序算法的基础上&#xff0c;提出了一种改进方案&#xff0c;旨在优化处理负数和大规模数据集的性能。通过引入线程池管理和数据分段排序技术&#xff0c;改进后的算法在处理大数据集和包含负数的数据集时…

计算机网络入门 -- TCP详解

计算机网络入门 – TCP详解 1.TCP协议 1.1 报文格式 1.32位序号&#xff1a;该条TCP数据携带的起始序号。 2.32位确认序号&#xff1a;期望对方发送数据从那个序号开始发送。 3.4位首部长度&#xff1a;最大为0xF(15)&#xff0c;指的是TCP头部长度。 首部长度 4 位首部长…

谷粒商城实战笔记-37-前端基础-Vue-基本语法插件安装

文章目录 一&#xff0c;v-model1&#xff0c;双向绑定2&#xff0c;vue的双向绑定2.1 html元素上使用指令v-model2.2 model中声明对应属性2.3&#xff0c;验证view绑定modelmodel绑定view 完整代码 二&#xff0c;v-on1&#xff0c;指令简介2&#xff0c;在button按钮中添加v-…

rimraf快速删除node_modules方法

项目中&#xff0c;有时候会遇到下载依赖报错&#xff0c;然后想要删除node_modules再重新下载&#xff0c;但是有时候直接用yarn 或者npm install仍热不行&#xff0c;我们可以尽量用yran&#xff0c;因为npm 可能会自动下一些给一些包升级了&#xff0c;此时因为前面已经下过…

JVM:GraalVM

文章目录 一、介绍1、什么是GraalVM&#xff1a;2、GraalVM版本 二、两种使用模式 一、介绍 1、什么是GraalVM&#xff1a; GraalVM是Oracle官方推出的一款高性能JDK&#xff0c;使用它享受比OpenJDK或者OracleJDK更好的性能。GraalVM的官网地址&#xff1a;https://www.graa…

泛型新理解

1.创建三个类&#xff0c;并写好对应关系 package com.jmj.gulimall.study;public class People { }package com.jmj.gulimall.study;public class Student extends People{ }package com.jmj.gulimall.study;public class Teacher extends People{ }2.解释一下这三个方法 pub…

Xubuntu22.04 终端命令调用图形设置工具

最近将软件运行环境从Xubuntu16.04迁移到了Xubuntu22.04&#xff0c;主要是为了解决Qt程序的图形渲染使用集显去处理&#xff0c;而不是使用CPU。https://blog.csdn.net/qq_45445740/article/details/134495914 使用Xubuntu22.04系统发现很多图形设置工具不太容易在桌面找到&am…

数据结构(稀疏数组)

简介 稀疏数组是一种数据结构&#xff0c;用于有效地存储和处理那些大多数元素都是零或者重复值的数组。在稀疏数组中&#xff0c;只有非零或非重复的元素会被存储&#xff0c;从而节省内存空间。 案例引入 假如想把下面这张表存入文件&#xff0c;我们会怎么做&#xff1f;…

【LeetCode】翻转二叉树

目录 一、题目二、解法完整代码 一、题目 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a;root [4,2,7,1,3,6,9] 输出&#xff1a;[4,7,2,9,6,3,1] 示例 2&#xff1a; 输入&#xff1a;root…

数据结构 day1

2024.7.20 数据结构之旅 那么接下来&#xff0c;是笔者每日学习之后的总结&#xff0c;以此来巩固复习&#xff0c; 本次目标 入门 中等的数据结构学习&#xff0c;come on 前置知识点: C语言的 (指针、结构、库函数、内存管理等) 数据结构定义&#xff1a; 用于计算机存储、…

【系统架构设计 每日一问】一 在单表查询的情况下,ES快还是mysql快

在单表查询的情况下&#xff0c;Elasticsearch&#xff08;ES&#xff09;和MySQL的查询速度对比并非绝对&#xff0c;而是取决于多个因素&#xff0c;包括查询类型、数据量大小、索引策略、系统配置等。以下是对两者在单表查询速度方面的详细分析&#xff1a; 一、查询类型 E…

php-fpm如何配置max_children参数

前言 略 php-fpm 资源耗尽 php-fpm 的子进程耗尽的时&#xff1a; 会导致 502 出现nginx 出现错误日志 2024/07/18 20:19:10 [crit] 36390#0: *1402471 connect() to unix:/tmp/php-cgi-81.sock failed (2: No such file or directory) while connecting to upstream, cli…

Spring Authorization Server实战

Spring Authorization Server实战 Spring Authorizatin Server Spring Authorizatin Server是一个框架&#xff0c;它提供了OAuth2.1和OpenID Connect 1.0规范以及其它相关规范的实现&#xff0c;它是基于Spring Security构建的 OAuth2.0协议介绍 OAuth是一个开放标准的授权…

使用docker swarm搭建ruoyi集群环境

整体目标 项目背景 领导给到了我一个客户&#xff0c;客户商业模式为成本制作&#xff0c;成本核算。其中涉及到大量涉密数据&#xff0c;且与我们现有产品几乎没有兼容点&#xff08;我们是一套低代码的框架&#xff0c;客户有很多业务二开&#xff09; 测试环境给到了我6台…

大模型学习笔记 - LLM模型架构

LLM 模型架构 LLM 模型架构 1. LLM 核心模型 Transformer2. 详细配置 2.1 归一化方法2.2 归一化模块位置2.3 激活函数2.4 位置编码 2.4.1 绝对位置编码2.4.2 相对位置编码2.4.3 旋转位置编码 RoPE2.4.4 ALiBi位置编码 2.5 注意力机制 2.5.1 完整自注意力机制2.5.2 稀疏注意力机…

ChatGPT实战100例 - (20) 如何玩转影刀RPA

文章目录 ChatGPT实战100例 - (20) 如何玩转影刀RPA背景需求需求分析与流程设计一、需求收集二、流程梳理三、可行性分析流程设计(详细步骤)具体步骤的影刀RPA实现流程图总结AIGC在影刀RPA中的使用总结其他RPA步骤中可能用到AIGC的地方展望总结ChatGPT实战100例 - (20) 如何玩…

LeYOLO, New Scalable and Efficient CNN Architecture for Object Detection

LeYOLO, New Scalable and Efficient CNN Architecture for Object Detection 论文链接&#xff1a;http://arxiv.org/abs/2406.14239 代码链接&#xff1a;https://github.com/LilianHollard/LeYOLO 一、介绍 本文关注基于FLOP的高效目标检测计算的神经网络架构设计选择&am…

【Vite】快速入门及其配置

概述 Vite是前端构建工具。vite 相较于webpack,vite采用了不同的运行方式&#xff1a; 开发时&#xff0c;并不对代码打包&#xff0c;而是直接采用ESM的方式来运行项目在项目打包部署时&#xff0c;使用 rollup 对项目进行打包除了速度外&#xff0c;vite使用起来也更加方便…

2024-7-20 IT新闻

目录 微软全球IT系统故障 中国量子计算产业峰会召开 其他IT相关动态 微软全球IT系统故障 后续处理&#xff1a; 微软和CrowdStrike均迅速响应&#xff0c;发布了相关声明并部署了修复程序。CrowdStrike撤销了有问题的软件更新&#xff0c;以帮助用户恢复系统正常运作。微软也…