spring-boot redis stream消息队列demo-及死信简单处理

Redis stream 是 Redis 5 引入的一种新的数据结构,它是一个高性能、高可靠性的消息队列,主要用于异步消息处理和流式数据处理。在此之前,想要使用 Redis 实现消息队列,通常可以使用例如:列表,有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势:

  • 支持范围查找:内置的索引功能,可以通过索引来对消息进行范围查找
  • 支持阻塞操作:避免低效的反复轮询查找消息
  • 支持 ACK:可以通过确认机制来告知已经成功处理了消息,保证可靠性
  • 支持多个消费者:多个消费者可以同时消费同一个流,Redis 会确保每个消费者都可以独立地消费流中的消息

情况

当前项目,是在window server 上 部署 rocketmq (ps 单体)使用 消息队列,使用三方exe实现 注册到 window 服务中(实现开机自启-自行搜索实现吧)。
在这里插入图片描述
在这里插入图片描述

问题

当前发现一个问题,window 被关机(主动、断电等),mq的broker无法启动,原因是 delayOffset.json文件损坏造成,文件内容变成如下图。
在这里插入图片描述
修改为,重新启动没问题了
在这里插入图片描述
每次重新都需要处理,虽然可以考虑使用脚本 处理,但是徒增成本。就想着 使用redis实现 进行替换(项目规模较小)。redis-stream 需要将 版本升级为 5(当前是4)。
Redis for Windows
redis下载 直接替换就行。

死信问题

网上看到 处理的方式有好几种。定时调度,处理pending信息(直接ack、重新消费、转移消费组等);人工运维处理 等。看了很多,结合 spring 提供的函数。
StringRedisTemplate.redisTemplate.opsForStream()返回的接口类StreamOperations 参考spring文档,可以用的方法有 acknowledgeadddeletecreateGroupdeleteConsumerdestroyGroupconsumersgroupsinfopendingsizerangereadreverseRangetrim等。
没有包含命令:转移消费 xClaim,实际在RedisStreamCommands接口中,通过代码也能解决;

redisTemplate.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands().xClaim("stream".getBytes(), "group", "consumer", Duration.ofSeconds(10), RecordId.of("streamId")))

也没有包含命令:设置消费组的起始消息 ID xgroupSetid,实际在RedisStreamAsyncCommands接口中,代码在比较底层,不能拿来就用。

RedisFuture<String> xgroupSetid(StreamOffset<K> streamOffset, K group);
当前方式

当前,实现的方式。在项目 重启 或 定时调度(暂无代码demo),通过 pending 获取消息 streamId,先复制消息add,在确认旧消息acknowledge,最后删除旧消息delete

ps:自己感觉还行吧(迷之自信O(∩_∩)O哈哈~),不要拿来就用啊(⊙o⊙)…

redis桌面管理

下载redis可视化管理工具:AnotherRedisDesktopManager
在这里插入图片描述

参考

理解 Redis 新特性:Stream
springboot + redis stream做轻量级消息队列
Redis Stream实现消息队列
redis Stream消息队列 redision redis stream消息队列pending
使用redis流和spring数据获取挂起的消息
在SpringBoot中使用RedisTemplate重新消费Redis Stream中未ACK的消息

说明

redis-stream命令

  • XADD:向流中添加新的消息。
  • XREAD:从流中读取消息。
  • XREADGROUP:从消费组中读取消息。
  • XRANGE:根据消息 ID 范围读取流中的消息。
  • XREVRANGE:与 XRANGE 类似,但以相反顺序返回结果。
  • XDEL:从流中删除消息。
  • XTRIM:根据 MAXLEN 参数修剪流的长度。
  • XLEN:获取流的长度。
  • XGROUP:管理消费组,包括创建、删除和修改。
  • XACK:确认消费组中的消息已被处理。
  • XPENDING:查询消费组中挂起(未确认)的消息。
  • XCLAIM:将挂起的消息从一个消费者转移到另一个消费者。
  • XINFO:获取流、消费组或消费者的详细信息。

pom

spring-boot 加 redis依赖,简单项目,测试用

<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.0</version><relativePath/>
</parent>
<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.7.0</spring-boot.version>
</properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

配置

server:port: 8888servlet:context-path: /
spring:redis:msg:listener: falsehost: 127.0.0.1port: 6379database: 1client-type: lettucelettuce:pool:max-active: 8

代码结构

