Nodejs 第七十六章(MQ进阶)

MQ介绍和基本使用在上一章介绍过了,不再重复

  1. 消息:在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成

  2. 生产者Producer:生产者是消息的发送方,它将消息发送到RabbitMQ的交换器(Exchange)中

  3. 交换器Exchange:交换器接收从生产者发送的消息,并根据特定的规则将消息路由到一个或多个队列中

  4. 队列Queue:队列是消息的接收方,它存储了待处理的消息。消费者可以从队列中获取消息并进行处理

  5. 消费者Consumer:消费者是消息的接收方,它从队列中获取消息并进行处理

MQ进阶用法

发布订阅

发布订阅,消息的发送者称为发布者(Publisher),而接收消息的一个或多个实体称为订阅者(Subscriber

回顾上一篇,点对点通讯生产者发送一条消息通过路由投递到Queue,只有一个消费者能消费到 也就是一对一发送

请添加图片描述

回归主题 发布订阅就是生产者的消息通过交换机写到多个队列,不同的订阅者消费不同的队列,也就是实现了一对多

发布订阅的模式分为四种

  1. Direct(直连)模式:把消息放到交换机指定key的队列里面。
  2. Topic(主题)模式: 把消息放到交换机指定key的队列里面,额外增加使用"*“匹配一个单词或使用”#"匹配多个单词
  3. Headers(头部)模式:把消息放到交换机头部属性去匹配队列
  4. Fanout(广播)模式:把消息放入交换机所有的队列,实现广播

发布订阅-代码编写

1. direct模式编写

主要就是通过 routingKey 匹配实现路由 这里的zs就是routingKey

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/
await channel.assertExchange('logs', 'direct', {durable: true
})
//发送消息
/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容*///这里的zs就是routingKey
channel.publish('logs', 'zs', Buffer.from('小满direct模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(我们编写多个方便测试)

consume.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('logs', 'direct', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue1', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键*/
//匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue1', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})

consume2.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('logs', 'direct', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue2', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键*///匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue2', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})
2. Topic模式编写

我们把模式切换成了Topic 并且publish 发布的时候 routingKey 换成了 xm.xxxxxxxx

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/
await channel.assertExchange('topic', 'topic', {durable: true
})
//发送消息
/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容*///注意这儿匹配规则换了 换成xm.xxxxxxxxxxxxxxxxxxxxx
channel.publish('logs', 'xm.sadsdsdasdasdasdsda', Buffer.from('小满topic模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者匹配(注意这里匹配规则xm.*'使用了* 就是模糊匹配的意思)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('topic', 'topic', {durable: true
})//添加一个队列
const { queue } = await channel.assertQueue('queue1', {durable: true
})
//绑定交换机
/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词*///这儿变化了
await channel.bindQueue(queue, 'topic', 'xm.*')
//接收消息
channel.consume('queue1', (msg) => {console.log(msg.content.toString());
}, {noAck: true //自动确认消息被消费
})
3. Headers模式

生产者(注意 publish 增加第四个参数开启了header 添加了data参数)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() //声明一个交换机/*** @param {String} exchange 交换机的名称* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式* @param {Object} options {durable: true} //开启消息持久化*/await channel.assertExchange('headers', 'headers', {durable: true})//发送消息/*** @param {String} exchange 交换机的名称* @param {String} routingKey 路由键* @param {Buffer} content 消息内容* @param {Object} options {headers: {'key': 'value'}} //定义匹配规则*///嘿 这儿变了channel.publish('headers', '', Buffer.from('小满headers模式发送的消息'),{headers: {data:'xmzs'}})//断开await channel.close()await connection.close()process.exit(0)

消费者(bindQueue 增加一个对象 属性跟生产者对应即可)

   import amqplib from 'amqplib'const connection = await amqplib.connect('amqp://localhost:5672')const channel = await connection.createChannel() //创建一个频道await channel.assertExchange('headers', 'headers')//添加一个队列const { queue } = await channel.assertQueue('queue1')//绑定交换机/*** @param {String} queue 队列名称* @param {String} exchange 交换机名称* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词*/await channel.bindQueue(queue, 'headers', '',{data:'xmzs' //注意这儿不加headers 直接放值即可})//接收消息channel.consume(queue, (msg) => {console.log(msg.content.toString());}, {noAck: true //自动确认消息被消费})
4. Fanout模式

生产者(其实也就是routingKey 变成一个空值实现全体广播)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel()
//声明一个交换机
/**
* @param {String} exchange 交换机的名称
* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
* @param {Object} options {durable: true} //开启消息持久化
*/
await channel.assertExchange('fanout', 'fanout')
//发送消息
/**
* @param {String} exchange 交换机的名称
* @param {String} routingKey 路由键
* @param {Buffer} content 消息内容
*/
channel.publish('fanout', '', Buffer.from('小满fanout模式发送的消息'))//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(routingKey接受空值即可 就算有值也会被忽略)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道
await channel.assertExchange('fanout', 'fanout')//添加一个队列
const { queue } = await channel.assertQueue('queue1')
//绑定交换机
/**
* @param {String} queue 队列名称
* @param {String} exchange 交换机名称
* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
*/
await channel.bindQueue(queue, 'fanout', '')
//接收消息
channel.consume(queue, (msg) => {
console.log(msg.content.toString());
}, {
noAck: true //自动确认消息被消费
})

总结

通过使用RabbitMQ作为缓冲,避免数据库服务崩溃的风险。生产者将消息放入队列,消费者从队列中读取消息并进行处理,随后确认消息已被处理。在应用之间存在一对多的关系时,可以使用Exchange交换机根据不同的规则将消息转发到相应的队列:

  1. 直连交换机(direct exchange):根据消息的路由键(routing key)将消息直接转发到特定队列。
  2. 主题交换机(topic exchange):根据消息的路由键进行模糊匹配,将消息转发到符合条件的队列。
  3. 头部交换机(headers exchange):根据消息的头部信息进行转发。
  4. 广播交换机(fanout exchange):将消息广播到交换机下的所有队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10款堪称神器的宝藏软件,相见恨晚

今天给大家带来10款堪称神器的宝藏软件,每一个都非常好用,让你直呼相见恨晚。 1、知犀思维导图 知犀思维导图是大家组织信息、梳理思维的重要利器,它可以帮助我们以图形化的方式呈现思维过程,让整体思路变得清晰直观。通过使用知…

【神经网络】资源

神经网络是一种模拟人脑神经元结构的计算模型,广泛应用于机器学习和深度学习领域。以下是一些关于神经网络的资源,可以帮助你从基础到高级逐步掌握相关知识: 入门教程: 《神经网络教程:从原理到实践的全面解析》提供…

3 数据类型、运算符与表达式-3.5 字符型数据-3.5.2 转义字符

3.5.2 转义字符 #include <stdio.h>main() {int a, b, c;a 5;b 6;c 7;printf("ab c\tde\rf\n");printf("hijk\tL\bM\n");return 0; }

架构师如何运用情景领导力

架构师应用情景领导力意味着根据团队成员的成熟度和项目的具体情况来调整自己的领导风格。以下是架构师如何使用情景领导力的一些方法&#xff1a; 评估团队成员的成熟度&#xff1a;架构师需要评估团队成员在技术和专业领域的成熟度&#xff0c;包括他们的专业技能、经验、解…

花键轴类零件加工方法有哪些?

花键轴零件的加工方法 一辆普通中型卡车上约含 30 个花键轴零件, 通常用在离合器、变速器、传动轴总成、差速器、转向总成等位置。 花键轴零件的加工工艺是传统的切削加工和塑形成形加工两种。传统工艺如下的8个主要工序&#xff1a; 下料→锻造毛坯→毛坯加工→外花键加工…

34、掌握线上系统:jmap和jhat带你深入了解对象分布

34.1、前文总结 在上一篇文章中,我们向大家介绍了一个在日常工作中的实用工具jstat。通过使用jstat,我们可以非常轻松便捷地了解线上系统的运行状况,包括新对象增速、Young GC触发频率及耗时,以及对象进入老年代的增速和Full GC触发频率及耗时。这些信息有助于我们全面掌握…

【C语言】青蛙跳台阶问题 - 递归算法(一种思路,针对三种不同的情况)

文章目录 1. 前言2. 题目和分析2.1 代码实现2.2 反思 (重点) 3.题目二&#xff08;变式&#xff09;3.1 分析3.2 代码实现 4. 题目三&#xff08;变式&#xff09;4.1 分析4.2 代码实现 1. 前言 相信大家看到青蛙跳台阶问题时&#xff0c;第一时间就会想到递归。那你知道为什么…

SwiftUI中结合使用Timer和onReceive

SwiftUI提供了一种现代化的方式来构建用户界面&#xff0c;其中包括对时间驱动的事件的响应。在许多应用中&#xff0c;可能需要根据时间间隔执行某些操作&#xff0c;例如更新界面或触发事件。SwiftUI通过结合使用Timer和 onReceive 来实现这一功能&#xff0c;使得处理时间相…

【免杀】C2远控-Loader加载器-动态API调用

目录 创建后门程序站在杀毒程序立场上对后门进行分析例&#xff1a;动态调用VirtualProtect函数 作用:绕过杀毒对导入表的检测定性 创建后门程序 VS新建项目 回调函数加载Loader #include <Windows.h>unsigned char shellcode[] "";void CallBack() {void* p…

FlashBrowser

本例&#xff1a;windows10 下载FlashBrowser 解决flash失效问题&#xff0c;更换浏览器 https://www.flash.cn/ 下载FlashBrowser浏览器

Redis的缓存击穿、缓存穿透和缓存雪崩是什么?怎么预防?

Redis的缓存击穿、缓存穿透和缓存雪崩是什么&#xff1f;怎么预防&#xff1f; 前言缓存击穿定义解决思路实现加锁设置过期时间Lua脚本刷新锁 缓存穿透定义实现 缓存雪崩定义解决思路 总结 前言 最近在CSDN上看到了一篇博客&#xff0c;Redis缓存击穿、雪崩、穿透&#xff01;…

深入理解Vue3.js响应式系统基础逻辑

如果您觉得这篇文章有帮助的话&#xff01;给个点赞和评论支持下吧&#xff0c;感谢~ 作者&#xff1a;前端小王hs 阿里云社区博客专家/清华大学出版社签约作者/csdn百万访问前端博主/B站千粉前端up主 此篇文章是博主于2022年学习《Vue.js设计与实现》时的笔记整理而来 书籍&a…

15-字符串处理的常用函数——查找字符串,求字符串长度,分割字符串,查找指定字符,比较字符串,连接字符串

15-字符串处理的常用函数——查找字符串&#xff0c;求字符串长度&#xff0c;分割字符串&#xff0c;查找指定字符&#xff0c;比较字符串&#xff0c;连接字符串 文章目录 15-字符串处理的常用函数——查找字符串&#xff0c;求字符串长度&#xff0c;分割字符串&#xff0c;…

C++:SLT容器-->stack

C:SLT容器--&#xff1e;stack 1. stack容器2. stack 常用接口 1. stack容器 先进后出&#xff0c;后进先出不允许有遍历行为可以判断容器是否为空可以返回元素的个数 2. stack 常用接口 构造函数 stack<T> stk; // stack采用模板类实现&#xff0c;stack对象的默认构造形…

第1期JAVA社招面试经验月报

面经哥专注互联网社招面试经验分享&#xff0c;关注我&#xff0c;每日推送精选面经&#xff0c;面试前&#xff0c;先找面经哥&#xff5c;面经哥整理了上月30篇面试经历&#xff0c;选取了较为热点高频的面试题供大家参考 基础知识类‍‍‍‍‍ 1、说下双亲委派原则以及类加…

1992-2012年美国西海岸的海面高度异常数据集

Gridded Altimeter Fields with Enhanced Coastal Coverage 具有增强海岸覆盖范围的网格化测高场 简介 具有增强的海岸覆盖范围的网格化高度计场数据产品包含美国西海岸的海面高度异常&#xff08;SSHA 或 SLA&#xff09;以及北纬 35.25 度-48.5 度和东经 227.75 度-248.5 …

Vue3 渲染函数 API(五)

h() h 函数用于创建并返回一个虚拟节点&#xff08;VNode&#xff09; h( tag, // HTML 标签名、组件对象或异步组件函数 data, // 一个包含组件的props/attrs/domProps/on 等的对象 children // 子虚拟节点 (VNodes)&#xff0c;由 h()构建而成&#xff0c; // 也可以使用字符…

解析final原理

原理 public class TestFinal {final int a 20; //final保证不能读到两个值} 字节码&#xff1a; 写屏障&#xff1a; 之前的指令不会重排序到后面去&#xff08;有序性&#xff09; 之前的修改、赋值操作之后会同步到主内存中去&#xff08;可见性&#xff09; 0: aload…

大学生如何学习JavaScript?

学习 JavaScript 对于大学生来说是一个宝贵的技能&#xff0c;因为它是现代网页开发的核心语言之一。以下是一些详细的步骤&#xff0c;帮助大学生学习 JavaScript&#xff1a; 1. 了解 Web 开发基础 学习 HTML 和 CSS&#xff0c;这是网页开发的基石。 2. 学习 JavaScript …

什么是transformer?

整个 transformer 的重点在 QKV 结构上。 以前的 CNN 试图通过卷积来表达不同位置数值之间的关系&#xff0c;学习卷积值也就是学习矩阵里的数值之间的特征&#xff0c;所以适合用在图像里面。因为图像就是一个个的像素点形成的矩阵。 RNN 试图通过加入反馈机制来理解一串数值…