环境:SpringBoot2.7.16 + Redis6.2.1
1. Redis消息发布订阅
Spring Data 为 Redis 提供了专用的消息传递集成,其功能和命名与 Spring Framework 中的 JMS 集成类似。Redis 消息传递大致可分为两个功能区域:
-
信息发布
-
信息订阅
这是一个通常被称为发布/订阅(Publish/Subscribe,简称 Pub/Sub)模式的示例。RedisTemplate 类用于消息生成。
Redis消息机制最大的2个缺点:
-
没有 Ack 机制,也不保证数据的连续性:生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果没有一个消费者,那么消息会被直接丢弃。如果开始有三个消费者,其中一个突然挂掉了,过了一会儿等它再重连时,那么重连期间的消息对于这个消费者来说就彻底丢失了。
-
不持久化消息:如果 Redis 停机重启,发布订阅的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息都会被直接丢弃。
示例:
消息监听器
@Component
public class MsgMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.printf("从通道【%s】, 接收到消息: %s%n", new String(message.getChannel()), new String(message.getBody())) ;
}
}
配置消息监听容器
@Configuration
public class MessageReceiverConfig {
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
MessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 这里监听了2个通道msg和chat
container.addMessageListener(listener, Arrays.asList(ChannelTopic.of("msg"), ChannelTopic.of("chat"))) ;
return container;
}
}
消息发送
@Component
public class MessageSender {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void sendMessage(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
}
2. Redis事务
Redis通过multi、exec和discard命令提供对事务的支持。这些操作可以在RedisTemplate上使用。但是,RedisTemplate不能保证使用相同的连接运行事务中的所有操作。
编程式事务
Spring Data Redis 提供了 SessionCallback 接口,供需要使用同一连接执行多个操作(如使用 Redis 事务)时使用:
@Service
public class RedisTransactionService {
private final StringRedisTemplate stringRedisTemplate ;
public RedisTransactionService(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate ;
}
public void multiOperator() {
List<Object> txResults = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
operations.opsForHash().put("users:666", "name", "张三") ;
operations.opsForValue().set("id", "666") ;
return operations.exec() ;
}
});
for (Object ret : txResults) {
System.out.println(ret) ;
}
}
}
正常输出如下
true
true
使用声明式事务
启用事务支持后,RedisConnection 会绑定到由 ThreadLocal 支持的当前事务。如果事务无误完成,Redis 事务将通过 EXEC 提交,否则将通过 DISCARD 回滚。Redis 事务是面向批处理的。正在进行的事务中发出的命令会排队,只有在提交事务时才会应用。Spring Data Redis 会区分正在进行的事务中的只读命令和写命令。只读命令(如 KEYS)通过管道连接到新的(非线程绑定)RedisConnection,以允许读取。写入命令由 RedisTemplate 队列处理,并在提交时应用。
@Bean
StringRedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory) ;
// 手动开启事务功能
template.setEnableTransactionSupport(true);
return template;
}
测试
@Transactional
public void multiOperator2() {
stringRedisTemplate.opsForValue().set("oo", "xx") ;
Set<String> keys = stringRedisTemplate.keys("*") ;
// 输出[]。读操作必须在空闲(不感知事务)连接上运行
System.out.println(keys) ;
String value = stringRedisTemplate.opsForValue().get("oo") ;
// 返回null,因为在事务中设置的值不可见
System.out.println(value) ;
}
如果事务生效,那么上面的输出如下:
[]
null
3. 管道
Redis 提供管道支持,即向服务器发送多条命令而不等待回复,然后一次性读取回复。当你需要连续发送多条命令时,例如向同一个 List 添加多个元素时,管道操作可以提高性能。
Spring Data Redis 提供了多种 RedisTemplate 方法,用于在管道中运行命令。如果不关心管道操作的结果,可以使用标准的 execute 方法,并为管道参数传递 true。executePipelined 方法在管道中运行所提供的 RedisCallback 或 SessionCallback,并返回结果,如下例所示:
@Service
public class RedisPipeliningService {
private final StringRedisTemplate stringRedisTemplate ;
public RedisPipeliningService(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate ;
}
public List<Object> pipe() {
List<Object> results = stringRedisTemplate.executePipelined(
new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for(int i = 0; i < 10; i++) {
stringRedisConn.rPush("i" + i, String.valueOf(i)) ;
}
return null;
}
});
return results ;
}
}
输出结果
4. Redis LUA脚本
Redis 2.6 及更高版本支持通过 eval 和 evalsha 命令运行 Lua 脚本。Spring Data Redis 为运行脚本提供了高级抽象,可处理序列化并自动使用 Redis 脚本缓存。示例如下:
LUA脚本
这是一个分布式锁的简单实现
加锁
-- 判断锁key是否存在
if (redis.call('EXISTS', KEYS[1]) == 0) then
-- 不存在
redis.call('HINCRBY', KEYS[1], ARGV[1], 1) ;
redis.call('PEXPIRE', KEYS[1], 300000) ;
return 1;
end ;
-- 判断hash中是否存在ARGV[1]字段(该字段用来统计重入次数,一般使用threadId唯一标识)
if (redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1) then
redis.call('HINCRBY', KEYS[1], ARGV[1], 1) ;
redis.call('PEXPIRE', KEYS[1], 300000) ;
return 1;
end ;
return 0 ;
解锁
-- 判断锁key是否存在
if (redis.call('EXISTS', KEYS[1]) == 0) then
return nil ;
end ;
local counter = redis.call('HINCRBY', KEYS[1], ARGV[1], -1) ;
if (counter > 0) then
redis.call('PEXPIRE', KEYS[1], 30000) ;
return 1 ;
else
redis.call('DEL', KEYS[1]) ;
return 1 ;
end ;
return 0 ;
锁实现
public interface PLock {
boolean lock() ;
void unlock() ;
}
public class ProductRedisLock implements PLock {
private StringRedisTemplate stringRedisTemplate ;
private String key ;
private String id ;
public ProductRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
this.key = key ;
this.id = key.hashCode() + ":";
this.stringRedisTemplate = stringRedisTemplate ;
}
@Override
public boolean lock() {
long threadId = Thread.currentThread().getId() ;
return this.stringRedisTemplate.execute(RedisScript.of(new ClassPathResource("lock.lua"), Boolean.class),
Arrays.asList(this.key), this.id + threadId) ;
}
@Override
public void unlock() {
long threadId = Thread.currentThread().getId() ;
this.stringRedisTemplate.execute(RedisScript.of(new ClassPathResource("unlock.lua"), Boolean.class),
Arrays.asList(this.key), this.id + threadId) ;
}
}
使用
public void updateProduct(Long id) {
String key = "lock:product:" + id ;
PLock lock = new ProductRedisLock(key, stringRedisTemplate) ;
if (lock.lock()) {
try {
System.out.println("更新商品:" + id) ;
this.configProduct(id) ;
} finally {
lock.unlock() ;
}
}
}