利用Redis的队列模式实现消息的发送和订阅,适合分布式场景,Java实现代码

在Redis中,通常使用发布/订阅模式(Pub/Sub)来进行消息的实时通信。然而,标准的Redis发布/订阅模式并不直接支持确保一条消息只被一台机器消费。在这种模式下,所有订阅了特定频道的客户端都会收到发布的消息。

 

但是,你可以通过一些策略或模式来模拟这种“只在一台机器上消费”的行为。以下是一些可能的方法:

 

1. 使用Redis的分布式锁

发布消息:当消息发布时,使用一个Redis的分布式锁(如RedLock)来确保只有一个消费者能够处理该消息。

处理消息:消费者尝试获取锁。如果成功,则处理消息并释放锁;如果失败,则放弃处理该消息。

2. 队列模式

发布消息:不是直接将消息发布到频道,而是将消息推送到一个Redis列表(List)或有序集合(Sorted Set)中。

消费消息:消费者使用BLPOP、BRPOP或其他阻塞操作从列表中拉取消息。由于这些操作是阻塞的,因此它们会等待直到有消息可用。同时,由于只有一个消费者能够成功地从列表中拉取消息,因此可以确保消息只被一台机器消费。

3. 分布式任务队列

使用更高级的分布式任务队列系统(如Celery、RabbitMQ、Kafka等),这些系统通常提供了更复杂的路由和消息确认机制,可以确保消息只被一台机器消费。

4. 自定义发布/订阅逻辑

在应用层实现自定义的发布/订阅逻辑。例如,你可以使用一个Redis哈希(Hash)来跟踪哪些消息已经被哪台机器消费。当发布消息时,检查哈希以确定是否有机器已经消费了该消息。如果没有,则选择一个消费者并将消息发送到其队列中。

注意事项:

在实现这些策略时,请确保考虑并发性和性能问题。例如,分布式锁和Redis列表操作在高并发环境下可能会成为瓶颈。

确保你的Redis实例是可靠和可扩展的。在分布式系统中,Redis实例可能会成为单点故障或性能瓶颈。考虑使用Redis集群或其他高可用性解决方案来避免这些问题。

根据你的具体需求和场景选择合适的策略。例如,如果你的系统需要处理大量实时消息并且需要确保消息的顺序性,那么使用Redis列表和阻塞操作可能是个好主意。但是,如果你的系统需要更复杂的路由和消息确认机制,那么使用分布式任务队列系统可能更合适。

 

下面介绍用队列模式,简单实现队列如下:

要使用Redis的队列模式实现消息的消费,我们可以使用Jedis库(Java的Redis客户端)来与Redis进行交互。以下是一个简单的Java实现,展示了如何使用Redis的List数据结构(通常使用LPUSH和BRPOP命令)来实现消息的发布和消费。

 

首先,确保你已经添加了Jedis的依赖到你的项目中。如果你使用Maven,可以在pom.xml中添加以下依赖:

 

xml

复制

<dependency>

    <groupId>redis.clients</groupId>

    <artifactId>jedis</artifactId>

    <version>3.7.0</version> <!-- 请检查并使用最新版本 -->

</dependency>

 

 

然后,你可以编写两个简单的Java类,一个用于发布消息(Producer),另一个用于消费消息(Consumer)。

 

Producer(消息发布者)

 

java

复制

import redis.clients.jedis.Jedis;

 

public class RedisMessageProducer {

 

    private final Jedis jedis;

    private final String queueName;

 

    public RedisMessageProducer(Jedis jedis, String queueName) {

        this.jedis = jedis;

        this.queueName = queueName;

    }

 

    public void produce(String message) {

        jedis.lpush(queueName, message);

        System.out.println("Produced: " + message);

    }

 

    public static void main(String[] args) {

        Jedis jedis = new Jedis("localhost"); // 假设Redis服务器运行在本地

        RedisMessageProducer producer = new RedisMessageProducer(jedis, "my-queue");

 

        // 发布消息

        producer.produce("Hello, Redis Queue!");

        producer.produce("Another message for the queue.");

 

        // 关闭连接(在实际应用中,你可能希望使用连接池来管理连接)

        jedis.close();

    }

}

 

 

Consumer(消息消费者)

 

java

复制

import redis.clients.jedis.Jedis;

 

public class RedisMessageConsumer implements Runnable {

 

    private final Jedis jedis;

    private final String queueName;

 

    public RedisMessageConsumer(Jedis jedis, String queueName) {

        this.jedis = jedis;

        this.queueName = queueName;

    }

 

