【BCVP】实现基于 Redis 的消息队列

聆听自己的声音

如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。

话说上回书我们说到了,Redis的使用修改《【BCVP更新】StackExchange.Redis的异步开发方式》,通过异步的时候,基本上会解决StackExRedis组件使用过程中,可能在并发的时候遇到的问题,而且该组件也是微软官方推荐的(参考微软微服务框架eShopOnContainers),如果一定要抬杠说不好用,其实是没必要的。那今天我们继续往下说,简单说下如何基于Redis实现消息队列。

目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等这几种。当然常见的还是基于RabbitMQ来实现的,Redis份额稍微小了一点,但是因为Redis的仓储、缓存等多个方面的好处,使得Redis也是很火。

1

什么是消息队列

这个其实我今天不打算重点讲,因为我详细每个人能看这篇文章,肯定都知道消息队列的相关内容,但是为了不那么突兀,我就从网上粘贴几块基本概念,了解一二:

基本概念:

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

最终可以实现解耦的目的。

下面通过一个简单的架构模型来解释:

  • Producer:消息生产者,负责产生和发送消息到Broker。

  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。

  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。

有哪些优缺点:

从上边的定义中,我们可以看出来,优点主要是三块:

异步、流量削峰与流控、解耦

这三个优点在高并发等三高场景还是很有必要的,甚至说是十分必要的。

典型的广播模式,一个消息可以发布到多个消费者;

消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息;

比如我们某宝下订单,或某6抢车票,那都是放到队列里缓冲的,要是都用服务端等待,可能早就崩了,当然实际上比这个复杂的多。

而且,通过订阅发布的模式,异步执行,这样就会大大缓解时间压力。

但是,随之而来的弊端也是有的:
比如为了异步,就是接收者必须轮询消息队列,才能收到最近的消息。然后还有就是不能达到实时性,说白了就是用空间换时间,从而降低瓶颈。

消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。不能保证每个消费者接收的时间是一致的。若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。

五种常见模式

简单模式Hello World
功能:一个生产者P发送消息到队列Q,一个消费者C接收

工作队列模式Work Queue
功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

发布/订阅模式Publish/Subscribe
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

路由模式Routing
说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

通配符(主题)模式Topic
说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

更多具体的内容呢,自己感兴趣多去搜索下吧,肯定还是有很多其他问题的,我这里就不铺开了讲了,下边咱们就说说,如何在Blog.Core里添加队列吧。

2

订阅发布相关配置案例

案例有很多,自己可以根据情况自定义。

那既然要讲东西,肯定不能随便放一个算法,肯定是需要一个小demo,一个应用场景,这样更有助于初学者去理解,之前考虑了很多,一直没有想好在BlogCore里边使用什么案例场景来说一说消息队列,最后实在是没办法,只能说日志了,万事不决就说日志,好像软件开发都是这么举例的。

这里说一下,假设我们自定义了一个日志记录的方法,就是在txt里写数据,其实我现在也是这么用的,平时肯定会一边查一边写,如果并发高一下,肯定就会出现死锁或者异常的出现,那我们就可以把写日志放到消息队列里,缓冲一下,然后在写一个订阅者,专门来“盯着”队列,一有消息传过来,就写到日志文件里,这样就能很好的实现相应的目的。如果不缓冲下,有时候日志可能高达几万条,瞬间爆炸。

那说了这个小场景,接下来就简单的模拟一下吧。

1、定义消息队列操作类与接口

