聆听自己的声音
如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。
话说上回书我们说到了,Redis的使用修改《【BCVP更新】StackExchange.Redis的异步开发方式》,通过异步的时候,基本上会解决StackExRedis组件使用过程中,可能在并发的时候遇到的问题,而且该组件也是微软官方推荐的(参考微软微服务框架eShopOnContainers),如果一定要抬杠说不好用,其实是没必要的。那今天我们继续往下说,简单说下如何基于Redis实现消息队列。
目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等这几种。当然常见的还是基于RabbitMQ来实现的,Redis份额稍微小了一点,但是因为Redis的仓储、缓存等多个方面的好处,使得Redis也是很火。
1
什么是消息队列
这个其实我今天不打算重点讲,因为我详细每个人能看这篇文章,肯定都知道消息队列的相关内容,但是为了不那么突兀,我就从网上粘贴几块基本概念,了解一二:
基本概念:
消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。
消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
最终可以实现解耦的目的。
下面通过一个简单的架构模型来解释:
Producer:消息生产者,负责产生和发送消息到Broker。
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。
有哪些优缺点:
从上边的定义中,我们可以看出来,优点主要是三块:
异步、流量削峰与流控、解耦。
这三个优点在高并发等三高场景还是很有必要的,甚至说是十分必要的。
典型的广播模式,一个消息可以发布到多个消费者;
消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息;
比如我们某宝下订单,或某6抢车票,那都是放到队列里缓冲的,要是都用服务端等待,可能早就崩了,当然实际上比这个复杂的多。
而且,通过订阅发布的模式,异步执行,这样就会大大缓解时间压力。
但是,随之而来的弊端也是有的:
比如为了异步,就是接收者必须轮询消息队列,才能收到最近的消息。然后还有就是不能达到实时性,说白了就是用空间换时间,从而降低瓶颈。
消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。不能保证每个消费者接收的时间是一致的。若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。
五种常见模式
简单模式Hello World
功能:一个生产者P发送消息到队列Q,一个消费者C接收
工作队列模式Work Queue
功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列
发布/订阅模式Publish/Subscribe
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者
路由模式Routing
说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key
通配符(主题)模式Topic
说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
更多具体的内容呢,自己感兴趣多去搜索下吧,肯定还是有很多其他问题的,我这里就不铺开了讲了,下边咱们就说说,如何在Blog.Core里添加队列吧。
2
订阅发布相关配置案例
案例有很多,自己可以根据情况自定义。
那既然要讲东西,肯定不能随便放一个算法,肯定是需要一个小demo,一个应用场景,这样更有助于初学者去理解,之前考虑了很多,一直没有想好在BlogCore里边使用什么案例场景来说一说消息队列,最后实在是没办法,只能说日志了,万事不决就说日志,好像软件开发都是这么举例的。
这里说一下,假设我们自定义了一个日志记录的方法,就是在txt里写数据,其实我现在也是这么用的,平时肯定会一边查一边写,如果并发高一下,肯定就会出现死锁或者异常的出现,那我们就可以把写日志放到消息队列里,缓冲一下,然后在写一个订阅者,专门来“盯着”队列,一有消息传过来,就写到日志文件里,这样就能很好的实现相应的目的。如果不缓冲下,有时候日志可能高达几万条,瞬间爆炸。
那说了这个小场景,接下来就简单的模拟一下吧。
1、定义消息队列操作类与接口
既然要发布和订阅消息,肯定就需要有相应的操作方法,在上一篇文章中,我新建了一个RedisBasketRepository.cs的操作类,那我们还继续在这个类文件里写吧,注意,这个实现类和接口,已经注册到服务容器了,如果你第一次操作,可以参考文章开头上篇文章内容:
/// <summary>/// 根据key获取RedisValue/// </summary>/// <typeparam name="T"></typeparam>/// <param name="redisKey"></param>/// <returns></returns>public async Task<RedisValue[]> ListRangeAsync(string redisKey){return await _database.ListRangeAsync(redisKey);}/// <summary>/// 在列表头部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListLeftPushAsync(redisKey, redisValue);}/// <summary>/// 在列表尾部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListRightPushAsync(redisKey, redisValue);}/// <summary>/// 移除并返回存储在该键列表的第一个元素 反序列化/// </summary>/// <param name="redisKey"></param>/// <returns></returns>public async Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class{return JsonConvert.DeserializeObject<T>(await _database.ListLeftPopAsync(redisKey));}
我这里只是简单的Copy出来几个做例子,总的一共有12个,当然你也可以自定义增加或删除某些不必要的,核心的可以看出来,都是根据redisKey来操作的:
Task<RedisValue[]> ListRangeAsync(string redisKey);Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;Task<string> ListLeftPopAsync(string redisKey, int db = -1);Task<string> ListRightPopAsync(string redisKey, int db = -1);Task<long> ListLengthAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);Task ListClearAsync(string redisKey, int db = -1);
2、如何发布消息与接收消息
上边定义好了相应的操作方法以后,就很简单了,我们来发布一条消息来试试:
[HttpGet][AllowAnonymous]public async Task RedisMq(){var msg = "这里是一条日志";await _redisBasketRepository.ListLeftPushAsync(RedisMqKey.Loging, msg);}
就是这么简单,构造函数注入以后,直接调用相应的方法,就把消息msg推送到了队列里了,这里的redisKey,我用了常量定义,具体可操作Blog.Core源代码。
现在是发布消息特别简单,只需要一行接口,那如何去获取呢,在上边的获取方法中,我们定义的是:
Task<RedisValue[]> ListRangeAsync(string redisKey);
这个方法也是可以的,只不过我们需要对其进行转换,毕竟存的msg是字符串string类型的,但是这里的返回类型的RedisValue[],所以需要劈里啪啦转化一下。
但是这里有一个问题,就是如何去定时获取呢,也就是如何设计一个订阅者进行消费消息呢,这需要思考下,当然比较简单的就是while(true){},可能平时就是这么使用的,不过还是不是那么爽快,可以写一个组件来处理,简单快捷,正好,有一个大佬已经封装好了,我们可以直接拿来用,如果你有什么问题,可以给他提issue。
3、InitQ组件来订阅消息
在nuget中,可以直接安装组个组件:
<PackageReference Include="InitQ" Version="1.0.0.4" />
他的开源地址是:
https://github.com/wmowm/Initq
使用方法很简单,可以参考他的README里的介绍:
1、先添加服务
/// <summary>
/// Redis 消息队列 启动服务
/// </summary>
public static class RedisInitMqSetup
{public static void AddRedisInitMqSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));services.AddInitQ(m =>{//时间间隔m.SuspendTime = 5000;//redis服务器地址m.ConnectionString = "127.0.0.1:6379";//对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象m.ListSubscribe = new List<IRedisSubscribe>() { new RedisSubscribe()};//显示日志m.ShowLog = false;});}
}
2、定义订阅者
public class RedisSubscribe : IRedisSubscribe{[Subscribe(RedisMqKey.Loging)]private async Task SubRedisLoging(string msg){Console.WriteLine($"队列{RedisMqKey.Loging} 消费到/接受到 消息:{msg}");await Task.CompletedTask;}}
整体很简单,继承接口,然后添加上特性,这个特性里的参数,就是我们消息发布的时候的那个key,然后方法的参数,就是对应的消息msg,是不是很简单。
当然这里你可以传递一个日志的对象实例,这样就把日志信息分流到了队列里,然后队列走到这个订阅者里,由这里进行缓冲,然后把日志填充到日志文件,从而达到减峰的目的。
最终的效果可以看看:
好啦,今天的redis消息队列已经说完了,还是很简单的,其中重点还是那五种模式要自己好好了解下,然后整体过程自己把握把握,至于RabbitMQ,这个以后再说吧。
END
扫码关注
老张的哲学
更多精彩等着你