Redis 使用 List 实现消息队列的利与弊

3ebd0aca6f3b2d6b877ac14030760fbb.gif

作者 | 码哥字节

来源 | 码哥字节

分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦、流量消峰、实现最终一致性。

目前市面上已经有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人会问:“Redis 适合做消息队列么?”

在回答这个问题之前,我们先从本质思考:

  • 消息队列提供了什么特性?

  • Redis 如何实现消息队列?是否满足存取需求?

今天,码哥结合消息队列的特点一步步带大家分析使用 Redis 的 List 作为消息队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合运用到项目中。

什么是消息队列

消息队列是一种异步的服务间通信方式,适用于分布式和微服务架构。消息在被处理和删除之前一直存储在队列上。

每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

5e8f3df70017fc4e373c5b1179879a49.png

消息队列
  • Producer:消息生产者,负责产生和发送消息到 Broker;

  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;

  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

消息队列的使用场景有哪些呢?

消息队列在实际应用中包括如下四个场景:

  • 应用耦合:发送方、接收方系统之间不需要了解双方,只需要认识消息。多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;

  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;

  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

  • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

消息队列满足哪些特性

消息有序性

消息是异步处理的,但是消费者需要按照生产者发送消息的顺序来消费,避免出现后发送的消息被先处理的情况。

重复消息处理

生产者可能因为网络问题出现消息重传导致消费者可能会收到多条重复消息。

同样的消息重复多次的话可能会造成一业务逻辑多次执行,需要确保如何避免重复消费问题。

可靠性

一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。

当消费者重启后,可以继续读取消息进行处理,防止消息遗漏。

List 实现消息队列

Redis 的列表(List)是一种线性的有序结构,可以按照元素被推入列表中的顺序来存储元素,能满足「先进先出」的需求,这些元素既可以是文字数据,又可以是二进制数据。

LPUSH

生产者使用 LPUSH key element[element...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。

如下,生产者向队列 queue 先后插入了 「Java」「码哥字节」「Go」,返回值表示消息插入队列后的个数。

> LPUSH queue Java 码哥字节 Go
(integer) 3

RPOP

消费者使用 RPOP key 依次读取队列的消息,先进先出,所以 「Java」会先读取消费:

> RPOP queue
"Java"
> RPOP queue
"码哥字节"
> RPOP queue
"Go"

1bc5f54656ecd159e9e5ffc4f268029b.png

List队列

实时消费问题

65 哥:这么简单就实现了么?

别高兴的太早,LPUSH、RPOP 存在一个性能风险,生产者向队列插入数据的时候,List 并不会主动通知消费者及时消费。

我们需要写一个 while(true) 不停地调用 RPOP 指令,当有新消息就会返回消息,否则返回空。

程序需要不断轮询并判断是否为空再执行消费逻辑,这就会导致即使没有新消息写入到队列,消费者也要不停地调用 RPOP 命令占用 CPU 资源。

65 哥:要如何避免循环调用导致的 CPU 性能损耗呢?

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令,消费者在在读取队列没有数据的时候自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。

BRPOP queue 0

参数 0 表示阻塞等待时间无无限制

重复消费

  • 消息队列为每一条消息生成一个「全局 ID」;

  • 生产者为每一条消息创建一条「全局 ID」,消费者把一件处理过的消息 ID 记录下来判断是否重复。

其实这就是幂等,对于同一条消息,消费者收到后处理一次的结果和多次的结果是一致的。

消息可靠性

65 哥:消费者从 List 中读取一条在消息处理过程中宕机了就会导致消息没有处理完成,可是数据已经没有保存在 List 中了咋办?

本质就是消费者在处理消息的时候崩溃了,就无法再还原消息,缺乏一个消息确认机制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令,含义是从 List 从读取消息的同时把这条消息复制到另一个 List 中(备份),并且是原子操作。

我们就可以在业务流程正确处理完成后再删除队列消息实现消息确认机制。如果在处理消息的时候宕机了,重启后再从备份 List 中读取消息处理。

LPUSH redisMQ 公众号 码哥字节
BRPOPLPUSH redisMQ redisMQBack

生产者用 LPUSH 把消息插入到 redisMQ 队列中,消费者使用 BRPOPLPUSH 读取消息「公众号」,同时该消息会被插入到 「redisMQBack」队列中。

如果消费成功则把「redisMQBack」的消息删除即可,异常的话可以继续从 「redisMQBack」再次读取消息处理。

248fb7021cb56e1b4928ed42fe5e24d2.png

redis消息确认机制

需要注意的是,如果生产者消息发送的很快,而消费者处理速度慢就会导致消息堆积,给 Redis 的内存带来过大压力。

Redission 实战

在 Java 中,我们可以利用 Redission 封装的 API 来快速实现队列,接下来码哥基于 SpringBoot 2.1.4 版本来交大家如何整合并实战。

详细 API 文档大家可查阅:

https://github.com/redisson/redisson/wiki/7.-Distributed-collections

添加依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.7</version>
</dependency>

添加 Redis 配置,码哥的 Redis 没有配置密码,大家根据实际情况配置即可。

spring:application:name: redissionredis:host: 127.0.0.1port: 6379ssl: false

Java 代码实战

RBlockingDeque 继承 java.util.concurrent.BlockingDeque ,在使用过程中我们完全可以根据接口文档来选择合适的 API 去实现业务逻辑。

主要方法如下:

2ea9fe6f40097bed3707cd39e3215cbe.png

码哥采用了双端队列来举例

@Slf4j
@Service
public class QueueService {@Autowiredprivate RedissonClient redissonClient;private static final String REDIS_MQ = "redisMQ";/*** 发送消息到队列头部** @param message*/public void sendMessage(String message) {RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);try {blockingDeque.putFirst(message);log.info("将消息: {} 插入到队列。", message);} catch (InterruptedException e) {e.printStackTrace();}}/*** 从队列尾部阻塞读取消息,若没有消息,线程就会阻塞等待新消息插入,防止 CPU 空转*/public void onMessage() {RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);while (true) {try {String message = blockingDeque.takeLast();log.info("从队列 {} 中读取到消息:{}.", REDIS_MQ, message);} catch (InterruptedException e) {e.printStackTrace();}}}

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {@Autowiredprivate QueueService queueService;@Testpublic void testQueue() throws InterruptedException {new Thread(() -> {for (int i = 0; i < 1000; i++) {queueService.sendMessage("消息" + i);}}).start();new Thread(() -> queueService.onMessage()).start();Thread.currentThread().join();}}

