用Redis实现消息队列并搭建分布式邮件消息系统
- 系统介绍
- Redis实现消息队列
- 思路分析
- 代码实现
- MongoDB监听数据变化
- 思路分析
- 代码实现
- Mongoose测试连接
- 监听mongodb数据变化
- 注意点
系统介绍
本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。
- Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
- Redis:用于搭建消息队列,实现消息的分布式。
- MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。
Redis实现消息队列
思路分析
主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
代码实现
const { transemail } = require('../email_list/email.js');
const redis = require('promise-redis-client');
const redisHost = 'localhost';
const redisPort = 6379;// 配置 Redis 客户端
const createRedisClient = () => {return new Promise((resolve, reject) => {let client = redis.createClient({ host: redisHost, port: redisPort });client.on('error', err => {console.log('Redis 连接出错');reject(err);});client.on('ready', () => {console.log('Redis ready');resolve(client);});});
};async function startWaitMsg(redisClient) {while (true) {let res = null;try {res = await redisClient.brpop('bookChanges', 0);console.log('收到消息', res);} catch (err) {console.log('brpop 出错,重新 brpop');continue;}res = res.toString();transemail(res);}
}async function listenredis() {try {// 启动生产者// startProducer();// 创建 Redis 客户端const redisClient = await createRedisClient();// 启动消息监听startWaitMsg(redisClient);} catch (error) {console.error('Error:', error);}
}
//测试的时候使用的代码
listenredis().catch(console.error);// 处理退出信号以关闭客户端
process.on('SIGINT', async () => {console.log('Closing clients...');process.exit(0);
});
MongoDB监听数据变化
思路分析
由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。
- 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
- 将数据变化发送到redis消息队列中。
首先在命令行中将服务启动:
代码实现
Mongoose测试连接
const mongoose = require('mongoose');mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true
}).then(() => {console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);if (change.operationType === 'insert') {console.log('New book added:', change.fullDocument);}});
}).catch((error) => {console.log('Error connecting to MongoDB:', error);
});
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
证明测试成功,可以进行下一步操作啦!
监听mongodb数据变化
const redis = require('redis');
const mongoose = require('mongoose');
// 创建 Redis 客户端
const redisClient = redis.createClient({host: 'localhost',port: 6379});// 连接到 Redis
redisClient.connect();//连接mongodb数据库并检测变化发送到redis消息队列
async function connectAndMonitorMongoDB(redisClient) {try {await mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true});console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();try{bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);console.log("type of change:",typeof(change));msg = JSON.stringify(change.fullDocument);msg = msg.replace(/{|}/g, '');msg = "New message received:"+msg;console.log("massage:",msg);console.log("type of message:",typeof(msg));if (change.operationType === 'insert') {console.log('New book added:', msg);redisClient.lPush('bookChanges', msg, function(err, reply) {if (err) {console.log('Error storing JSON to Redis:', err);} else {console.log('JSON stored successfully, list length:', reply);}})}});}catch (err){console.log("error while loading data into redis:", err)}} catch (error) {console.log('Error connecting to MongoDB:', error);}
}// module.exports = { connectAndMonitorMongoDB };
async function main() {try {await connectAndMonitorMongoDB(redisClient);console.log('Monitoring MongoDB changes...');} catch (error) {console.error('Failed to start monitoring:', error);}
}main();
注意点
在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:
msg = JSON.stringify(change.fullDocument);
msg = msg.replace(/{|}/g, '');
msg = "New message received:"+msg;