有一段时间没有写文章,techempower的测试规则评分竟然发生了变化,只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化加载的设计。但在不久前有用户问了一下组件是否支持redis的Stream功能,看了一样相关资料后把功能实现之;接下来就介绍一下如何用Beetlex.Redis来调用redis的Stream功能。
什么是Stream
是Redis5.0的Stream是一个新的强大的支持多播的可持久化的消息队列,它提供了消息添加,多组和多消费者一致性读取和ack确认等功能;更详细的介绍就不多说了可以通过网络找到更多详细描述。
创建Stream
组件通过RedisDB对象的GetStream访求来创建一个Stream访问对象,对象创建后就可以进行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。创建代码如下:
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
XADD
在介绍这个操作前先说一下Stream里存储的格式,默认Stream消息是K-V的格式,从基础指令上可以了解到这种结构
XADD mystream * sensor-id 1234 temperature 19.8
但这种格式操作起来并不友好,所以组件除了支持这种K-V的方式外,还支持以对象的方式进行Stream消息处理。接下来看一下插入对象的调用
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var id = await stream.Add(DataHelper.Defalut.Employees[0]);
id = await stream.Add(DataHelper.Defalut.Employees[1]);
id = await stream.Add(DataHelper.Defalut.Employees[2]);
var len = await stream.Len();
组件支持直接入插对象,其基础指令就是
XADD employees_stream * date employeejson
组件直接采用一个K-V的方式来存储对象,对于原则多个K-V的方式组件同样也支持,只是在构建Stream指定类型用Dictionary<string,string>即可;接下其他就不多说了直接上指令用例了。
XLEN
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var len = await stream.Len();
XDEL
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var items = await stream.Read(null, null, "0-0");
await stream.Del((from item in items select item.ID).ToArray());
XRANGE
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var items = await stream.Range();
items = await stream.RangeAll();
XREVRANGE
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var items = await stream.RevRange();
items = await stream.RevRangeAll();
XREAD
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var items = await stream.Read(0, null, "0-0");
items = await stream.Read();
Stream的消费组
前面介绍的指令感觉列表结构都能满足,其实Stream重要的功能是在组消费这一块,Redis可以针对Stream创建多个消费组和消费者,而消息会做一致性消费处理。
XGROUP
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var group = await stream.GetGroup("henry");
XREAD
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var group = await stream.GetGroup("g1");
var items = await group.Read("henry", "0");
实际XRead提供了是否等待和起始读已取参数
public async ValueTask<List<StreamDataItem<T>>> ReadWait(string consumer,int timeout=0public ValueTask<List<StreamDataItem<T>>> Read(string consumer,string start = null) public async ValueTask<List<StreamDataItem<T>>> Read(string consumer, int? block, int? count, string start = null)
一般情况下可以通过readwait来不停地消息新的消息
while(true)
{items = await group.ReadWait("henry");//处理消息foreach(var item in items){await item.Ack();}
}
XACK
RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");
var group = await stream.GetGroup("g1");
var items = await group.Read("henry", "0");
foreach (var item in items)await item.Ack();
以上是BeetleX.Redis组件提供操作Stream的基础指令,实际上Stream还有一些和运维相关的指令,只是这些在实际业务上用不上所以就没有去实现了。