引入spring boot的 StringRedisTemplate

    private static StringRedisTemplate redisTemplate;@Autowiredpublic void setRedisTemplate(StringRedisTemplate redisTemplate) {RedisStreamConfig.redisTemplate = redisTemplate;}

声明变量。redis 的 key键名称,消费者 组名,消费者 客户端名。

final static String STREAM = "tsp", GROUP = "tsp-g-1", CONSUMER = "tsp-c-1";

生产者-发送消息

这块比较简单,直接调用就行

/** 生产者-发送消息 */public static void push(String msg){// 创建消息记录, 以及指定streamStringRecord record = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(STREAM);// 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]redisTemplate.opsForStream().add(record);log.info("redis-消息队列-stream, {} ,send msg: {}", STREAM,msg);}

消费者

消费者-监听类,实际的业务逻辑处理内容。消息队列 创建 会有测试消息,需要跳过。实际业务逻辑需要考虑,挂起消息 重新消费情况(解决 死信问题的简单方法)

/** 消费者 监听 */public static class TspStreamListener implements StreamListener<String, ObjectRecord<String,String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {RecordId messageId = message.getId();// 消息的key和valueString string = message.getValue();if("T".equals(string)){log.info("消费者-监听>>>测试消息-ack. msgId={}, stream={}, body={}", messageId, message.getStream(), string);redisTemplate.opsForStream().acknowledge(GROUP, message);return;}log.info("消费者-监听>>>get msg. msgId={}, stream={}, body={}", messageId, message.getStream(), string);//业务逻辑,需要考虑 被挂起的消息 重新消费情况try {log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-start");TimeUnit.SECONDS.sleep(10);log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-end");} catch (InterruptedException e) {e.printStackTrace();}log.info("消费者-监听>>>手动确认消息");redisTemplate.opsForStream().acknowledge(GROUP, message);log.info("消费者-监听>>>end");}}

消费者-注册,关闭自动ack,重连自动消费上次处理(已经在pending队列里面了)的下一个消息,拉取消息超时时间50s,每批数量1,使用默认线程池,传递数据类型 String,异常处理 打印错误日志。最后 start 启动消费。

    @Bean@ConditionalOnProperty(name = "spring.redis.msg.listener",havingValue = "true")public StreamMessageListenerContainer<String, ObjectRecord<String, String>> tspConsumerListener(RedisConnectionFactory factory){//spring-data-redis 2.3.1.RELEASE及更高版本中,createGroup如果不存在,则会自动使用创建流return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER),factory,new TspStreamListener());}private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(StreamOffset<String> offset,Consumer consumer, RedisConnectionFactory factory, StreamListener<String, ObjectRecord<String, String>> listener) {// pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());//指定消费最新的消息try {//创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认container.register(StreamMessageListenerContainer.StreamReadRequest.builder(offset).errorHandler((error) -> log.error(error.getMessage())).cancelOnError(e -> false).consumer(consumer).autoAcknowledge(false).build(), listener);} catch (Exception e) {log.error(e.getMessage());}container.start();return container;}

预处理

类加载完毕,执行方法。预处理操作,可以使用 spring 启动后处理类实现。本demo就先这样了。

本方法 是为了 解决,项目启动报错(无消息队列、无消费组)和程序重启死信问题(꒦_꒦)

    @PostConstructprivate void start(){}

如果redis 没有 对应key 的 stream 消息队列,消费者启动后会频繁报错。如下代码会在 项目启动 发送测试消息,创建 消息队列,并限制 消息队列 长度(限制内存消耗)。

ps:我好菜呀,不知道spring是否允许消费者 自动创建 消息队列o(╥﹏╥)o

// 生产者 创建 消息队列,防止启动 消费者 报错if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){log.info("没有key,测试发送(创建key)");push("T");//限制 消息队列 容量 XTRIM stream MAXLEN lenredisTemplate.opsForStream().trim(STREAM,1000L);}

判断消息队列,是否存在消费组。没有创建消费组

// 创建 消息队列-消费者if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {log.info("redis-消息队列-stream,createGroup,{} {}",STREAM,GROUP);//XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息,或者使用 0 表示消费流中的所有消息redisTemplate.opsForStream().createGroup(STREAM,GROUP);}

死信处理代码

private void handlerPending(String key,String group){//判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]PendingMessagesSummary pending = redisTemplate.opsForStream().pending(key, group);long size;if(pending == null || (size = pending.getTotalPendingMessages()) <= 0 ) {log.debug("redis-消息队列,{},{},暂无挂起消息pending", key, group);return;}//---------------------String minId = pending.minMessageId(),maxId = pending.maxMessageId(),id;// 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0log.info("redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {}",key,group,size,minId,maxId);// - + 4 ---------------------获取挂起所有信息 streamIdPendingMessages msgIds = redisTemplate.opsForStream().pending(key, group, Range.closed("-", "+"), size);log.info("redis-消息队列,{},{},pending-挂起消息,{}",key,group,msgIds);List<MapRecord<String, Object, Object>> list;// ---------------------循环处理,可以考虑 异步执行for (PendingMessage msgId : msgIds) {id = msgId.getId().getValue();//PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=66775, totalDeliveryCount=1}log.info("redis-消息队列,{},{},pending-挂起消息>>,{}",key,group,msgId);//XRANGE key start end [COUNT count]list = redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));log.info("redis-消息队列,{},{},range-挂起消息>> {},{}",key,group,id,list);if(list == null || list.isEmpty()){continue;}// 开始 结束 id相同,只返回 一个消息结果MapRecord<String, Object, Object> record = list.get(0);//MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}if("1706233940011-0".equals(id)){MapRecord<String, Object, Object> copy = record.withId(RecordId.autoGenerate());log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,copy.getStream(),copy);//XADD stream-name id field value [field value]RecordId add = redisTemplate.opsForStream().add(copy);log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,add,copy);log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old {},{}",key,group,record.getStream(),record);//XACK stream group id [id id ...]Long siz = redisTemplate.opsForStream().acknowledge(GROUP, record);log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old [{}],{}",key,group,siz,record);log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old {},{}",key,group,record.getStream(),record);//XDEL key ID [ID ...]siz = redisTemplate.opsForStream().delete(record);log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old [{}],{}",key,group,siz,record);}else{log.info("redis-消息队列,{},{},range-挂起消息>0> {},{},{}",key,group,record.getStream(),id,record);}}log.info("redis-消息队列,{},{},pending-挂起消息-end",key,group);}

汇总

代码

package demo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;/*** @author z.y.l* @version v1.0* @date 2024/1/25*/
@Configuration
@Import (RedisAutoConfiguration.class)
public class RedisStreamConfig {private static final Logger log = LoggerFactory.getLogger(RedisStreamConfig.class);final static String STREAM = "tsp", GROUP = "tsp-g-1", CONSUMER = "tsp-c-1";private static StringRedisTemplate redisTemplate;@Autowiredpublic void setRedisTemplate(StringRedisTemplate redisTemplate) {RedisStreamConfig.redisTemplate = redisTemplate;}/** 生产者-发送消息 */public static void push(String msg){// 创建消息记录, 以及指定streamStringRecord record = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(STREAM);// 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]redisTemplate.opsForStream().add(record);log.info("redis-消息队列-stream, {} ,send msg: {}", STREAM,msg);}/** 项目启动 预处理,无消息队列(消费者-启动报错)、限制容量、手动创建消费组、处理异常挂起的消息 */@PostConstructprivate void start(){// 生产者 创建 消息队列,防止启动 消费者 报错if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){log.info("没有key,测试发送(创建key)");push("T");//限制 消息队列 容量 XTRIM stream MAXLEN lenredisTemplate.opsForStream().trim(STREAM,1000L);}// 创建 消息队列-消费者if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {log.info("redis-消息队列-stream,createGroup,{} {}",STREAM,GROUP);//XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息,或者使用 0 表示消费流中的所有消息redisTemplate.opsForStream().createGroup(STREAM,GROUP);}else{// 处理 挂起消息,也可以 使用调度 定时处理// 当前 挂起消息处理方式:复制新消息-ack旧消息-删除旧消息// 也可以使用 转移 消费组handlerPending(STREAM,GROUP);}}private void handlerPending(String key,String group){//判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]PendingMessagesSummary pending = redisTemplate.opsForStream().pending(key, group);long size;if(pending == null || (size = pending.getTotalPendingMessages()) <= 0 ) {log.debug("redis-消息队列,{},{},暂无挂起消息pending", key, group);return;}String minId = pending.minMessageId(),maxId = pending.maxMessageId(),id;// 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0log.info("redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {}",key,group,size,minId,maxId);// - + 4 所有PendingMessages msgIds = redisTemplate.opsForStream().pending(key, group, Range.closed("-", "+"), size);log.info("redis-消息队列,{},{},pending-挂起消息,{}",key,group,msgIds);List<MapRecord<String, Object, Object>> list;for (PendingMessage msgId : msgIds) {id = msgId.getId().getValue();//PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=66775, totalDeliveryCount=1}log.info("redis-消息队列,{},{},pending-挂起消息>>,{}",key,group,msgId);//XRANGE key start end [COUNT count]list = redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));log.info("redis-消息队列,{},{},range-挂起消息>> {},{}",key,group,id,list);if(list == null || list.isEmpty()){continue;}// 开始 结束 id相同,只返回 一个消息结果MapRecord<String, Object, Object> record = list.get(0);//MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}if("1706233940011-0".equals(id)){MapRecord<String, Object, Object> copy = record.withId(RecordId.autoGenerate());log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,copy.getStream(),copy);//XADD stream-name id field value [field value]RecordId add = redisTemplate.opsForStream().add(copy);log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,add,copy);log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old {},{}",key,group,record.getStream(),record);//XACK stream group id [id id ...]Long siz = redisTemplate.opsForStream().acknowledge(GROUP, record);log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old [{}],{}",key,group,siz,record);log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old {},{}",key,group,record.getStream(),record);//XDEL key ID [ID ...]siz = redisTemplate.opsForStream().delete(record);log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old [{}],{}",key,group,siz,record);}else{log.info("redis-消息队列,{},{},range-挂起消息>0> {},{},{}",key,group,record.getStream(),id,record);}}log.info("redis-消息队列,{},{},pending-挂起消息-end",key,group);}@Bean@ConditionalOnProperty(name = "spring.redis.msg.listener",havingValue = "true")public StreamMessageListenerContainer<String, ObjectRecord<String, String>> tspConsumerListener(RedisConnectionFactory factory){//spring-data-redis 2.3.1.RELEASE及更高版本中,createGroup如果不存在,则会自动使用创建流return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER),factory,new TspStreamListener());}private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(StreamOffset<String> offset,Consumer consumer, RedisConnectionFactory factory, StreamListener<String, ObjectRecord<String, String>> listener) {// pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());//指定消费最新的消息try {//创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认container.register(StreamMessageListenerContainer.StreamReadRequest.builder(offset).errorHandler((error) -> log.error(error.getMessage())).cancelOnError(e -> false).consumer(consumer).autoAcknowledge(false).build(), listener);} catch (Exception e) {log.error(e.getMessage());}container.start();return container;}/** 消费者 监听 */public static class TspStreamListener implements StreamListener<String, ObjectRecord<String,String>> {@Overridepublic void onMessage(ObjectRecord<String, String> message) {RecordId messageId = message.getId();// 消息的key和valueString string = message.getValue();if("T".equals(string)){log.info("消费者-监听>>>测试消息-ack. msgId={}, stream={}, body={}", messageId, message.getStream(), string);redisTemplate.opsForStream().acknowledge(GROUP, message);return;}log.info("消费者-监听>>>get msg. msgId={}, stream={}, body={}", messageId, message.getStream(), string);//业务逻辑,需要考虑 被挂起的消息 重新消费情况try {log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-start");TimeUnit.SECONDS.sleep(10);log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-end");} catch (InterruptedException e) {e.printStackTrace();}log.info("消费者-监听>>>手动确认消息");redisTemplate.opsForStream().acknowledge(GROUP, message);log.info("消费者-监听>>>end");}}
}

redis

在这里插入图片描述

日志

2024-01-26 10:33:54.566 - [main] INFO  o.s.b.w.s.c.ServletWebServerApplicationContext.prepareWebApplicationContext(292) - Root WebApplicationContext: initialization completed in 1162 ms
2024-01-26 10:33:58.388 - [main] INFO  demo.RedisStreamConfig.handlerPending(85) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息[3],minId : 1706178903044-0,maxId : 1706233950802-0
2024-01-26 10:33:58.396 - [main] INFO  demo.RedisStreamConfig.handlerPending(88) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessages{groupName='tsp-g-1', range=[--+], pendingMessages=[PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=57524430, totalDeliveryCount=1}, PendingMessage{id=1706233940011-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=2477103, totalDeliveryCount=1}, PendingMessage{id=1706233950802-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=1968963, totalDeliveryCount=1}]}
2024-01-26 10:33:58.397 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=57524430, totalDeliveryCount=1}
2024-01-26 10:33:58.404 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706178903044-0,[]
2024-01-26 10:33:58.405 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706233940011-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=2477103, totalDeliveryCount=1}
2024-01-26 10:33:58.409 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706233940011-0,[MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}]
2024-01-26 10:33:58.409 - [main] INFO  demo.RedisStreamConfig.handlerPending(104) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>add+++++copy tsp,MapBackedRecord{recordId=*, kvMap={data=303}}
2024-01-26 10:33:58.413 - [main] INFO  demo.RedisStreamConfig.handlerPending(106) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>add+++++copy 1706236438413-0,MapBackedRecord{recordId=*, kvMap={data=303}}
2024-01-26 10:33:58.413 - [main] INFO  demo.RedisStreamConfig.handlerPending(107) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>ack^^^^^old tsp,MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.418 - [main] INFO  demo.RedisStreamConfig.handlerPending(109) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>ack^^^^^old [1],MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.419 - [main] INFO  demo.RedisStreamConfig.handlerPending(110) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>del-----old tsp,MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.420 - [main] INFO  demo.RedisStreamConfig.handlerPending(112) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>del-----old [1],MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.421 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706233950802-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=1968963, totalDeliveryCount=1}
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706233950802-0,[MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}]
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(114) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0> tsp,1706233950802-0,MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(117) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息-end