既然要发布和订阅消息,肯定就需要有相应的操作方法,在上一篇文章中,我新建了一个RedisBasketRepository.cs的操作类,那我们还继续在这个类文件里写吧,注意,这个实现类和接口,已经注册到服务容器了,如果你第一次操作,可以参考文章开头上篇文章内容:

 /// <summary>/// 根据key获取RedisValue/// </summary>/// <typeparam name="T"></typeparam>/// <param name="redisKey"></param>/// <returns></returns>public async Task<RedisValue[]> ListRangeAsync(string redisKey){return await _database.ListRangeAsync(redisKey);}/// <summary>/// 在列表头部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListLeftPushAsync(redisKey, redisValue);}/// <summary>/// 在列表尾部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListRightPushAsync(redisKey, redisValue);}/// <summary>/// 移除并返回存储在该键列表的第一个元素  反序列化/// </summary>/// <param name="redisKey"></param>/// <returns></returns>public async Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class{return JsonConvert.DeserializeObject<T>(await _database.ListLeftPopAsync(redisKey));}

我这里只是简单的Copy出来几个做例子,总的一共有12个,当然你也可以自定义增加或删除某些不必要的,核心的可以看出来,都是根据redisKey来操作的

Task<RedisValue[]> ListRangeAsync(string redisKey);Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;Task<string> ListLeftPopAsync(string redisKey, int db = -1);Task<string> ListRightPopAsync(string redisKey, int db = -1);Task<long> ListLengthAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);Task ListClearAsync(string redisKey, int db = -1);

2、如何发布消息与接收消息

上边定义好了相应的操作方法以后,就很简单了,我们来发布一条消息来试试:

 [HttpGet][AllowAnonymous]public async Task RedisMq(){var msg = "这里是一条日志";await _redisBasketRepository.ListLeftPushAsync(RedisMqKey.Loging, msg);}

就是这么简单,构造函数注入以后,直接调用相应的方法,就把消息msg推送到了队列里了,这里的redisKey,我用了常量定义,具体可操作Blog.Core源代码。

现在是发布消息特别简单,只需要一行接口,那如何去获取呢,在上边的获取方法中,我们定义的是:

Task<RedisValue[]> ListRangeAsync(string redisKey);

这个方法也是可以的,只不过我们需要对其进行转换,毕竟存的msg是字符串string类型的,但是这里的返回类型的RedisValue[],所以需要劈里啪啦转化一下。

但是这里有一个问题,就是如何去定时获取呢,也就是如何设计一个订阅者进行消费消息呢,这需要思考下,当然比较简单的就是while(true){},可能平时就是这么使用的,不过还是不是那么爽快,可以写一个组件来处理,简单快捷,正好,有一个大佬已经封装好了,我们可以直接拿来用,如果你有什么问题,可以给他提issue。

3、InitQ组件来订阅消息

在nuget中,可以直接安装组个组件:

<PackageReference Include="InitQ" Version="1.0.0.4" />

他的开源地址是:

https://github.com/wmowm/Initq

使用方法很简单,可以参考他的README里的介绍:

1、先添加服务

 /// <summary>
