【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

用Redis实现消息队列并搭建分布式邮件消息系统

  • 系统介绍
  • Redis实现消息队列
    • 思路分析
    • 代码实现
  • MongoDB监听数据变化
    • 思路分析
    • 代码实现
      • Mongoose测试连接
      • 监听mongodb数据变化
  • 注意点

系统介绍

本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。

  • Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
  • Redis:用于搭建消息队列,实现消息的分布式。
  • MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。

Redis实现消息队列

思路分析

主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
在这里插入图片描述
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
在这里插入图片描述

代码实现

const { transemail } = require('../email_list/email.js');
const redis = require('promise-redis-client');
const redisHost = 'localhost';
const redisPort = 6379;// 配置 Redis 客户端
const createRedisClient = () => {return new Promise((resolve, reject) => {let client = redis.createClient({ host: redisHost, port: redisPort });client.on('error', err => {console.log('Redis 连接出错');reject(err);});client.on('ready', () => {console.log('Redis ready');resolve(client);});});
};async function startWaitMsg(redisClient) {while (true) {let res = null;try {res = await redisClient.brpop('bookChanges', 0);console.log('收到消息', res);} catch (err) {console.log('brpop 出错,重新 brpop');continue;}res = res.toString();transemail(res);}
}async function listenredis() {try {// 启动生产者// startProducer();// 创建 Redis 客户端const redisClient = await createRedisClient();// 启动消息监听startWaitMsg(redisClient);} catch (error) {console.error('Error:', error);}
}
//测试的时候使用的代码
listenredis().catch(console.error);// 处理退出信号以关闭客户端
process.on('SIGINT', async () => {console.log('Closing clients...');process.exit(0);
});

MongoDB监听数据变化

思路分析

由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。

  1. 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
  2. 将数据变化发送到redis消息队列中。

首先在命令行中将服务启动:
在这里插入图片描述

代码实现

Mongoose测试连接

const mongoose = require('mongoose');mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true
}).then(() => {console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);if (change.operationType === 'insert') {console.log('New book added:', change.fullDocument);}});
}).catch((error) => {console.log('Error connecting to MongoDB:', error);
});

在这里插入图片描述
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
在这里插入图片描述
证明测试成功,可以进行下一步操作啦!

监听mongodb数据变化

const redis = require('redis');
const mongoose = require('mongoose');
// 创建 Redis 客户端
const redisClient = redis.createClient({host: 'localhost',port: 6379});// 连接到 Redis
redisClient.connect();//连接mongodb数据库并检测变化发送到redis消息队列
async function connectAndMonitorMongoDB(redisClient) {try {await mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true});console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();try{bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);console.log("type of change:",typeof(change));msg = JSON.stringify(change.fullDocument);msg = msg.replace(/{|}/g, '');msg = "New message received:"+msg;console.log("massage:",msg);console.log("type of message:",typeof(msg));if (change.operationType === 'insert') {console.log('New book added:', msg);redisClient.lPush('bookChanges', msg, function(err, reply) {if (err) {console.log('Error storing JSON to Redis:', err);} else {console.log('JSON stored successfully, list length:', reply);}})}});}catch (err){console.log("error while loading data into redis:", err)}} catch (error) {console.log('Error connecting to MongoDB:', error);}
}// module.exports = { connectAndMonitorMongoDB };
async function main() {try {await connectAndMonitorMongoDB(redisClient);console.log('Monitoring MongoDB changes...');} catch (error) {console.error('Failed to start monitoring:', error);}
}main();

注意点

在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:

msg = JSON.stringify(change.fullDocument);
msg = msg.replace(/{|}/g, '');
msg = "New message received:"+msg;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iterator(迭代器模式)

引入 在想显示数组当中所有元素时&#xff0c;我们往往会使用下面的for循环语句来遍历数组 #include <iostream> #include <vector>int main() {std::vector<int> v({ 1, 2, 3 });for (int i 0; i < v.size(); i){std::cout << v[i] << &q…

在 Windows 上运行 Linux:WSL2 完整指南(一)

