C# redis通过stream实现消息队列以及ack机制

redis实现

查看redis版本

redis需要>5.0
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;
  • 消息遍历;
  • 消息的阻塞和非阻塞读;
  • Consumer Groups 消费组;
  • ACK 确认机制。
  • 支持多播。

本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章

info

在这里插入图片描述

插入消息

XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
# 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male

在这里插入图片描述

读取消息

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01  0-0
# 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。

在这里插入图片描述
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。

创建消费组

# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream:指定队列的名字;
# group:指定消费组名字;
# start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM

读取群组消息

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
# >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
# BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒

在这里插入图片描述

如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

在这里插入图片描述

查看已读未确认消息

XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01 

在这里插入图片描述

1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1

查看消费者读取了哪些数据

XPENDING queue01 group01 - + 10 consumer01

在这里插入图片描述

确认消息

XACK key group-key ID [ID ...]XACK queue01 group01 1696822787364-0

在这里插入图片描述
再次查询未读消息

XPENDING queue01 group01 
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >

在这里插入图片描述
在这里插入图片描述

C#操作redis实现

使用FreeRedis类库,熟悉了上面的流程,直接上代码

using FreeRedis;namespace RedisMQStu01
{internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");//创建群组,如果数据存在则不需要执行了,第一次需要执行await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");var result = await cli.XReadGroupAsync(groupName, consumerName,1, 0, noack: false, ids);//查看已读未确认的消息var unReadResults = await cli.XPendingAsync(queueName, groupName);await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName,groupName, entry.id);}}await Console.Out.WriteLineAsync("完成");}}
}

上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费

生产者