总结

可以使用 List 数据结构来实现消息队列,满足先进先出。为了实现消息可靠性,Redis 提供了 BRPOPLPUSH 命令是解决。

Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。

而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存。

在消息量不大的情况下使用 Redis 作为消息队列,他能给我们带来高性能的消息读写,这似乎也是一个很好消息队列解决方案。

大家觉得是否合适作为消息队列呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512501.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云表格存储全面升级,打造一站式物联网存储新方案

简介&#xff1a; 阿里云表格存储全面升级&#xff0c;打造一站式物联网存储新方案 2021年9月1日&#xff0c;阿里云表格存储Tablestore重磅发布新能力&#xff1a;一站式物联网存储IoTstore。该新能力是阿里云表格存储Tablestore面向物联网深度垂直场景进行的一次技术升级&am…

手把手一起 图形化安装 k8s 集群

作者 | 小碗汤来源 | 我的小碗汤今天接着上一节&#xff0c;使用 KuboardSpray 图形化安装kubernetes集群[1]&#xff0c;记录了安装时可能遇到的问题。对此项目感兴趣的同学&#xff0c;不妨亲手实践一下~以下记录了安装单节点&#xff08;单master的集群&#xff09;&#xf…

Jaeger插件开发及背后的思考

简介&#xff1a; 本文主要介绍Jaeger最新的插件化后端的接口以及开发方法&#xff0c;让大家能够一步步的根据文章完成一个Jaeger插件的开发。此外SLS也推出了对于Jaeger的支持&#xff0c;欢迎大家试用。 随着云原生 微服务的推广和落地&#xff0c;服务监控也变得越来越重…

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-架构篇

简介&#xff1a; 本文简要介绍了基于 MySQL 结合 Tablestore 的大规模订单系统方案。这种方案支持大数据存储、高性能数据检索、SQL搜索、实时与全量数据分析&#xff0c;且部署简单、运维成本低。 作者 | 弘楠 来源 | 阿里技术公众号 一 背景 订单系统存在于各行各业&#…

ajax返回来总是html,ajax返回类型

基于arcgis的webgis开发中目前是否还直接用ajax技本人是arcgis刚接触者&#xff0c;以前有听说过ajax这个技术&#xff0c;用于浏览器和web服务ajax技术现在依然是客户端浏览器和服务器交互的重要手段。 如果你用arcgis api for js技术&#xff0c;同样会使用ajax技术。这是良好…

三分钟教你用 Scarlet 写一个 WebSocket App

作者 | Eason来源 | 程序员巴士在移动应用程序中&#xff0c;数据层是屏幕上显示内容的真实来源。然而&#xff0c;在今年早些时候在 Tinder 中集成了 WebSocket API 时&#xff0c;维护它成为了一个令人头疼的问题。为了在 Android 上更轻松地集成 WebSocket&#xff0c;Scarl…

重磅发布|新一代云原生数据仓库AnalyticDB「SQL智能诊断」功能详解

简介&#xff1a; AnalyticDB For MySQL为用户提供了高效、实时、功能丰富并且智能化的「SQL智能诊断」和「SQL智能调优」功能&#xff0c;提供用户SQL性能调优的思路、方向和具体的方法&#xff0c;降低用户使用成本&#xff0c;提高用户使用ADB的效率 SQL是一种简单易用的业…

技术干货|基于Apache Hudi 的CDC数据入湖「内附干货PPT下载渠道」

简介&#xff1a; 阿里云技术专家李少锋(风泽)在Apache Hudi 与 Apache Pulsar 联合 Meetup 杭州站上的演讲整理稿件&#xff0c;本议题将介绍典型 CDC 入湖场景&#xff0c;以及如何使用 Pulsar/Hudi 来构建数据湖&#xff0c;同时将会分享 Hudi 内核设计、新愿景以及社区最新…