系列文章目录 在 Windows 上运行 Linux&#xff1a;WSL2 完整指南&#xff08;一&#xff09;&#x1f6aa; 在 Windows 上运行 Linux&#xff1a;WSL2 完整指南&#xff08;二&#xff09; 文章目录 系列文章目录前言一、什么是 WSL&#xff1f;1.1 WSL 的主要特性1.2 WSL 的…

GitHub 令牌泄漏, Python 核心资源库面临潜在攻击

TheHackerNews网站消息&#xff0c;软件供应链安全公司 JFrog 的网络安全研究人员称&#xff0c;他们发现了一个意外泄露的 GitHub 令牌&#xff0c;可授予 Python 语言 GitHub 存储库、Python 软件包索引&#xff08;PyPI&#xff09;和 Python 软件基金会&#xff08;PSF&…

半自动辅助制作数据集【实例分割】

利用yoloV8的实例分割模型&#xff0c;半自动辅助制作数据集 引言&#xff1a;【主要步骤】 步骤1&#xff1a;无人机航拍&#xff0c;收集基础图片 步骤2&#xff1a;将收集到的图片&#xff0c;全部用yoloV8-seg.pt模型进行实例分割【预测之前&#xff0c;将配置文件default.…

使用llama.cpp量化模型

文章目录 概要整体实验流程技术细节小结 概要 大模型量化是指在保持模型性能尽可能不变的情况下&#xff0c;通过减少模型参数的位数来降低模型的计算和存储成本。本次实验环境为魔搭社区提供的免费GPU环境&#xff08;24G&#xff09;&#xff0c;使用Llama.cpp进行4bit量化可…

Python面试宝典第14题:背包问题

题目 现有编号从 0 到 n - 1 的 n 个背包&#xff0c;给你两个下标从 0 开始的整数数组 capacity 和 rocks 。第 i 个背包最大可以装 capacity[i] 块石头&#xff0c;当前已经装了 rocks[i] 块石头&#xff08;0 < rocks[i] < capacity[i]&#xff09;。另给你一个整数 a…

深度学习入门——神经网络的学习

前言 这里所说的“学习”是指从训练数据中自动获取最优权重参数的过程。 为了使神经网络能进行学习&#xff0c;将导入损失函数这一指标 为了找出尽可能小的损失函数的值&#xff0c;本章我们将介绍利用了函数斜率的梯度法 从数据中学习 本章将介绍神经网络的学习&#xff0c;…

ubuntu上模拟串口通信

前言 有时候写了一些串口相关的程序&#xff0c;需要调试的时候&#xff0c;又没有硬件&#xff0c;或者需要等其他模块完成才能一起联调。这样搭建环境费时费力&#xff0c;很多问题等到最后联调才发现就已经很晚了。 本文提供一种在ubuntu环境下模拟串口&#xff0c;直接就可…

【Web服务与Web应用开发】【C#】VS2019 创建ASP.NET Web应用程序,以使用WCF服务

目录 0.简介 1.环境 2.知识点 3.详细过程 1&#xff09;创建空项目 2&#xff09;添加Web表单 3&#xff09;使用Web表单的GUI设计 4&#xff09;添加服务引用 5&#xff09;在Web的button函数中调用服务&#xff0c;获取PI值 6&#xff09;测试 0.简介 本文属于一个…

Mysql的JSON格式字段实用操作函数JSON_CONTAINS、JSON_SEARCH、JSON_EXTRACT

文章目录 前言一、示例数据二、使用1.JSON_CONTAINS2.JSON_SEARCH3.JSON_EXTRACT 总结 前言 在开发中难免会遇见在Mysql字段存储JSON格式数据的业务情况&#xff0c;记录几种常用函数的 用法。 一、示例数据 建一张表&#xff0c;字段memo存储JSON格式数据 CREATE TABLE use…

摄像头 RN6752v1 视频采集卡

