生产环境中如何使用Caffeine+Redis实现二级缓存(详细分析了各种情况)
本篇主要讲解的是实现Caffeine+Redis实现一个现成的使用流程。下一篇讲解什么是Caffeine以及caffeine的使用
00背景:
使用Caffeine和Redis的二级缓存方案源自于分布式系统中对高性能、高可用性、低延迟、数据一致性的需求。二级缓存结合了本地缓存的快速访问能力和分布式缓存的数据共享与持久化特性,解决了单极缓存的局限性,可用于高并发及分布式场景。
1.设计目标
高性能:利用 Caffeine 的低延迟和 Redis 的高吞吐量。
一致性:确保 L1 和 L2 缓存与数据源的数据一致。
高可用性:处理缓存失效、Redis 宕机等异常情况(下面会分析解决方案)。
扩展性:支持多实例部署和水平扩展。
可监控:提供命中率、延迟等指标,便于调优。
2.架构设计
架构描述
L1 缓存(Caffeine)
- 部署在每个应用程序实例的 JVM 内存中,存储热点数据。
- 特点:纳秒级访问延迟,适合高频访问的少量数据。
- 限制:数据仅限当前实例,无法跨实例共享。
L2 缓存(Redis)
- 部署为独立的分布式缓存服务(单机、主从或集群模式)。
- 特点:支持跨实例共享、持久化、复杂数据结构。
- 限制:微秒到毫秒级延迟,受网络影响。
数据源
- 数据库 MySQL)
- 当 L1 和 L2 缓存均未命中时,从数据源加载数据。
工作流程
-
读取数据:
客户端请求数据,应用程序首先查询 L1 缓存(Caffeine)。
若 L1 未命中(cache miss),查询 L2 缓存(Redis)。
若 L2 也未命中,从数据源(如数据库)加载数据。
将数据写入 L2(Redis),并回填到 L1(Caffeine)。
-
更新数据
数据更新时,先更新数据库
然后通过Redis的发布/订阅机制通知所有应用实例,每个实例删除或更新本地L1缓存。由于发布/订阅不会持久化消息,可以使用消息队列替换Redis中的发布/订阅 -
缓存同步
使用Redis的Pub/Sub或其他消息队列广播失效消息
确保L1和L2缓存与数据源保持一致
3.实现代码
首先加入依赖
<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.9.3</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
代码如下:
package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;
@Service
public class TwoLevelCacheService1 {// Caffeine 缓存(L1)private final LoadingCache<String, User> caffeineCache;// Redis 连接(L2)private final StatefulRedisConnection<String, String> redisConnection;// Redis Pub/Sub 连接(用于缓存失效通知)private final StatefulRedisPubSubConnection<String, String> redisPubSubConnection;// 数据源(模拟数据库)private final UserRepository userRepository;// Redis 缓存前缀private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 频道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService1(StatefulRedisConnection<String, String> redisConnection,StatefulRedisPubSubConnection<String, String> redisPubSubConnection,UserRepository userRepository) {this.redisConnection = redisConnection;this.redisPubSubConnection = redisPubSubConnection;this.userRepository = userRepository;// 配置 Caffeine 缓存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑}// 初始化 Pub/Sub 监听@PostConstructpublic void initPubSub() {//添加一个监听器,处理接收的消息redisPubSubConnection.addListener(new RedisPubSubAdapter<String,String>() {@Overridepublic void message(String channel, String message) {if (INVALIDATE_CHANNEL.equals(channel)) {caffeineCache.invalidate(message); // 失效 L1 缓存}}});//获取Redis Pub/Sub连接的异步命令接口,并订阅指定的频道RedisPubSubAsyncCommands<String, String> async = redisPubSubConnection.async();async.subscribe(INVALIDATE_CHANNEL);}// 获取用户(先查 L1,再查 L2,最后查数据库)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用户并失效缓存public void updateUser(User user) {// 更新数据库userRepository.save(user);// 失效 L2 缓存RedisCommands<String, String> commands = redisConnection.sync();commands.del(CACHE_PREFIX + user.getId());// 广播失效消息,通知所有实例失效 L1 缓存commands.publish(INVALIDATE_CHANNEL, user.getId());}// 从 Redis 或数据库加载数据private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)RedisCommands<String, String> commands = redisConnection.sync();String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查数据库,查询到的数据存到Redis,并且隐式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,设置 1 小时过期commands.setex(CACHE_PREFIX + userId, 3600, serializeUser(user));}return user;}// 序列化用户对象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用户对象private User deserializeUser(String data) {// 简单解析 JSON,生产环境建议使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 获取缓存统计信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}
4.配置代码
spring:redis:host: localhostport: 6379lettuce:pool:max-active: 100max-idle: 10min-idle: 5timeout: 2000
5.异常处理
-
Redis宕机的话直接回退到数据库查询
在loadFromRedisOrDb方法中捕获Redis异常:try {String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser);} } catch (Exception e) {// 记录日志,降级到数据库逻辑log.error("Redis error, fallback to DB", e); }
-
Caffeine加载失败
若加载逻辑抛出异常,返回默认值或抛出自定义异常caffeineCache = Caffeine.newBuilder().build(key -> {try {return loadFromRedisOrDb(key);} catch (Exception e) {throw new CacheException("Failed to load key: " + key, e);}});
6.监控与调优
-
caffeine监控
通过caffeineCache.stats()获取命中率、驱逐率、加载时间
集成Prometheus或者Micrometer,暴露指标:
7.补充
- 不知道大家会有这样的疑问没,因为Caffeine未命中后会查Redis,Redis命中后会将数据写入Caffeine中,Redis没有命中就会查数据库,然后将数据分别写入Redis和Caffeine中,那么Caffeine和Redis中的数据不就是高度重合了吗?
答:其实并不会高度重合,因为在Caffeine中会设置容量,比如我们这里设置的1000条,并且Caffeine达到1000后会通过LRU(最近最少使用)驱逐策略去删除旧数据。因为根据LRU驱逐策略留下的数据都是高访问量的数据。 - 对于代码中的 buid函数
this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑
这里的build需要接收一个CacheLoader类型的参数,CacheLoader是接口(函数式接口)
@NonNullpublic <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(@NonNull CacheLoader<? super K1, V1> loader) {...........}
CacheLoader使用了@FunctionalInterface注解,说明是函数时接口,其中只有一个load抽象方法
@FunctionalInterface
@SuppressWarnings({"PMD.SignatureDeclareThrowsException", "FunctionalInterfaceMethodChanged"})
public interface CacheLoader<K, V> extends AsyncCacheLoader<K, V> {@NullableV load(@NonNull K key) throws Exception;
}
那么buid的代码就可以进行优化,Java编译器根据caffeineCache的类型(LoadingCache<String,User>)推断出key=String,value=User,发现与loadFromRedisOrDb方法一致,因此可以使用this::loadFromRedisOrDb作为参数
.build(new CacheLoader<String, User>() {@Overridepublic @Nullable User load(@NonNull String key) throws Exception {return loadFromRedisOrDb(key);}}); // 加载逻辑
//----->>>
.build((userId)->loadFromRedisOrDb(userId)); // 加载逻辑
//------>>>
.build(this::loadFromRedisOrDb); // 加载逻辑
Java 的函数式接口机制允许将方法引用直接赋值给接口类型,只要签名匹配。
this::loadFromRedisOrDb 的签名 User (String) 满足 CacheLoader<String, User> 的要求,编译器自动适配。
8.使用RedisTemplate实现
主要代码:
package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;@Service
public class TwoLevelCacheService_RedisTemplate {// Caffeine 缓存(L1)private final LoadingCache<String, User> caffeineCache;//RedisTemplateprivate final RedisTemplate<String, String> redisTemplate;// 数据源(模拟数据库)private final UserRepository userRepository;// Redis 缓存前缀private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 频道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService_RedisTemplate(RedisTemplate<String, String> redisTemplate,UserRepository userRepository) {this.redisTemplate = redisTemplate;this.userRepository = userRepository;// 配置 Caffeine 缓存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑}// 获取用户(先查 L1,再查 L2,最后查数据库)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用户并失效缓存public void updateUser(User user) {// 更新数据库userRepository.save(user);// 失效 L2 缓存redisTemplate.delete(CACHE_PREFIX + user.getId());// 广播失效消息,通知所有实例失效 L1 缓存redisTemplate.convertAndSend(INVALIDATE_CHANNEL, user.getId());}// 从 Redis 或数据库加载数据private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)String cachedUser = redisTemplate.opsForValue().get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查数据库,查询到的数据存到Redis,并且隐式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,设置 1 小时过期redisTemplate.opsForValue().set(CACHE_PREFIX+user.getId(),serializeUser(user),Duration.ofHours(1));}return user;}// 序列化用户对象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用户对象private User deserializeUser(String data) {// 简单解析 JSON,生产环境建议使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 获取缓存统计信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}
配置类 (redisTempalte和订阅channel)
@Configuration
public class RedisConfig {/*** 配置Redis模板*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 设置序列化器,确保键值是字符串template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}/*** 配置监听容器*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,LoadingCache<String, User> caffeineCache) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(new CacheInvalidationListener(caffeineCache),new ChannelTopic("user:invalidate"));return container;}
}
配置消息监听处理逻辑
public class CacheInvalidationListener implements MessageListener {private final LoadingCache<String, User> caffeineCache;private static final String INVALIDATE_CHANNEL = "user:invalidate";public CacheInvalidationListener(LoadingCache<String, User> caffeineCache) {this.caffeineCache = caffeineCache;}@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = message.getChannel().toString();if (INVALIDATE_CHANNEL.equals(channel)){String userId = message.getBody().toString();caffeineCache.invalidate(userId);}}
}
需要补充的地方请大家在下面留言一起讨论