仅供参考-请结合实际情况使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/650798.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】istream类型对象转换为逻辑条件判断值

前言 大家好吖&#xff0c;欢迎来到 YY 滴 系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的《Linux》专…

LiveGBS流媒体平台GB/T28181常见问题-如何快速查看推流上来的摄像头并停止摄像头推流?

LiveGBS流媒体平台GB/T28181常见问题-如何快速查看推流上来的摄像头并停止摄像头推流&#xff1f; 1、负载信息2、负载信息说明3、会话列表查看3.1、会话列表 4、停止会话5、搭建GB28181视频直播平台 1、负载信息 实时展示直播、回放、播放、录像、H265、级联等使用数目 2、负…

Python算法题集_接雨水

本文为Python算法题集之一的代码示例 题目42&#xff1a;接雨水 说明&#xff1a;给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1]…

ElasticSearch7.7.1集群搭建

前言 Elasticsearch&#xff08;ES&#xff09;是一个基于Apache Lucene的分布式、高扩展、近实时的搜索引擎&#xff0c;主要用于海量数据快速存储、实时检索、高效分析的场景。通过简单易用的RESTful API&#xff0c;Elasticsearch隐藏了Lucene的复杂性&#xff0c;使得全文搜…

数论Leetcode204. 计数质数、Leetcode858. 镜面反射、Leetcode952. 按公因数计算最大组件大小