    @Override

    public void run() {

        while (true) { // 无限循环,直到应用程序被终止

            String message = jedis.brpop(0, queueName).get(1); // 阻塞直到有消息可用

            if (message != null) {

                System.out.println("Consumed: " + message);

                // 在这里处理消息...

            }

        }

    }

 

    public static void main(String[] args) {

        Jedis jedis = new Jedis("localhost"); // 假设Redis服务器运行在本地

        RedisMessageConsumer consumer = new RedisMessageConsumer(jedis, "my-queue");

 

        // 在新的线程中运行消费者

        new Thread(consumer).start();

 

        // 注意:这里的main方法不会立即结束,因为消费者在一个无限循环中运行。

        // 在实际应用中,你可能希望以不同的方式管理消费者的生命周期。

    }

}

 

 

注意:

 

在这个例子中,RedisMessageConsumer的main方法启动了一个新线程来运行消费者。在实际应用中,你可能希望以更复杂的方式管理这些线程,例如使用线程池或Spring的@Async注解。

jedis.brpop(0, queueName)中的0表示阻塞的时间(以秒为单位)。传递0意味着它将无限期地阻塞,直到有消息可用。

请确保你的Redis服务器正在运行,并且Java应用程序可以访问它。如果Redis服务器不在本地运行,你需要将Jedis的构造函数中的"localhost"替换为Redis服务器的实际地址。

在实际应用中,你可能还需要处理异常、优雅地关闭连接以及确保在应用程序终止时正确地清理资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/24939.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cesium项目报错An error occurred while rendering. Rendering has stopped.

一般就是本地打开会报错&#xff0c;改成用本地服务器打开 全局安装一个live-server sudo cnpm i live-server -g然后新增一个package.json文件 npm init -y然后在package.json的scripts中增加一个命令 "server": "live-server ./ --port8181 --hostlocalhos…

AI图书推荐:用ChatGPT来写非虚构类书籍

这本书《用ChatGPT来写非虚构类书籍 》&#xff08;ChatGPT For KDP_ A manual from an experienced self-publisher to nonfiction authors for writing the book you were born to write with ChatGPT prompts mastering&#xff09;是一本专为非虚构类书籍作者编写的指南&am…

实习记录2

1.flowable框架参数传递大概流程 通过传递xml&#xff0c;传递到后端&#xff0c;然后后端去解析 2.vue封装组件 在 Vue.js 中创建可复用的自定义组件是一个常见的需求&#xff0c;这样可以提高代码的复用性和可维护性。下面是一个简单的步骤指南&#xff0c;帮助你创建一个…

MLC工具是否适用AMD和ARM场景?如何测试内存性能?

MLC&#xff08;Memory Latency Checker&#xff09;主要是由Intel开发的工具&#xff0c;主要用于Intel平台上的内存性能测试&#xff0c;尤其是针对Intel处理器的内存延迟和带宽。尽管MLC主要针对Intel处理器设计&#xff0c;理论上它可以在任何支持Intel兼容指令集的系统上运…

嵌入式linux系统中利用I2C控制器应用开发详解

大家好,今天主要给大家分享一下,在linux系统上如何使用I2C进行应用开发详解。 l2C (Inter一Integrated Circuit BUS)是I2C BUS简称.中文为集成电路总线.是目前应用最广泛的总线之一。和IMX6ULL有些相关的是.刚好该总线是NXP前身的PHLIPS 设计。 第一:I2C协议概述 …

Spring框架IoC和AOP

Spring是一个开源的Java应用程序框架&#xff0c;旨在简化企业级应用程序的开发。它注重解决Java开发中的常见问题&#xff0c;如依赖注入、面向切面编程、事务管理等。Spring提供了一个轻量级的容器&#xff0c;用于管理和装配应用程序的对象。 Spring框架具有以下特点和优势…

xml 取值错误 #{} boolean 一直为 false

取值时 #{param.msgStatus} 一直是false&#xff0c;java代码里面显示true。 <select id"findPageOaReading" resultType"com.focusin.data.office.func.dto.ProcessMessageInfoDTO">select i.*, t.template_name procdefNamefrom process_message_…

西瓜书总结——决策树原理+ID3决策树的模拟实现

西瓜书总结——决策树原理ID3决策树的模拟实现 前言1. 决策树结构2. 决策树的生成&#xff08;注意区分属性和类别&#xff09;3. 划分选择3.1 信息熵和信息增益3.2 增益率3.3 基尼指数&#xff08;鸡你指数&#xff09; 4. 剪枝处理4.1 预剪枝4.2 后剪枝 5. 连续值与缺失值处理…