using FreeRedis;namespace RedisMQProductor01
{internal class Program{/// <summary>/// redis消息队列的生产者/// </summary>/// <param name="args"></param>/// <returns></returns>async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

消费者

using FreeRedis;namespace RedisMQConsumer01
{/// <summary>/// redis消息队列的消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

先启动生产者在启动消费者查看效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

方法改善

改善之后可以先启动消费者然后等待生产者投递数据即可

消费者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu02
{/// <summary>/// 备份策略消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字try{var streamInfo = cli.XInfoStream(queueName);}catch{await cli.XAddAsync(queueName, "student", "");}//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",//自己写数据库链接字符串IsAutoCloseConnection = true,DbType = DbType.SqlServer};using SqlSugarClient db = new SqlSugarClient(connectionConfig);//初始化表格db.CodeFirst.InitTables(typeof(Student));while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idfor (int i = 0; i < entry.fieldValues.Length; i++){var field = entry.fieldValues[i];if (field.ToString() == "student"){var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";if (string.IsNullOrWhiteSpace(studentListJson)){continue;}var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);await db.Storageable(students).ExecuteCommandAsync();}}//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

生产者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu01
{/// <summary>/// 备份策略生产者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var perProcessNumber = 1000;//每次处理的数据条数int totalPage = 0;//总页码数ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",IsAutoCloseConnection = true,DbType = DbType.SqlServer};using (SqlSugarClient db = new SqlSugarClient(connectionConfig)){//初始化表格db.CodeFirst.InitTables(typeof(Student));do{int count = await db.Queryable<Student>().CountAsync();totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);//批量发送,redis频繁写入会报rdb错误,限制一下写入频率await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));List<int> deleteStudents = students.Select(p => p.Id).ToList();if (deleteStudents.Any()){//批量删除await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();}totalPage -= 1;//Thread.Sleep(2000);} while (totalPage > 0);}await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/102630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity MRTK Hololens2眼动交互

/** ** UnityVersion : 2021.3.6f1* Description : 眼部交互基类* Author: * CreateTime : 2023-10-11 09:43:20* Version : V1.0.0* * */using System.Collections.Generic; using Microsoft.MixedReality.Toolkit.Input; using UnityEngine;namespace MRTKExtend.EyeTrackin…

神秘的锦衣卫

在看明朝电视剧经常听到的一句台词&#xff1a;锦衣卫办案&#xff0c;闲杂人等速速离开。锦衣卫是明朝特务机构&#xff0c;直接听命于皇帝&#xff0c;是亲军卫之一&#xff0c;也是最重要的一卫。 1、卫所制 卫所制是明代最主要的军事制度&#xff0c;其目标是寓兵于农、屯…

22字符串-简单反转

目录 BM&#xff08;Boyer-Moore&#xff09; 坏字符 好后缀 什么情况用哪个规则&#xff1f; LeetCode之路——151. 反转字符串中的单词 分析: 字符串匹配中除了简单的BF&#xff08;Brute Force&#xff09;、RK&#xff08;Rabin-Karp&#xff09;算法&#xff0c;还有…

PHP Discord获取频道消息功能实现

PHP Discord获取频道消息功能实现 1. 关注对应频道2. 添加机器人3. 配置机器人权限4. 使用 DiscordPHP 类库5. 代码示例 (Laravel 框架)6. 服务器部署 1. 关注对应频道 首先要创建自己的频道, 然后到对应的公告频道中关注这个频道(这时 Discord 会让你选择频道, 选择之前创建的…

区块链游戏的开发框架

链游&#xff08;Blockchain Games&#xff09;是基于区块链技术构建的游戏。它们与传统游戏有一些显著不同之处&#xff0c;因此需要特定的开发框架和工具。以下是一些用于链游开发的开发框架及其特点&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专…

基于STM32_DS18B20单总线传感器驱动

基于STM32_DS18B20单总线传感器驱动 文章目录 基于STM32_DS18B20单总线传感器驱动前言一、BS18B20&#xff1f;二、原理1.复位与检验2.基本命令3.唯一ROM识别码4.温度转换 三、驱动代码四、注意事项 前言 本文以一款典型的单总线传感器及其驱动——DS18B20为例&#xff0c;简单…

《UnityShader入门精要》学习2

UnityShader 基础 UnityShader 概述 一对好兄弟&#xff1a;材质和UnityShader 总体来说&#xff0c;在Unity中我们需要配合使用材质&#xff08;Material&#xff09;和Unity Shader才能达到需要的效果。一个最常见的流程是&#xff1a; &#xff08;1&#xff09;创建一个…

(5)SpringMVC处理携带JSON格式(“key“:value)请求数据的Ajax请求

SpringMVC处理Ajax 参考文章数据交换的常见格式,如JSON格式和XML格式 请求参数的携带方式 浏览器发送到服务器的请求参数有namevalue&...(键值对)和{key:value,...}(json对象)两种格式 URL请求和表单的GET请求会将请求参数以键值对的格式拼接到请求地址后面form表单的P…

【深度学习】UniControl 一个统一的扩散模型用于可控的野外视觉生成

论文&#xff1a;https://arxiv.org/abs/2305.11147 代码&#xff1a;https://github.com/salesforce/UniControl#data-preparation docker快速部署&#xff1a;https://qq742971636.blog.csdn.net/article/details/133129146 文章目录 AbstractIntroductionRelated WorksUniCo…

【Linux】HTTPS协议

文章目录 &#x1f4d6; 前言1. 引入https协议2. 常见的加密方式2.1 对称加密&#xff1a;2.2 非对称加密&#xff1a;2.3 数据摘要&&数据指纹&#xff1a; 3. 对加密方式的探究3.1 只使用对称加密&#xff1a;3.2 只使用非对称加密&#xff1a;3.3 双方都使用非对称加…

SQL和Python,哪个更容易自学?哪个更适合数据工作的编程新手?

如果你想从事数据工作&#xff0c;比如数据分析、数据开发、数据科学等&#xff0c;你可能会遇到这样的问题&#xff1a;SQL和Python哪个更容易自学&#xff1f;哪个更有用&#xff1f;哪个更有前途&#xff1f;其实这两种语言都是数据工作的重要技能&#xff0c;但它们的特点和…

计及电转气协同的含碳捕集与垃圾焚烧虚拟电厂优化调度(matlab代码)

目录 1 主要内容 系统结构 CCPP-P2G-燃气机组子系统 非线性处理缺陷 2 部分代码 3 程序结果 4 程序链接 1 主要内容 该程序参考《计及电转气协同的含碳捕集与垃圾焚烧虚拟电厂优化调度》模型&#xff0c;主要实现的是计及电转气协同的含碳捕集与垃圾焚烧虚拟电厂优化调度…

智能井盖传感器:提升城市安全与便利的利器

在智能化城市建设的浪潮中&#xff0c;WITBEE万宾智能井盖传感器&#xff0c;正以其卓越的性能和创新的科技&#xff0c;吸引着越来越多的关注。本文小编将为大家详细介绍这款产品的独特优势和广阔应用前景。 在我们生活的城市中&#xff0c;井盖可能是一个最不起眼的存在。然而…

通过动态IP解决网络数据采集问题

前言 网络数据采集是目前互联网上非常重要且广泛应用的技术之一&#xff0c;它可以帮助我们获取互联网上各种类型的数据&#xff0c;并将其转化为可用的信息。然而&#xff0c;一些网站为了保护其数据被滥用&#xff0c;采取了一系列的限制措施&#xff0c;其中包括对访问者的…

各类高危漏洞介绍及验证方式教程(一)

本期整理的漏洞验证教程约包含50多类漏洞&#xff0c;分多个章节编写&#xff0c;可从以下链接获取全文&#xff1a; 各类高危漏洞验证方式.docx (访问密码: 1455) 搭建dvwa测试环境基础教程.docx(访问密码: 1455) web逻辑漏洞挖掘快速入门基础教程.docx(访问密码: 1455) 01 Ca…

WPF向Avalonia迁移(三、项目结构)

前提&#xff1a; Avalonia版本11.0.0 1.配置文件 1.1 添加配置文件 1.2 读取配置文件 添加System.Configuration.ConfigurationManager using Avalonia.Controls; using System.Configuration;namespace AvaloniaApplication7.Views {public partial class MainWindow : W…

如何使用Net2FTP搭建免费web文件管理器打造个人网盘

文章目录 1.前言2. Net2FTP网站搭建2.1. Net2FTP下载和安装2.2. Net2FTP网页测试 3. cpolar内网穿透3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1.前言 文件传输可以说是互联网最主要的应用之一&#xff0c;特别是智能设备的大面积使用&#xff0c;无论是个人…

H3C交换机的40G堆叠线 ,可以插在普通光口做堆叠吗?

环境&#xff1a; S6520X-24ST-SI交换机 H3C LSWM1QSTK2万兆40G堆叠线QSFP 问题描述&#xff1a; H3C交换机的40G堆叠线 &#xff0c;可以插在普通光口做堆叠吗&#xff1f; 解答&#xff1a; 1.H3C交换机的40G堆叠线通常是用于连接堆叠模块或堆叠端口的。这些堆叠线通常使…

【技术干货】如何快速创建商用照明 OEM APP?

本文介绍了如何在涂鸦 IoT 平台的 App 工作台上创建一款体验版商照 App、正式版 OEM App、上架 App、以及完成通用配置。 OEM App 开发 创建 App 登录 涂鸦 IoT 平台的 App 页面。 单击 创建APP&#xff0c;选择 商照 APP 进行创建。 在提示框里&#xff0c;完善 App 信息…

通过Node.js获取高德的省市区数据并插入数据库

通过Node.js获取高德的省市区数据并插入数据库 1 创建秘钥1.1 登录高德地图开放平台1.2 创建应用1.3 绑定服务创建秘钥 2 获取数据并插入2.1 创建数据库连接工具2.2 请求数据2.3 数据处理2.4 全部代码 3 还可以打印文件到本地 1 创建秘钥 1.1 登录高德地图开放平台 打开开放平…