Leetcode204. 计数质数 题目 给定整数 n &#xff0c;返回 所有小于非负整数 n 的质数的数量 。 代码 class Solution:def countPrimes(self, n: int) -> int:if n < 2:return 0prime_arr [1 for _ in range(n)]prime_arr[0], prime_arr[1] 0, 0ls list()for i in…

Python编程 从入门到实践(项目二:数据可视化)

本篇为实践项目二&#xff1a;数据可视化。 配合文章python编程入门学习&#xff0c;代码附文末。 项目二&#xff1a;数据可视化 1.生成数据1.1 安装Matplotlib1.2 绘制简单的折线图1.2.1 修改标签文字和线条粗细1.2.2 校正图形1.2.3 使用内置样式1.2.4 使用scatter()绘制散点…

点云格式-PCD格式介绍

PCD格式介绍 一、概述二、PCD 版本三、文件格式头信息四、数据存储格式类型五、优于其他文件格式的优点六、例子 一、概述 PCD文件格式是PCL库最常用的一种数据格式、也是其提供的一个独有的数据格式&#xff0c;PCD文件格式并不是要重新发明轮子&#xff0c;而是为了补充现有的…

59.螺旋矩阵II(力扣LeetCode)

59.螺旋矩阵II 题目描述 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]] 示例 2&#xff1a; 输…