二分+模拟,CF1461D - Divide and Summarize

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 Problem - 1461D - Codeforces 二、解题报告 1、思路分析 我们发现每次分裂操作结果都是固定的 我们从初始序列分裂出两个确定的子序列&#xff0c;两个确定的子序列又分裂出4个确定的子序列 那么也就是说…

【Python】解决Python报错:ZeroDivisionError: division by zero

​​​​ 文章目录 引言1. 错误详解2. 常见的出错场景2.1 直接除零2.2 变量导致的间接除零 3. 解决方案3.1 检查除数3.2 使用异常处理 4. 预防措施4.1 数据验证4.2 编写防御性代码 结语 引言 在Python中&#xff0c;尝试将一个数字除以零时&#xff0c;会抛出ZeroDivisionErr…

安装 hbase(伪分布式)

目录 1、安装 jdk8 &#xff08;1&#xff09;选择 jdk 版本 &#xff08;2&#xff09;下载 jdk 并解压 &#xff08;3&#xff09;配置环境变量 2、安装hadoop &#xff08;1&#xff09;添加 hadoop 用户&#xff0c;配置免密登录 &#xff08;2&#xff09;下载 hado…

Duilib多标签选项卡拖拽效果:添加动画特效!

动画是小型界面库的“难题”、“通病” 几年前就有人分享了如何用direct UI制作多标签选项卡界面的方法。还有人出了一个简易的浏览器demo。但是他们的标签栏都没有Chrome浏览器那样的动画特效。 如何给界面添加布局是的动画特效呢&#xff1f; 动画使界面看起来高大上&#…

【录制,纯正人声】OBS录制软件,音频电流音,杂音解决办法,录制有噪声的解决办法

速度解决的方法 &#xff08;1&#xff09;用RNNoise去除噪声。RNNoise是一个开源的&#xff0c;效果不好的噪声去除器。使用方法就是点击滤镜&#xff0c;然后加噪声抑制RNNoise。【这方法不好用】 &#xff08;2&#xff09;用Krisp(https://krisp.ai/) 去除噪声。这个Kris…

探索C++ STL中的std::list:链式存储的艺术与实践

目录 ​编辑 引言 一、std::list详解 二、std::list的关键成员函数 三、示例代码 四、std::list与std::vector的对比 内存布局&#xff1a; 插入与删除&#xff1a; 迭代器稳定性&#xff1a; 五、应用场景 结语 引言 在C标准模板库(STL)中&#xff0c;std::list作…

skywalking学习

文章目录 前言一、skywalking单体安装部署1. 下载skywalking2. 部署oap和oap-ui服务3. 测试skywalking监控springboot应用 二、搭建swck(skywalking集群)1.安装k8s2.下载swck3.设置pod自动注入java agent 三、skywalking监控python四、skywalking监控cpp总结参考 前言 本文主要…

两段代码想编译其中一段

在Linux环境下&#xff0c;如果您的项目中包含两段代码且您只想编译其中一段&#xff0c;您可以采用多种方法来实现这个需求。具体分析如下&#xff1a; 1. **使用条件编译**&#xff1a;通过预处理指令来控制代码的编译。例如&#xff0c;您可以使用#ifdef、#ifndef、#if、#el…

SSL/TLS和HTTPS

HTTPS就是用了TLS包装的Socket进行通信的HTTP 混合加密 被称为混合加密。具体过程如下&#xff1a; 使用非对称加密协商对称密钥&#xff1a; 在通信的开始阶段&#xff0c;通常由客户端和服务器使用非对称加密算法&#xff08;如RSA&#xff09;来协商一个对称密钥。通常情…

vue3中的ref与reactive的区别

目录 1、两者的区别底层实现响应式引用与响应式对象 2、用法3、vue3中声明的数组/对象3.1 通过reactive 声明的Array/Object&#xff0c;给它重新分配一个全新的对象时&#xff0c;会出错、或失去响应式效果 3.2 解决方案 4、cosnt 说明5、Proxy 与 definePropertyref 浅层响应…

npm 异常:peer eslint@“>=1.6.0 <7.0.0“ from eslint-loader@2.2.1

node用16版本 npm install npm6.14.15 -g将版本降级到6

Redisson知识

使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁&#xff1a; lock.lock(); 特点&#xff1a; 1.使用Redisson加的锁&#xff0c;具有自动续期机制&#xff0c;如果业务运行时间较长&#xff0c;运行…