reidis为了抢占市场份额,推出了自己的消息队列,Stream流,
常用操作如下:
xadd name id值 key value key1 value1...:若不存在为name的stream流,则创建一个新的名为name的stream流。这里id相当于数据库中的主键,如果我们不想自己定义就可以设置为*,表示自动创建,(有点像数据库中的自增主键)。
这个主键是根据当前时间戳+个数来生成的,保证了唯一性。
查询
xrange name start end
通过命令查询,可通过- + 来表示查询所有
想倒序也可以使用
xlen name
获取流中信息个数
xdel name id
通过id来删除某个消息
xtrim 通过指定条件删除元素
xtrim name maxlen 个数
保留最新个数,多余的删除
在这个样例中我有三个元素,但我只保留一个最长的,因为id是按时间戳来排序,所以最新及最长的,旧的被删除。
xread count 个数 streams name 00 : 当前00表示最小的,表示从最小开始读
xread count 个数 streams name $: $表示最大的开始读,所以如果非阻塞的话肯定为空
当然我们可以用阻塞队列监控
xread count 个数 block 数字 streams name $
当我们使用阻塞进行监听时,可以显示新出的时间是多少,及再登一个虚拟机,对该流进行插入。
当有消息后就会有不同消费者来读,通过一个个组来维护
创建用户组
xgroup create mystream group1 0(或$): 创见消息mystream的用户组group1,0表示从小到大开始。
一个用户组肯定会有成员来读,一个读过则表示整个组都读过。
xreadgroup group 组名 成员名 streams stream名 > :表示一个读所有内容。
当然也可以限制成员读取个数
count限制读的个数
xpending stream名 组名 : 显示成员读的情况
想看某个成员读的有哪些也可以
xpending mystream名 客户组 - + 个数 个人名
当然上面都是已读未确认,可以进行确认,
xack stream名 组名 stream里的某id
这样就表示该组读取并确认了该消息,该消息并不在已读未确认中。
不过不如kafka等中间件!!!