【测试】测试用例场景设计

专注 文章目录 用例设计公式登录测试用例测试用例设计思路购物车测试用例水杯设计测试用例发红包测试用例1. 正常情况下的测试用例&#xff1a;2. 边界情况下的测试用例&#xff1a;3. 异常情况下的测试用例&#xff1a;4. 特殊情况下的测试用例&#xff1a; 微信朋友圈1. 正常…

Django模型(一)

一、介绍 模型,就是python中的类对应数据库中的表 1.1、ORM ORM 就是通过实例对象的语法,完成关系型数据库的操作的技术,是"对象-关系映射"(Object/Relational Mapping) 的缩写 ORM 把数据库映射成对象 1.2、示例 1.2.1、模型 from django.db import models…

字节跳动员工:5年攒了8400股,价值940W,财富自由的味道

字节跳动员工&#xff1a;5年攒了8400股&#xff0c;价值940W&#xff0c;财富自由的味道 最近&#xff0c;一位字节跳动员工在网络上爆料了他的财富增长故事&#xff0c;引起了广泛关注。这位员工在贴文中自豪地宣布&#xff0c;他加入字节跳动的五年间从未卖出手中的股票&am…

Ultraleap 3Di新建项目之给所有的Joint挂载物体

工程文件 Ultraleap 3Di给所有的Joint挂载物体 前期准备 参考上一期文章&#xff0c;进行正确配置 Ultraleap 3Di配置以及在 Unity 中使用 Ultraleap 3Di手部跟踪 新建项目 初始项目如下&#xff1a; 新建Create Empty 将新建的Create Empty&#xff0c;重命名为LeapPro…