探究 Java 应用的启动速度优化

简介&#xff1a; 在高性能的背后&#xff0c;Java 的启动性能差也令人印象深刻&#xff0c;大家印象中的 Java 笨重缓慢的印象也大多来源于此。高性能和快启动速度似乎有一些相悖&#xff0c;本文将和大家一起探究两者是否可以兼得。 作者 | 梁希 高性能和快启动速度&#x…

阿里云刘伟光:金融核心系统将步入分布式智能化的时代

1月18日&#xff0c;阿里云在京发布金融核心系统转型“红宝书”&#xff0c;并推出“金融级云原生工场”&#xff0c;通过新的建设理念和相应的全链路平台技术&#xff0c;以及先进的部署体系&#xff0c;支撑金融机构建设面向未来的新一代分布式智能化核心系统。 阿里云智能新…

5分钟搞定Loki告警多渠道接入

简介&#xff1a; Loki是受Prometheus启发的水平可扩展、高可用、多租户日志聚合系统。用户既可以将Loki告警直接接入SLS开放告警&#xff0c;也可以先将Loki接入Grafana或Alert Manager&#xff0c;再借助Grafana或Alert Manager实现Loki间接接入SLS开放告警。 直接接入 您可…

当微服务遇上 Serverless | 微服务容器化最短路径,微服务 on Serverless 最佳实践

简介&#xff1a; 阿里云Serverless应用引擎&#xff08;SAE&#xff09;初衷是让客户不改任何代码&#xff0c;不改变应用部署方式&#xff0c;就可以享受到微服务K8sServerless的完整体验&#xff0c;开箱即用免运维。 前言 微服务作为一种更灵活、可靠、开放的架构&#x…

学计算机就业靠谱吗,2018年计算机专业就业怎么样?

由孙中山先生创办的至今已有一百多年办学传统&#xff0c;已经成为一所国内一流、国际知名的现代综合性大学。涉足的领域较广&#xff0c;有法律、医学等领域&#xff0c;每个领域都取得不俗的成绩。该校的计算机专业自开设以来也颇受学生欢迎&#xff0c;2018年计算机专业就业…

Serverless 工程实践 | 细数 Serverless 的配套服务

简介&#xff1a; 上文说到云计算的十余年发展让整个互联网行业发生了翻天覆地的变化&#xff0c;Serverless 作为云计算的产物&#xff0c;或者说是云计算在某个时代的表现&#xff0c;被很多人认为是真正意义上的云计算&#xff0c;关于“Serverless 是什么”这个问题&#x…

程序员在想些什么?拒绝盲猜,CSDN帮你精准洞察 Ta 们的心

CSDN 推出《开发者研究与洞察》服务。基于3200万开发者的资源&#xff0c;从开发者视角出发&#xff0c;聚焦开发者“关注”、“使用”、“体验”三方面&#xff0c;帮助技术推广者打造技术品牌、优化技术产品的市场投放策略、提升技术产品的开发者使用体验&#xff0c;直接聆听…

伴鱼:借助 Flink 完成机器学习特征系统的升级

简介&#xff1a; Flink 用于机器学习特征工程&#xff0c;解决了特征上线难的问题&#xff1b;以及 SQL Python UDF 如何用于生产实践。 本文作者陈易生&#xff0c;介绍了伴鱼平台机器学习特征系统的升级&#xff0c;在架构上&#xff0c;从 Spark 转为 Flink&#xff0c;解…

小型微型计算机系统退回修改,小型微型计算机系统

基本信息期刊名称小型微型计算机系统《中国计算机系统杂志》的英文名称出版周期每月发布了ISSN 1000-1220发布CN 21-1106 / TP邮政编码8-108组织者中国科学院沉阳计算技术研究所出版地: 辽宁省沉阳市期刊首页网址提交URL包含在中/荣誉CSCD核心期刊中国科学引文Pж(AJ)摘要杂志C…

Flink 1.14 新特性预览

简介&#xff1a; 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理&#xff0c;内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日线上 Flink Meetup 分享的《Flink 1.14 新特性预览》。主要内容为&#xff1a; 简介流批一体Checkpoint 机制性能与效率…

2021 年云原生技术发展现状及未来趋势

简介&#xff1a; 作者于雨担任了 2021 年 GIAC 会议云原生专场的出品人兼讲师&#xff0c;组织了前后四个场子的演讲&#xff0c;在这个过程中作者同时作为听众从这些同行的演讲中学到了很多非常有用的知识。本文算是对 2021 GIAC 云原生专场的侧记&#xff0c;管中窥豹&#…

像搭“乐高”一样实现整合式网络安全体系

部署多种防护产品&#xff0c;却无法形成防御合力&#xff0c;是当前很多企业网络安全建设都面临的挑战。网络安全能力整合是企业的刚需&#xff0c;也是行业发展的大势所趋。虽然Gartner 提出的网络安全网格架构&#xff08;CSMA&#xff0c;Cybersecurity Mesh Architecture …