redis的持久化消息队列

Redis Stream

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。

实例

redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>

XTRIM

使用 XTRIM 对流进行修剪,限制长度, 语法格式:

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量

实例

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>

redis>

XDEL

使用 XDEL 删除消息,语法格式:

XDEL key ID [ID ...]
  • key:队列名称
  • ID :消息 ID

实例

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:队列名称

实例

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量

实例

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名
  • end :结束值, + 表示最大值
  • start :开始值, - 表示最小值
  • count :数量

实例

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 ID

实例

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
 

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一键AI高清换脸——基于InsightFace、CodeFormer实现高清换脸与验证换脸后效果能否通过人脸比对、人脸识别算法

前言 1、项目简介 AI换脸是指利用基于深度学习和计算机视觉来替换或合成图像或视频中的人脸。可以将一个人的脸替换为另一个人的脸,或者将一个人的表情合成到另一个人的照片或视频中。算法常常被用在娱乐目上,例如在社交媒体上创建有趣的照片或视频,也有用于电影制作、特效…

全屋灯具选购指南,如何选择合适的灯具。福州中宅装饰,福州装修

灯具装修指南 灯具就像我们家里的星星,在黑暗中带给我们明亮,可是灯具如果选择的不好,这个效果不仅体现不出来,还会让人觉得烦躁。 灯具到底该怎么选呢?装修灯具有哪些注意事项呢?给大家做了一个总结&#…

C++设计模式-抽象工厂(Abstract Factory)

目录 C设计模式-抽象工厂(Abstract Factory) 一、意图 二、适用性 三、结构 四、参与者 五、代码 C设计模式-抽象工厂(Abstract Factory) 一、意图 提供一个创建一系列相关或相互依赖对象的接口,而无需指定它们…

笔试编程ACM模式JS(V8)、JS(Node)框架、输入输出初始化处理、常用方法、技巧

目录 考试注意事项 先审完题意,再动手 在本地编辑器(有提示) 简单题515min 通过率0%,有额外log 常见输入处理 str-> num arr:line.split( ).map(val>Number(val)) 初始化数组 new Array(length).fill(v…

国庆中秋特辑(七)Java软件工程师常见20道编程面试题

以下是中高级Java软件工程师常见编程面试题,共有20道。 如何判断一个数组是否为有序数组? 答案:可以通过一次遍历,比较相邻元素的大小。如果发现相邻元素的大小顺序不对,则数组不是有序数组。 public boolean isSort…

Windows下Tensorflow docker python开发环境搭建

前置条件 windows10 更新到较新的版本,硬件支持Hyper-V。 参考:https://learn.microsoft.com/zh-cn/windows/wsl/install 启用WSL 在Powershell中输入如下指令: dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsys…

01-工具篇-windows与linux文件共享

一般来说绝大部分PC上装的系统均是windows,为了开发linux程序,会在PC上安装一个Vmware的虚拟机,在虚拟机上安装ubuntu18.04,由于windows上的代码查看软件、浏览器,通信软件更全,我们想只用ubuntu进行编译&a…

哨兵(Sentinel-1、2)数据下载

哨兵(Sentinel-1、2)数据下载 一、登陆欧空局网站 二、检索 先下载2号为光学数据 分为S2A和S2B,产品种类有1C和2A,区别就是2A是做好大气校正的影像,当然数量也会少一些,云量检索条件中记得要按格式&#x…

【Linux基础】Linux发展史

👉系列专栏:【Linux基础】 🙈个人主页:sunny-ll 一、前言 本篇主要介绍Linux的发展历史,这里并不需要我们掌握,但是作为一个合格的Linux学习者与操作者,这些东西是需要了解的,而且…

深入理解浏览器渲染原理

文章目录 浏览器是如何渲染页面的渲染流程解析HTML(构建DOM树)解析过程中遇到JS代码 样式计算1. 解析CSS代码2. 转换样式表中的属性值,使其标准化3. 计算DOM树中每个节点的具体样式CSS继承规则CSS层叠规则 布局分层分层update layer tree 绘制…

WebGL 响应上下文丢失解决方案

目录 响应上下文丢失 如何响应上下文丢失 上下文事件 示例程序(RotatingTriangle_contextLost.js) 响应上下文丢失 WebGL使用了计算机的图形硬件,而这部分资源是被操作系统管理,由包括浏览器在内的多个应用程序共享。在某些特…

逐步解决Could not find artifact com:ojdbc8:jar:12

Could not find artifact com:ojdbc8:jar:12 in central (https://repo.maven.apache.org/maven2) 原因: ojdbc8:jar:12 属于Oracle 数据库链接的一个程序集,缺失的话很有可能会影响数据库链接,蝴蝶效应产生不可预测的BUG!但是版…

OpenGLES:绘制一个混色旋转的3D立方体

效果展示 混色旋转的3D立方体 一.概述 之前关于OpenGLES实战开发的博文,不论是实现相机滤镜还是绘制图形,都是在2D纬度 这篇博文开始,将会使用OpenGLES进入3D世界 本篇博文会实现一个颜色渐变、旋转的3D立方体 动态3D图形的绘制&#xf…

mybatise-plus的id过长问题

一、问题情景 笔者在做mp插入数据库(id已设置为自增)操作时,发现新增数据的id过长,结果导致前端JS拿到的数据出现了精度丢失问题,原因是后端id的类型是Long。在网上查了一下,只要在该属性上加上如下注解就可以 TableId(value &q…

进程调度的时机,切换与过程以及方式

1.进程调度的时机 进程调度(低级调度〉,就是按照某种算法从就绪队列中选择一个进程为其分配处理机。 1.需要进行进程调度与切换的情况 1.当前运行的进程主动放弃处理机 进程正常终止运行过程中发生异常而终止进程主动请求阻塞(如等待l/O)…

大模型部署手记(1)ChatGLM2+Windows GPU

1.简介: 组织机构:智谱/清华 代码仓:https://github.com/THUDM/ChatGLM2-6B 模型:THUDM/chatglm2-6b 下载:https://huggingface.co/THUDM/chatglm2-6b 镜像下载:https://aliendao.cn/models/THUDM/chat…

很普通的四非生,保研破局经验贴

推免之路 个人情况简介夏令营深圳大学情况机试面试结果 预推免湖南师范大学面试结果 安徽大学面试结果 北京科技大学笔试面试结果 合肥工业大学南京航空航天大学面试结果 暨南大学东北大学 最终结果一些建议写在后面 个人情况简介 教育水平:某中医药院校的医学信息…

Discuz!X 3.4任意文件删除漏洞

复现过程: 1.访问http://x.x.x/robots.txt(文件存在) 2.登录弱口令 账号:admin密码:admin 3.来到个人设置页面找到自己的formhash: 4.点击保存,抓包 来到这个参数:birthprovin…

力扣 -- 879. 盈利计划(二维费用的背包问题)

解题步骤&#xff1a; 参考代码&#xff1a; 未优化的代码&#xff1a; class Solution { public:int profitableSchemes(int n, int minProfit, vector<int>& group, vector<int>& profit) {//计划数int lengroup.size();//每一维都多开一行空间vector&…

websocket逆向【python实现websocket拦截】

python实现websocket拦截 前言一、拦截的优缺点优点:缺点:二、实现方法1.环境配置2.代码三、总结前言 开发者工具F12,筛选ws后,websocket的消息是这样显示的,如何获取这里面的消息呢? 以下是本篇文章正文内容 一、拦截的优缺点 主要讲解一下websocket拦截的实现,现在…