SpringBoot实现Redis Stream队列
前言
简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。
jdk:1.8
springboot-version:2.6.3
redis:5.0.1(5版本以上才有Stream队列)
准备工作
1pom
redis 依赖包(version 2.6.3)
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2 yml
spring: redis:database: 0host: 127.0.0.1
3 RedisStreamUtil
工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Component
public class RedisStreamUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 创建消费组** @param key 键名称* @param group 组名称* @return {@link String}*/public String oup(String key, String group) {return redisTemplate.opsForStream().createGroup(key, group);}/*** 获取消费者信息** @param key 键名称* @param group 组名称* @return {@link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {return redisTemplate.opsForStream().consumers(key, group);}/*** 查询组信息** @param key 键名称* @return*/public StreamInfo.XInfoGroups queryGroups(String key) {return redisTemplate.opsForStream().groups(key);}// 添加Map消息public String addMap(String key, Map<String, Object> value) {return redisTemplate.opsForStream().add(key, value).getValue();}// 读取消息public List<MapRecord<String, Object, Object>> read(String key) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));}// 确认消费public Long ack(String key, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(key, group, recordIds);}// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}// 判断是否存在keypublic boolean hasKey(String key) {Boolean aBoolean = redisTemplate.hasKey(key);return aBoolean != null && aBoolean;}
}
代码实现
生产者发送消息
生产者发送消息,在Service层创建addMessage
方法,往队列中发送消息。
代码中addMap()
方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。
@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {private final RedisStreamUtil redisStreamUtil;/*** 发送一个消息** @return {@code Object}*/@Overridepublic Object addMessage() {RedisUser redisUser = new RedisUser();redisUser.setAge(18);redisUser.setName("hcr");redisUser.setEmail("156ef561@gmail.com");Map<String, Object> message = new HashMap<>();message.put("user", redisUser);String recordId = redisStreamUtil.addMap("mystream", message);return recordId;}
}
controller接口方法
@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {private final RedisStreamMqService redisStreamMqService;@GetMapping("/addMessage")public Object addMessage() {return redisStreamMqService.addMessage();}
}
调用测试,查看redis中是否正常添加数据。
接口返回数据
1702622585248-0
查看redis中的数据
消费者监听消息进行消费
创建RedisConsumersListener
监听器
import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {public final RedisStreamUtil redisStreamUtil;/*** 监听器** @param message*/@Overridepublic void onMessage(MapRecord<String, String, String> message) {// stream的key值String streamKey = message.getStream();//消息IDRecordId recordId = message.getId();//消息内容Map<String, String> msg = message.getValue();log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);//处理逻辑//逻辑处理完成后,ack消息,删除消息,group为消费组名称StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));redisStreamUtil.del(streamKey, recordId.getValue());}
}
创建RedisConfig
配置类,配置监听
package cn.hcr.config;import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
@Slf4j
public class RedisConfig {@Resourceprivate RedisStreamUtil redisStreamUtil;/*** redis序列化** @param redisConnectionFactory* @return {@code RedisTemplate<String, Object>}*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic Subscription subscription(RedisConnectionFactory factory) {AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5).executor(executor).pollTimeout(Duration.ofSeconds(1)).errorHandler(throwable -> {log.error("[MQ handler exception]", throwable);throwable.printStackTrace();}).build();//该key和group可根据需求自定义配置String streamName = "mystream";String groupname = "mygroup";initStream(streamName, groupname);var listenerContainer = StreamMessageListenerContainer.create(factory, options);// 手动ask消息Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));// 自动ask消息/* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/listenerContainer.start();return subscription;}private void initStream(String key, String group) {boolean hasKey = redisStreamUtil.hasKey(key);if (!hasKey) {Map<String, Object> map = new HashMap<>(1);map.put("field", "value");//创建主题String result = redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.oup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info("stream:{}-group:{} initialize success", key, group);}}
}
redisTemplate:该bean用于配置redis序列化
subscription:配置监听
initStream:初始化消费组
监听测试
使用addMessage()
方法投送一条消息后,查看控制台输出信息。
【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=["cn.hcr.pojo.RedisUser",{"name":"hcr","age":18,"email":"156ef561@gmail.com"}]
}
总结
以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
Template:该bean用于配置redis序列化
subscription:配置监听
initStream:初始化消费组
监听测试
使用addMessage()
方法投送一条消息后,查看控制台输出信息。
【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=["cn.hcr.pojo.RedisUser",{"name":"hcr","age":18,"email":"156ef561@gmail.com"}]
}
总结
以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。