//produceMessage.ts 模拟生产者
import Redis from 'ioredis';
const redis = new Redis();
// 生产者:将消息推送到队列
async function produceMessage(queueName:string, message:string) {try {await redis.rpush(queueName, message);console.log(`Produced message: ${message}`);} catch (error) {console.error('Error producing message:', error);}
}
// 例子
const queueName = 'myQueue';
for (let i = 0; i < 100; i++) {// 生产者推送消息到队列produceMessage(queueName, 'Hello, World!' +i);
}
// consumeMessages.ts 模拟消费者
import Redis from 'ioredis';
const redis = new Redis();// 消费者:从队列中获取消息并处理
async function consumeMessages(queueName:string) {while (true) {try {// 从队列左侧弹出一条消息const message = await redis.blpop(queueName, 0);// 0 表示无超时,一直阻塞直到有消息可用/10 表示在10秒内阻塞等待消息。if (message) {// 处理消息的逻辑console.log(`Consumed message: ${message}`);}console.log("处理完成...");} catch (error) {console.error('Error consuming message:', error);}}
}// 例子
const queueName = 'myQueue';// 消费者处理队列中的消息
consumeMessages(queueName);
Redis 多个消费者发布订阅模式
import Redis from 'ioredis';// 创建 Redis 客户端
const redisClient = new Redis();// 发布消息到频道
async function publishMessage(channel: string, message: string) {try {// 发布消息到指定频道await redisClient.publish(channel, message);console.log(`Message "${message}" published to channel "${channel}"`);} catch (error) {console.error('Error publishing message:', error);}
}// 例子
const channelName = 'myChannel';
const messageContent = 'Hello, Redis!';// 发布消息到频道
publishMessage(channelName, messageContent);
import Redis from 'ioredis';
const redis = new Redis();// 订阅频道的函数
async function subscribeChannel(channelName:any) {try {console.log(`Subscribing to channel ${channelName}...`);// 创建 Redis 客户端并订阅指定频道redis.subscribe(channelName);// 监听消息redis.on('message', (channel, message) => {console.log(`Received message on channel ${channel}: ${message}`);});} catch (error) {console.error('Error subscribing to channel:', error);}
}// 例子
const channelName = 'myChannel';// 创建多个订阅者
subscribeChannel(channelName);
Redis pub/sub 缺点
解决方式: rabbitMQ,…等专业消息队列中间键