Nodejs 第七十六章(MQ进阶)

MQ介绍和基本使用在上一章介绍过了,不再重复

  1. 消息:在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成

  2. 生产者Producer:生产者是消息的发送方,它将消息发送到RabbitMQ的交换器(Exchange)中

  3. 交换器Exchange:交换器接收从生产者发送的消息,并根据特定的规则将消息路由到一个或多个队列中

  4. 队列Queue:队列是消息的接收方,它存储了待处理的消息。消费者可以从队列中获取消息并进行处理

  5. 消费者Consumer:消费者是消息的接收方,它从队列中获取消息并进行处理

MQ进阶用法

发布订阅

发布订阅,消息的发送者称为发布者(Publisher),而接收消息的一个或多个实体称为订阅者(Subscriber

回顾上一篇,点对点通讯生产者发送一条消息通过路由投递到Queue,只有一个消费者能消费到 也就是一对一发送

请添加图片描述

回归主题 发布订阅就是生产者的消息通过交换机写到多个队列,不同的订阅者消费不同的队列,也就是实现了一对多

发布订阅的模式分为四种

  1. Direct(直连)模式:把消息放到交换机指定key的队列里面。
  2. Topic(主题)模式: 把消息放到交换机指定key的队列里面,额外增加使用"*“匹配一个单词或使用”#"匹配多个单词
  3. Headers(头部)模式:把消息放到交换机头部属性去匹配队列
  4. Fanout(广播)模式:把消息放入交换机所有的队列,实现广播

发布订阅-代码编写

1. direct模式编写

主要就是通过 routingKey 匹配实现路由 这里的zs就是routingKey

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/
await channel.assertExchange('logs', 'direct', {durable: true
})
//发送消息
/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容*///这里的zs就是routingKey
channel.publish('logs', 'zs', Buffer.from('小满direct模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(我们编写多个方便测试)

consume.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('logs', 'direct', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue1', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键*/
//匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue1', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})

consume2.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('logs', 'direct', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue2', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键*///匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue2', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})
2. Topic模式编写

我们把模式切换成了Topic 并且publish 发布的时候 routingKey 换成了 xm.xxxxxxxx

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/
await channel.assertExchange('topic', 'topic', {durable: true
})
//发送消息
/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容*///注意这儿匹配规则换了 换成xm.xxxxxxxxxxxxxxxxxxxxx
channel.publish('logs', 'xm.sadsdsdasdasdasdsda', Buffer.from('小满topic模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者匹配(注意这里匹配规则xm.*'使用了* 就是模糊匹配的意思)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('topic', 'topic', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue1', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词*///这儿变化了
await channel.bindQueue(queue, 'topic', 'xm.*')
//接收消息
channel.consume('queue1', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})
3. Headers模式

生产者(注意 publish 增加第四个参数开启了header 添加了data参数)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() //声明一个交换机/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/await channel.assertExchange('headers', 'headers', {durable: true})//发送消息/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容* @param {Object} options {headers: {'key': 'value'}} //定义匹配规则*///嘿 这儿变了channel.publish('headers', '', Buffer.from('小满headers模式发送的消息'),{headers: {data:'xmzs'}})//断开await channel.close()await connection.close()process.exit(0)

消费者(bindQueue 增加一个对象 属性跟生产者对应即可)

   import amqplib from 'amqplib'const connection = await amqplib.connect('amqp://localhost:5672')const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('headers', 'headers')//添加一个队列const { queue } = await channel.assertQueue('queue1')//绑定交换机/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词*/await channel.bindQueue(queue, 'headers', '',{data:'xmzs' //注意这儿不加headers 直接放值即可})//接收消息channel.consume(queue, (msg) => {console.log(msg.content.toString());}, {noAck: true //自动确认消息被消费})
4. Fanout模式

生产者(其实也就是routingKey 变成一个空值实现全体广播)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel()
//声明一个交换机
/**
* @param {String} exchange 交换机的名称
* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
* @param {Object} options {durable: true} //开启消息持久化
*/
await channel.assertExchange('fanout', 'fanout')
//发送消息
/**
* @param {String} exchange 交换机的名称
* @param {String} routingKey 路由键
* @param {Buffer} content 消息内容
*/
channel.publish('fanout', '', Buffer.from('小满fanout模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(routingKey接受空值即可 就算有值也会被忽略)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道
await channel.assertExchange('fanout', 'fanout')//添加一个队列
const { queue } = await channel.assertQueue('queue1')
//绑定交换机
/**
* @param {String} queue 队列名称
* @param {String} exchange 交换机名称
* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
*/
await channel.bindQueue(queue, 'fanout', '')
//接收消息
channel.consume(queue, (msg) => {
console.log(msg.content.toString());
}, {
noAck: true //自动确认消息被消费
})

总结

通过使用RabbitMQ作为缓冲,避免数据库服务崩溃的风险。生产者将消息放入队列,消费者从队列中读取消息并进行处理,随后确认消息已被处理。在应用之间存在一对多的关系时,可以使用Exchange交换机根据不同的规则将消息转发到相应的队列:

  1. 直连交换机(direct exchange):根据消息的路由键(routing key)将消息直接转发到特定队列。
  2. 主题交换机(topic exchange):根据消息的路由键进行模糊匹配,将消息转发到符合条件的队列。
  3. 头部交换机(headers exchange):根据消息的头部信息进行转发。
  4. 广播交换机(fanout exchange):将消息广播到交换机下的所有队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3 数据类型、运算符与表达式-3.5 字符型数据-3.5.2 转义字符

3.5.2 转义字符 #include <stdio.h>main() {int a, b, c;a 5;b 6;c 7;printf("ab c\tde\rf\n");printf("hijk\tL\bM\n");return 0; }

花键轴类零件加工方法有哪些?

花键轴零件的加工方法 一辆普通中型卡车上约含 30 个花键轴零件, 通常用在离合器、变速器、传动轴总成、差速器、转向总成等位置。 花键轴零件的加工工艺是传统的切削加工和塑形成形加工两种。传统工艺如下的8个主要工序&#xff1a; 下料→锻造毛坯→毛坯加工→外花键加工…

【C语言】青蛙跳台阶问题 - 递归算法(一种思路,针对三种不同的情况)

文章目录 1. 前言2. 题目和分析2.1 代码实现2.2 反思 (重点) 3.题目二&#xff08;变式&#xff09;3.1 分析3.2 代码实现 4. 题目三&#xff08;变式&#xff09;4.1 分析4.2 代码实现 1. 前言 相信大家看到青蛙跳台阶问题时&#xff0c;第一时间就会想到递归。那你知道为什么…

【免杀】C2远控-Loader加载器-动态API调用

目录 创建后门程序站在杀毒程序立场上对后门进行分析例&#xff1a;动态调用VirtualProtect函数 作用:绕过杀毒对导入表的检测定性 创建后门程序 VS新建项目 回调函数加载Loader #include <Windows.h>unsigned char shellcode[] "";void CallBack() {void* p…

FlashBrowser

本例&#xff1a;windows10 下载FlashBrowser 解决flash失效问题&#xff0c;更换浏览器 https://www.flash.cn/ 下载FlashBrowser浏览器

Redis的缓存击穿、缓存穿透和缓存雪崩是什么?怎么预防?

Redis的缓存击穿、缓存穿透和缓存雪崩是什么&#xff1f;怎么预防&#xff1f; 前言缓存击穿定义解决思路实现加锁设置过期时间Lua脚本刷新锁 缓存穿透定义实现 缓存雪崩定义解决思路 总结 前言 最近在CSDN上看到了一篇博客&#xff0c;Redis缓存击穿、雪崩、穿透&#xff01;…

深入理解Vue3.js响应式系统基础逻辑

如果您觉得这篇文章有帮助的话&#xff01;给个点赞和评论支持下吧&#xff0c;感谢~ 作者&#xff1a;前端小王hs 阿里云社区博客专家/清华大学出版社签约作者/csdn百万访问前端博主/B站千粉前端up主 此篇文章是博主于2022年学习《Vue.js设计与实现》时的笔记整理而来 书籍&a…

C++:SLT容器-->stack

C:SLT容器--&#xff1e;stack 1. stack容器2. stack 常用接口 1. stack容器 先进后出&#xff0c;后进先出不允许有遍历行为可以判断容器是否为空可以返回元素的个数 2. stack 常用接口 构造函数 stack<T> stk; // stack采用模板类实现&#xff0c;stack对象的默认构造形…

第1期JAVA社招面试经验月报

面经哥专注互联网社招面试经验分享&#xff0c;关注我&#xff0c;每日推送精选面经&#xff0c;面试前&#xff0c;先找面经哥&#xff5c;面经哥整理了上月30篇面试经历&#xff0c;选取了较为热点高频的面试题供大家参考 基础知识类‍‍‍‍‍ 1、说下双亲委派原则以及类加…

1992-2012年美国西海岸的海面高度异常数据集

Gridded Altimeter Fields with Enhanced Coastal Coverage 具有增强海岸覆盖范围的网格化测高场 简介 具有增强的海岸覆盖范围的网格化高度计场数据产品包含美国西海岸的海面高度异常&#xff08;SSHA 或 SLA&#xff09;以及北纬 35.25 度-48.5 度和东经 227.75 度-248.5 …

Python酷库之旅-开启库房之门

目录 一、库的定义 二、库的组成 三、库的分类 四、如何学好Python库&#xff1f; 五、注意事项 六、推荐阅读 1、Python筑基之旅 2、Python函数之旅 3、Python算法之旅 4、Python魔法之旅 5、 博客个人主页 一、库的定义 在Python中&#xff0c;库(Library)是一个封…

探索智慧机场运营中心解决方案的价值与应用

随着全球航空业的不断发展&#xff0c;机场运营中心的作用日益凸显。智慧机场运营中心解决方案以其高效的管理和智能化的运营模式&#xff0c;成为优化机场运营、提升服务水平的重要工具。本文将深入探讨智慧机场运营中心解决方案的价值与应用&#xff0c;揭示其在机场管理中的…

机器学习常见知识点 2:决策树

文章目录 决策树算法1、决策树树状图2、选择最优决策条件3、决策树算法过程→白话决策树原理决策树构建的基本步骤常见的决策树算法决策树的优缺点 【五分钟机器学习】可视化的决策过程&#xff1a;决策树 Decision Tree 关键词记忆&#xff1a; 纯度、选择最优特征分裂、熵、基…

电脑上的瑞士军刀

一、简介 1、一款专为 Windows 操作系统设计的桌面管理工具&#xff0c;它具备保存和恢复桌面图标位置的功能&#xff0c;使用户能够在各种情况下&#xff0c;如分辨率变动、系统更新或其他原因导致的图标位置混乱后&#xff0c;快速恢复到熟悉的工作环境。它还拥有诸多实用功能…

【Pyqt6 学习笔记】实现串口调试助手,并将接收到数据模拟键盘输出

文章目录 代码示例main.pyscreen_shot_module.pyqrcmd.pyuntitled.pyuntitled.ui 本文内容是 【Pyqt6 学习笔记】DIY一个二维码解析生成小工具的延申&#xff0c;在原来的基础上实现了串口调试助手功能&#xff0c;并利用 pywinauto的 keyboard模块将接收到数据模拟键盘输出…

【递归+二叉树思想+搜索】 Alice and the Cake题解

Alice and the Cake题解 AC记录&#xff1a;记录-洛谷 题面翻译&#xff08;大概就是题目大意&#xff09; 执行恰好 n − 1 n-1 n−1 次操作&#xff0c;每次操作可以选择当前所有蛋糕中满足其重量 w ⩾ 2 w\geqslant 2 w⩾2 的一块&#xff0c;然后将其分为质量分别为 …

手机连接ESP8266的WIFI,进入内置网页,输入要显示的内容,在OLED显示屏上显示文本

连线 OLEDESP8266含义GNDGND地线VCC3V电源SCLD1时钟线SDAD2通信数据线 只支持英文信息的显示和数字。 #include <ESP8266WiFi.h> #include <ESP8266WebServer.h> #include <Wire.h> #include <Adafruit_GFX.h> #include <Adafruit_SSD1306.h>#d…

5.大模型高效微调(PEFT)未来发展趋势

PEFT 主流技术分类 UniPELT 探索PEFT 大模型的统一框架&#xff08;2022&#xff09; UIUC 和Meta AI 研究人员发表的UniPELT 提出将不同的PEFT 方法模块化。 通过门控机制学习激活最适合当前数据或任务的方法&#xff0c;尤其是最常见的3大类PEFT 技术&#xff1a; Adapters…

事业单位——被逆袭篇

目录 一、结果 二、考试 三、时间 四、复习 五、总结 一、结果 图1&#xff1a;2024年浙江广播电视集团下属浙江省中波发射管理中心公开招聘笔面试结果 准考证号笔试面试总成绩排名备注107016070.866.48310702416555.44107134390.871.681入围107146869.869.08210715406454.…

征信受损,别再犯傻!

听说你的征信出了点小问题&#xff1f;别急&#xff0c;这事儿说大不大&#xff0c;但也不能掉以轻心。征信&#xff0c;说白了就是你借钱还钱的记录本&#xff0c;一旦它“花”了&#xff0c;借钱可就没那么轻松了。 先来说说这征信“花”了是咋回事 征信“花”了&#xff0c…