/// Redis 消息队列 启动服务
/// </summary>
public static class RedisInitMqSetup
{public static void AddRedisInitMqSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));services.AddInitQ(m =>{//时间间隔m.SuspendTime = 5000;//redis服务器地址m.ConnectionString = "127.0.0.1:6379";//对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象m.ListSubscribe = new List<IRedisSubscribe>() { new RedisSubscribe()};//显示日志m.ShowLog = false;});}
}

2、定义订阅者

 public class RedisSubscribe : IRedisSubscribe{[Subscribe(RedisMqKey.Loging)]private async Task SubRedisLoging(string msg){Console.WriteLine($"队列{RedisMqKey.Loging} 消费到/接受到 消息:{msg}");await Task.CompletedTask;}}

整体很简单,继承接口,然后添加上特性,这个特性里的参数,就是我们消息发布的时候的那个key,然后方法的参数,就是对应的消息msg,是不是很简单。

当然这里你可以传递一个日志的对象实例,这样就把日志信息分流到了队列里,然后队列走到这个订阅者里,由这里进行缓冲,然后把日志填充到日志文件,从而达到减峰的目的。

最终的效果可以看看:

好啦,今天的redis消息队列已经说完了,还是很简单的,其中重点还是那五种模式要自己好好了解下,然后整体过程自己把握把握,至于RabbitMQ,这个以后再说吧。

END

扫码关注

老张的哲学

更多精彩等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/307947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetcode18. 四数之和(双指针)

一&#xff1a;题目 二&#xff1a;上码 class Solution { public:vector<vector<int>> fourSum(vector<int>& nums, int target) {vector<vector<int> >ans;vector<int> v;sort(nums.begin(),nums.end());for(int i 0; i < nums…

过去3个多月的1200个小时里,我收获了什么?| 2021年年中总结

&#x1f55b;序言 今年三月初&#xff0c;善后了上学期事情之后&#xff0c;我开始在想&#xff0c;我的未来规划。 身边的好朋友和同学都在筹划着自己的未来&#xff0c;考研的考研&#xff0c;考公的考公。父母和老师们也在劝我说考研&#xff0c;问我考不考研。 依稀记得…

WPF 消息框 TextBox 绑定新数据时让光标和滚动条跳到最下面

WPF 消息框 TextBox 绑定新数据时让光标和滚动条跳到最下面独立观察员 2020 年 9 月 3 日我们在使用 WPF 的 TextBox 作为消息展示框时&#xff0c;如果想在出现滚动条之后&#xff0c;新消息到来时还能够被看到&#xff0c;也就是说让滚动条始终在最下面&#xff0c;或者说光标…

leedcode344. 反转字符串

一:题目 二:上码 class Solution { public:void reverseString(vector<char>& s) {//双指针for(int i 0,j s.size() - 1; i < s.size()/2; i,j--) {swap(s[i],s[j]);}} };

组件库实战 | 用vue3+ts实现全局Header和列表数据渲染ColumnList

用vue3ts实现全局Header和列表数据渲染ColumnList&#x1f5bc;️序言&#x1f4fb;一、ColumnList数据渲染1、设计稿抢先知2、数据构思3、视图数据绑定4、数据传递5、挠头情况☎️二、GlobalHeader全局Header1、设计稿抢先看2、数据构思3、视图数据绑定4、数据传递&#x1f4f…

初识ABP vNext(8):ABP特征管理

点击上方蓝字"小黑在哪里"关注我吧定义特征应用特征用户数量社交登录前言上一篇提到了ABP功能管理&#xff08;特征管理&#xff09;&#xff0c;它来自ABP的FeatureManagement模块&#xff0c;ABP官方文档貌似还没有这个模块的相关说明&#xff0c;但是个人感觉这个…

.NET Core 中导入导出Excel

操作Excel是一个比较常见的业务场景&#xff0c;本篇将使用EPPlus简单演示一个导入导出的示例。EPPlus开源地址&#xff1a;https://github.com/EPPlusSoftware/EPPlus在项目中添加EPPlus组件Install-Package EPPlus导入先准备一个Excel文件&#xff0c;将其内容读取出来&#…

不会webpack的前端可能是捡来的,万字总结webpack的超入门核心知识

一文了解webpack入门核心知识&#x1f3a8;序言&#x1f4c5;一、webpack究竟是什么1、写在前面2、什么是模块打包工具&#xff1f;&#x1f4d0;二、如何用Webpack搭建环境1、安装node2、创建项目3、初始化项目4、安装webpack5、安装具体版本的webpack⚙️三、Webpack的配置文…

剑指 Offer 05. 替换空格(两种做法)

一:题目 二:上码 1:方法一 class Solution { public:string replaceSpace(string s) {string str "";for(int i 0; i < s.size(); i) {if(s[i] ){str "%20";}else{str s[i];}}return str;} };2:方法二&#xff08;双指针&#xff09; class So…

.NET Core 中生成验证码

在开发中&#xff0c;有时候生成验证码的场景目前还是存在的&#xff0c;本篇演示不依赖第三方组件&#xff0c;生成随机验证码图片。先添加验证码接口public interface ICaptcha {/// <summary>/// 生成随机验证码/// </summary>/// <param name"codeLeng…

10分钟手把手教你用Android手撸一个简易的个人记账App

用Android手撸一个简易的个人记账系统⛱️序言&#x1f4cb;一、系统结构设计Design1. 需求分析2. 数据库设计3. 界面设计4. 过程设计&#x1f4d8;二、编码阶段Coding1. 项目结构&#x1f5c2;️&#xff08;1&#xff09;文件目录&#xff08;2&#xff09;AndroidManifest.x…

像素级调整,高效转换——轻松提升你的图片处理体验!

探索更高级的图片处理体验&#xff0c;我们为你带来像素级调整与高效转换的完美结合&#xff01;借助我们的专业工具&#xff0c;轻松调整图片像素&#xff0c;让你在细节处展现无限创意&#xff0c;提升作品质感。 第一步&#xff0c;进入首助编辑高手主页面&#xff0c;可以看…

ABP VNext实践之搭建可用于生产的IdentityServer4

一、前言用了半年多的abp vnext&#xff0c;在开发的效果还是非常的好&#xff0c;可以说节省了很多时间&#xff0c;像事件总线、模块化开发、动态API进行远程调用、自动API控制器等等&#xff0c;一整套的规范&#xff0c;让开发人员更方便的集成&#xff0c;提升效率&#x…

151. 翻转字符串里的单词(思路+详解)

一:题目 二:上码 class Solution { public://利用双指针使除去多余的空格,定义快慢指针&#xff0c;最后满指针指向的位置就是除去冗余空格后的字符串//大小void removespacing(string& s){int slowIndex 0,fastIndex 0;//去掉字符串前面的字符while(s.size() > 0 &a…

一文了解树在前端中的应用,掌握数据结构中树的生命线

一文了解树在前端中的应用&#x1f3d5;️序言&#x1f332;一、树是什么&#xff1f;&#x1f334;二、深/广度优先遍历1、深度优先遍历&#xff08;1&#xff09;定义&#xff08;2&#xff09;口诀&#xff08;3&#xff09;代码实现2、广度优先遍历&#xff08;1&#xff0…

【追加功能】OFFICE插件管理工具重整后再上路,更好用易用。

现在使用OFFICE插件的群体越来越多&#xff0c;在8月份修复过的【OFFICE插件管理工具】&#xff0c;尝试将COM加载项的插件管理进行完善。但仍然有一小部分普通加载项的管理未能加到里面。特别是近期用户反馈到的EasyShu插件不能取消加载问题&#xff08;这个是一个bug&#xf…

剑指 Offer 58 - II. 左旋转字符串

一:题目 二:上码 class Solution { public:string reverseLeftWords(string s, int n) {string str1 s.substr(0,n);//(开始位置&#xff0c;个数)string str2 s.substr(n);return str2str1;} };

太平洋大西洋水流问题如何解决?一文了解图在前端中的应用

一文了解图在前端中的应用&#x1f3a7;序言&#x1f3a4;一、图是什么&#xff1f;1、定义2、举例&#x1f3b9;二、图的表示法1、邻接矩阵表示法2、邻接表表示法&#x1f3ba;三、图的常用操作1、图的深度优先遍历&#xff08;1&#xff09;定义&#xff08;2&#xff09;口诀…

统信发布UOS V20 进军个人市场 生态日益完善

日前&#xff0c;统信桌面操作系统V20个人版正式发布&#xff0c;统信UOS是国产操作系统中屈指可数的在民间拥有一批爱好者和发烧友的操作系统&#xff0c;本次统信发布UOS V20个人版&#xff0c;吹响了进军TO C市场的号角。根据官方宣传介绍&#xff0c;统信桌面操作系统V20个…

leetcode28. 实现 strStr(KMP详解)

一:题目 二&#xff1a;思路 三:上码 // class Solution { // public: // int strStr(string haystack, string needle) { // if (needle.size()0) // return 0; // if (needle.size() > haystack.size()) // return -1; // …