查询redis路径,清除redis缓存

查询redis路径 1、执行ps -ef | grep redis 命令&#xff0c;结果如下&#xff08;记住PID&#xff09; 2、执行ps -u 系统用户名&#xff0c;进一步确定进程id, 我这里的系统用户名是root&#xff0c;执行ps -u root&#xff0c;结果如下&#xff1a; 结合1的操作结果图可知…

qtcreator使用qwt库

先配置好.pro文件&#xff0c;再去ui界面拖拽控件 ui界面会更改配置&#xff0c;故顺序错一个&#xff0c;就凉了&#xff0c;重来吧 准备&#xff1a;库&#xff0c;库头文件 库文件&#xff1a;路径如下 头文件&#xff1a;路径如下 鼠标->右键 &#xff08;有些不用勾…

『OpenCV-Python|鼠标作画笔』

Opencv-Python教程链接&#xff1a;https://opencv-python-tutorials.readthedocs.io/ 本文主要介绍OpenCV-Python如何将鼠标作画笔绘制圆或者矩形。 示例一&#xff1a;图片上双击的位置绘制一个圆圈 首先创建一个鼠标事件回调函数&#xff0c;鼠标事件发生时就会被执行。鼠标…

uni-app 微信小程序之红包雨活动

文章目录 1. 页面效果2. 页面样式代码 1. 页面效果 GIF录屏有点卡&#xff0c;实际比较丝滑 每0.5s掉落一个红包控制4s后自动移除红包点击红包消除红包&#xff08;或者自行1&#xff0c;或者弹窗需求&#xff09; 2. 页面样式代码 <!-- 红包雨活动 --> <template>…

C++中map和set的使用

&#xff08;图片来源于网络&#xff09; &#x1f388;个人主页:&#x1f388; :✨✨✨初阶牛✨✨✨ &#x1f43b;强烈推荐优质专栏: &#x1f354;&#x1f35f;&#x1f32f;C的世界(持续更新中) &#x1f43b;推荐专栏1: &#x1f354;&#x1f35f;&#x1f32f;C语言初阶…

力扣算法-Day20

541. 反转字符串II 给定一个字符串 s 和一个整数 k&#xff0c;从字符串开头算起&#xff0c;每计数至 2k 个字符&#xff0c;就反转这 2k 字符中的前 k 个字符。 如果剩余字符少于 k 个&#xff0c;则将剩余字符全部反转。如果剩余字符小于 2k 但大于或等于 k 个&#xff0c…

【yaml 文件使用】pytest+request 框架中 yaml 配置文件使用

又来进步一点点~~ 背景&#xff1a;最近在学习pytestrequest框架写接口测试自动化&#xff0c;使用yaml文件配置更方便管理用例中的数据&#xff0c;这样更方便 yaml 介绍&#xff1a; 什么是 yaml 文件&#xff1a;YAML 是 “YAML Ain’t a Markup Language”&#xff08;Y…

一、对人工智能大模型了解与认知

黑8说 月黑风高&#xff0c;乌云密布&#xff0c;树木低垂&#xff0c;黯淡沉闷。这黎明前的风暴&#xff0c;预示着新时代的变革即将到来。 在一个8线小城市的办公室中 黑8对主任说&#xff1a; 世界上有男人、女人、人妖&#xff0c;米国有1/3男&#xff0c;2/3女…&#xff…