摄像头 AHD倒车摄像头比较好&#xff0c;AHD英文全名Analog High Definition&#xff0c;即模拟高清&#xff0c;拥有比较好的分辨率与画面质感。 RN6752v1 GQW AKKY2 usb 采集卡 FHD&#xff08;1080p&#xff09;、HD&#xff08;720p&#xff09;和D1&#xff08;480i&am…

MySQL第七次作业

Product表内容 字段名 字段描述 数据类型 主键 外键 非空 唯一 自增 Id 产品编号 Int(10) 是 否 是 是 否 Name 产品功能 Varchar(20) 否 否 是 否 否 Function 主要功能 Varchar(50) 否 否 否 否 否 Company 生产厂家 Varchar(20) 否 否 是 否 否 Address 家庭住址 Varchar(20…

支持大量边缘盒子集中管理调度的智慧物流开源了。

智慧物流视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒&#xff0c;省去繁琐重复的适配流程&#xff0c;实现芯片、算法、应用的全流程组合&#xff0c;从而大大减少企业级应用约95%的开发成本。用户只需在界面上…

AR0132AT 1/3 英寸 CMOS 数字图像传感器(AR0132AT6R、AR0132AT6C)适用于监控和高清视频等多种应用

AR0132AT 1/3 英寸 CMOS 数字图像传感器&#xff0c;带 1280H x 960V 有效像素阵列。它能在线性或高动态模式下捕捉图像&#xff0c;且带有卷帘快门读取。它包含了多种复杂的摄像功能&#xff0c;如自动曝光控制、开窗&#xff0c;以及视频和单帧模式。它适用于低光度和高动态范…

大模型学习笔记十一:视觉大模型

一、判别式模型和生成式模型 1&#xff09;判别式模型Discriminative ①给某一个样本&#xff0c;判断属于某个类别的概率&#xff0c;擅长分类任务&#xff0c;计算量少。&#xff08;学习策略函数Y f(X)或者条件概率P(YIX)&#xff09; ②不能反映训练数据本身的特性 ③学习…

SpringMVC 控制层框架-上

一、SpringMVC简介 1. 介绍 Spring Web MVC 是基于Servlet API构建的原始Web框架&#xff0c;从一开始就包含在Spring Framework 中。在控制层框架经历Srust、WebWork、Strust2等诸多产品的历代更迭之后&#xff0c;目前业界普遍选择了SpringMVC 作为Java EE项目表述层开发的首…

解读|http和https的区别,谁更好用

在日常我们浏览网页时&#xff0c;有些网站会看到www前面是http&#xff0c;有些是https&#xff0c;这两种有什么区别呢&#xff1f;为什么单单多了“s”&#xff0c;会有人说这个网页会更安全些&#xff1f; HTTP&#xff08;超文本传输协议&#xff09;和HTTPS&#xff08;…

[Labview] 表格单元格外边框 二维图片叠加绘图

最终效果如下所示 转行做Labview都没到三个月&#xff0c;主程居然让我做这么复杂的功能&#xff0c;真是看得起我/(ㄒoㄒ)/~~ 思路大致分为两步 1、确定每个框体的左上/右下单元格位置&#xff0c;转换为表格表格坐标并在二维图片上绘制生成&#xff1b; 2、为二维图片添加…

权威认可 | 海云安开发者安全助手系统通过信通院支撑产品功能认证并荣获信通院2024年数据安全体系建设优秀案例

近日&#xff0c;2024全球数字经济大会——数字安全生态建设专题论坛&#xff08;以下简称“论坛”&#xff09;在京成功举办。由全球数字经济大会组委会主办&#xff0c;中国信息通信研究院及公安部第三研究所共同承办&#xff0c;论坛邀请多位专家和企业共同参与。 会上颁发…

android预置apk

在framework开发中&#xff0c;有一些需求是需要预装应用的&#xff0c;有些是预置应用源码&#xff0c;有些是预置apk。今天我们就分享下怎样预置apk 一般系统有自定义的目录&#xff0c;比如我的项目中根目录下有一个文件夹vendor&#xff0c;这里没都是自定义的一些功能。预…