生产环境中如何使用Caffeine+Redis实现二级缓存(详细分析了遇到的各种情况)

生产环境中如何使用Caffeine+Redis实现二级缓存(详细分析了各种情况)

本篇主要讲解的是实现Caffeine+Redis实现一个现成的使用流程。下一篇讲解什么是Caffeine以及caffeine的使用

00背景:

使用Caffeine和Redis的二级缓存方案源自于分布式系统中对高性能高可用性低延迟数据一致性的需求。二级缓存结合了本地缓存的快速访问能力分布式缓存的数据共享与持久化特性,解决了单极缓存的局限性,可用于高并发及分布式场景。

1.设计目标

高性能:利用 Caffeine 的低延迟和 Redis 的高吞吐量。
一致性:确保 L1 和 L2 缓存与数据源的数据一致。
高可用性:处理缓存失效、Redis 宕机等异常情况(下面会分析解决方案)。
扩展性:支持多实例部署和水平扩展。
可监控:提供命中率、延迟等指标,便于调优。

2.架构设计

架构描述

L1 缓存(Caffeine)

  • 部署在每个应用程序实例的 JVM 内存中,存储热点数据。
  • 特点:纳秒级访问延迟,适合高频访问的少量数据。
  • 限制:数据仅限当前实例,无法跨实例共享。

L2 缓存(Redis)

  • 部署为独立的分布式缓存服务(单机、主从或集群模式)。
  • 特点:支持跨实例共享、持久化、复杂数据结构。
  • 限制:微秒到毫秒级延迟,受网络影响。

数据源

  • 数据库 MySQL)
  • 当 L1 和 L2 缓存均未命中时,从数据源加载数据。

工作流程

  1. 读取数据:

    客户端请求数据,应用程序首先查询 L1 缓存(Caffeine)。

    若 L1 未命中(cache miss),查询 L2 缓存(Redis)。

    若 L2 也未命中,从数据源(如数据库)加载数据。

    将数据写入 L2(Redis),并回填到 L1(Caffeine)。

  2. 更新数据

    数据更新时,先更新数据库
    然后通过Redis的发布/订阅机制通知所有应用实例,每个实例删除或更新本地L1缓存。由于发布/订阅不会持久化消息,可以使用消息队列替换Redis中的发布/订阅

  3. 缓存同步

    使用Redis的Pub/Sub或其他消息队列广播失效消息
    确保L1和L2缓存与数据源保持一致

3.实现代码

首先加入依赖

<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.9.3</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

代码如下:

package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;
@Service
public class TwoLevelCacheService1 {// Caffeine 缓存(L1)private final LoadingCache<String, User> caffeineCache;// Redis 连接(L2)private final StatefulRedisConnection<String, String> redisConnection;// Redis Pub/Sub 连接(用于缓存失效通知)private final StatefulRedisPubSubConnection<String, String> redisPubSubConnection;// 数据源(模拟数据库)private final UserRepository userRepository;// Redis 缓存前缀private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 频道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService1(StatefulRedisConnection<String, String> redisConnection,StatefulRedisPubSubConnection<String, String> redisPubSubConnection,UserRepository userRepository) {this.redisConnection = redisConnection;this.redisPubSubConnection = redisPubSubConnection;this.userRepository = userRepository;// 配置 Caffeine 缓存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑}// 初始化 Pub/Sub 监听@PostConstructpublic void initPubSub() {//添加一个监听器,处理接收的消息redisPubSubConnection.addListener(new RedisPubSubAdapter<String,String>() {@Overridepublic void message(String channel, String message) {if (INVALIDATE_CHANNEL.equals(channel)) {caffeineCache.invalidate(message); // 失效 L1 缓存}}});//获取Redis Pub/Sub连接的异步命令接口,并订阅指定的频道RedisPubSubAsyncCommands<String, String> async = redisPubSubConnection.async();async.subscribe(INVALIDATE_CHANNEL);}// 获取用户(先查 L1,再查 L2,最后查数据库)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用户并失效缓存public void updateUser(User user) {// 更新数据库userRepository.save(user);// 失效 L2 缓存RedisCommands<String, String> commands = redisConnection.sync();commands.del(CACHE_PREFIX + user.getId());// 广播失效消息,通知所有实例失效 L1 缓存commands.publish(INVALIDATE_CHANNEL, user.getId());}// 从 Redis 或数据库加载数据private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)RedisCommands<String, String> commands = redisConnection.sync();String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查数据库,查询到的数据存到Redis,并且隐式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,设置 1 小时过期commands.setex(CACHE_PREFIX + userId, 3600, serializeUser(user));}return user;}// 序列化用户对象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用户对象private User deserializeUser(String data) {// 简单解析 JSON,生产环境建议使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 获取缓存统计信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}

4.配置代码

spring:redis:host: localhostport: 6379lettuce:pool:max-active: 100max-idle: 10min-idle: 5timeout: 2000

5.异常处理

  1. Redis宕机的话直接回退到数据库查询
    在loadFromRedisOrDb方法中捕获Redis异常:

    try {String cachedUser = commands.get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser);}
    } catch (Exception e) {// 记录日志,降级到数据库逻辑log.error("Redis error, fallback to DB", e);
    }
    
  2. Caffeine加载失败
    若加载逻辑抛出异常,返回默认值或抛出自定义异常

    caffeineCache = Caffeine.newBuilder().build(key -> {try {return loadFromRedisOrDb(key);} catch (Exception e) {throw new CacheException("Failed to load key: " + key, e);}});
    

6.监控与调优

  1. caffeine监控

    通过caffeineCache.stats()获取命中率、驱逐率、加载时间

    集成Prometheus或者Micrometer,暴露指标:

7.补充

  1. 不知道大家会有这样的疑问没,因为Caffeine未命中后会查Redis,Redis命中后会将数据写入Caffeine中,Redis没有命中就会查数据库,然后将数据分别写入Redis和Caffeine中,那么Caffeine和Redis中的数据不就是高度重合了吗?
    答:其实并不会高度重合,因为在Caffeine中会设置容量,比如我们这里设置的1000条,并且Caffeine达到1000后会通过LRU(最近最少使用)驱逐策略去删除旧数据。因为根据LRU驱逐策略留下的数据都是高访问量的数据。
  2. 对于代码中的 buid函数
this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑

这里的build需要接收一个CacheLoader类型的参数,CacheLoader是接口(函数式接口)

  @NonNullpublic <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(@NonNull CacheLoader<? super K1, V1> loader) {...........}

CacheLoader使用了@FunctionalInterface注解,说明是函数时接口,其中只有一个load抽象方法

@FunctionalInterface
@SuppressWarnings({"PMD.SignatureDeclareThrowsException", "FunctionalInterfaceMethodChanged"})
public interface CacheLoader<K, V> extends AsyncCacheLoader<K, V> {@NullableV load(@NonNull K key) throws Exception;
}

那么buid的代码就可以进行优化,Java编译器根据caffeineCache的类型(LoadingCache<String,User>)推断出key=String,value=User,发现与loadFromRedisOrDb方法一致,因此可以使用this::loadFromRedisOrDb作为参数

.build(new CacheLoader<String, User>() {@Overridepublic @Nullable User load(@NonNull String key) throws Exception {return loadFromRedisOrDb(key);}}); // 加载逻辑
//----->>>
.build((userId)->loadFromRedisOrDb(userId)); // 加载逻辑
//------>>>
.build(this::loadFromRedisOrDb); // 加载逻辑

Java 的函数式接口机制允许将方法引用直接赋值给接口类型,只要签名匹配。

this::loadFromRedisOrDb 的签名 User (String) 满足 CacheLoader<String, User> 的要求,编译器自动适配。

8.使用RedisTemplate实现

主要代码:

package com.example;import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.time.Duration;@Service
public class TwoLevelCacheService_RedisTemplate {// Caffeine 缓存(L1)private final LoadingCache<String, User> caffeineCache;//RedisTemplateprivate final RedisTemplate<String, String> redisTemplate;// 数据源(模拟数据库)private final UserRepository userRepository;// Redis 缓存前缀private static final String CACHE_PREFIX = "user:";// Redis Pub/Sub 频道private static final String INVALIDATE_CHANNEL = "user:invalidate";@Autowiredpublic TwoLevelCacheService_RedisTemplate(RedisTemplate<String, String> redisTemplate,UserRepository userRepository) {this.redisTemplate = redisTemplate;this.userRepository = userRepository;// 配置 Caffeine 缓存this.caffeineCache = Caffeine.newBuilder().maximumSize(1000) // 最大缓存 1000 个用户.expireAfterWrite(Duration.ofMinutes(10)) // 写入后 10 分钟过期.recordStats() // 开启统计.build(this::loadFromRedisOrDb); // 加载逻辑}// 获取用户(先查 L1,再查 L2,最后查数据库)public User getUser(String userId) {return caffeineCache.get(userId);}// 更新用户并失效缓存public void updateUser(User user) {// 更新数据库userRepository.save(user);// 失效 L2 缓存redisTemplate.delete(CACHE_PREFIX + user.getId());// 广播失效消息,通知所有实例失效 L1 缓存redisTemplate.convertAndSend(INVALIDATE_CHANNEL, user.getId());}// 从 Redis 或数据库加载数据private User loadFromRedisOrDb(String userId) {// 查 Redis (L2)String cachedUser = redisTemplate.opsForValue().get(CACHE_PREFIX + userId);if (cachedUser != null) {return deserializeUser(cachedUser); // 反序列化}// Redis 未命中,查数据库,查询到的数据存到Redis,并且隐式的存入到caffeineCache中User user = userRepository.findById(userId);if (user != null) {// 回填 Redis,设置 1 小时过期redisTemplate.opsForValue().set(CACHE_PREFIX+user.getId(),serializeUser(user),Duration.ofHours(1));}return user;}// 序列化用户对象(示例使用 JSON)private String serializeUser(User user) {return "{\"id\":\"" + user.getId() + "\",\"name\":\"" + user.getName() + "\"}";}// 反序列化用户对象private User deserializeUser(String data) {// 简单解析 JSON,生产环境建议使用 Jackson 或 GsonString[] parts = data.replaceAll("[{}\"]", "").split(",");String id = parts[0].split(":")[1];String name = parts[1].split(":")[1];return new User(id, name);}// 获取缓存统计信息public CacheStats getCacheStats() {return caffeineCache.stats();}
}

配置类 (redisTempalte和订阅channel)

@Configuration
public class RedisConfig {/*** 配置Redis模板*/@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 设置序列化器,确保键值是字符串template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}/*** 配置监听容器*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,LoadingCache<String, User> caffeineCache) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(new CacheInvalidationListener(caffeineCache),new ChannelTopic("user:invalidate"));return container;}
}

配置消息监听处理逻辑

public class CacheInvalidationListener implements MessageListener {private final LoadingCache<String, User> caffeineCache;private static final String INVALIDATE_CHANNEL = "user:invalidate";public CacheInvalidationListener(LoadingCache<String, User> caffeineCache) {this.caffeineCache = caffeineCache;}@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = message.getChannel().toString();if (INVALIDATE_CHANNEL.equals(channel)){String userId = message.getBody().toString();caffeineCache.invalidate(userId);}}
}

需要补充的地方请大家在下面留言一起讨论

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/79524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RT-Thread开发文档合集

瑞萨VisionBoard开发实践指南 RT-Thread 文档中心 RT-Thread-【RA8D1-Vision Board】 RA8D1 Vision Board上的USB实践RT-Thread问答社区 - RT-Thread 【开发板】环境篇&#xff1a;05烧录工具介绍_哔哩哔哩_bilibili 【RA8D1-Vision Board】基于OpenMV 实现图像分类_哔哩哔哩_…

甘果桌面tv版下载-甘果桌面安卓电视版使用教程

甘果桌面 TV 版是一款备受关注的应用&#xff0c;它可以让安卓电视的界面更加个性化、操作更加便捷。接下来&#xff0c;我们就详细了解一下甘果桌面 TV 版的下载方法以及安卓电视版的使用教程。 甘果桌面 TV 版下载 打开你的安卓电视&#xff0c;找到并进入电视自带的应用商店…

RAII资源管理理解

基础介绍 RAII (Resource Acquisition Is Initialization) 是一种 C 编程范式&#xff0c;这不是一个语法特性&#xff0c;而是一种处理方式。RAII的思想&#xff1a; 资源获取与对象初始化同时发生资源释放与对象销毁同时发生通过对象的生命周期来管理资源&#xff0c;确保资…

解锁元生代:ComfyUI工作流与云原生后端的深度融合

目录 蓝耘元生代&#xff1a;智算新势力崛起​ ComfyUI 工作流创建详解​ ComfyUI 初印象​ 蓝耘平台上搭建 ComfyUI 工作流​ 构建基础工作流实操​ 代码示例与原理剖析​ 云原生后端技术全景 云原生后端概念解析​ 核心技术深度解读​ 蓝耘元生代中两者的紧密联系​…

实战篇|多总线网关搭建与量产验证(5000 字深度指南)

引言 1. 环境准备与硬件选型 1.1 项目需求分析 1.2 SoC 与开发板选型 1.3 物理接口与 PCB 设计 1.4 电源与供电保护 2. 软件架构与协议栈移植 2.1 分层架构详解 2.2 协议栈移植步骤 2.3 高可用驱动设计 2.4 映射逻辑与 API 定义 3. 开发流程与实践 3.1 敏捷迭代与里程碑 3.2 核…

Kafka安全认证技术:SASL/SCRAM-ACL方案详解

#作者 &#xff1a;张桐瑞 文章目录 1Kafka安全认证技术介绍2基础设置3 配置SASL/SCRAM认证3.1编写server.properties配置3.2编写kafka.conf密码文件3.3编写user.properties配置文件3.4编写kafka-run-class.sh脚本文件3.5Zk中增加kafka用户3.6启动kafka进程 1Kafka安全认证技术…

TCP/IP和UDP协议的发展历程

TCP/IP和UDP协议的发展历程 引言 互联网的发展史是人类技术创新的辉煌篇章&#xff0c;而在这一发展过程中&#xff0c;通信协议发挥了奠基性的作用。TCP/IP&#xff08;传输控制协议/互联网协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;作为互联网通信的基础…

PhotoShop学习10

1.画板功能的使用 使用画板功能可以轻松针对不同的设备和屏幕尺寸设计网页和 APP。画板是一种容器&#xff0c;类似于特殊图层组。画板中的图层在图层面板中&#xff0c;按画板进行分组。 使用画板&#xff0c;一个文档中可以有多个设计版面&#xff0c;这样可以在画板之间轻…

X-AnyLabeling开源程序借助 Segment Anything 和其他出色模型的 AI 支持轻松进行数据标记。

一、软件介绍 文末提供源码和程序下载学习 使用 X-AnyLabeling开源程序可以 导入、管理和保存数据。用户可以通过多种方式导入图像和视频文件&#xff0c;包括快捷方式或菜单选项。此外&#xff0c;它还涵盖数据删除、图像切换以及标签和图像数据的保存&#xff0c;以确保高效…

【深度解析】PlatformIO多环境配置实践:ESP32/ESP32-S3/ESP32-C3适配指南

一、前言&#xff1a;为什么需要多环境配置&#xff1f; 在物联网开发中&#xff0c;我们经常需要适配不同型号的硬件平台&#xff08;如ESP32系列&#xff09;,并且github上多数关于ESP32的都适配了多种开发板。传统开发方式需要为每个平台维护独立项目&#xff0c;而Platfor…

React 列表渲染基础示例

React 中最常见的一个需求就是「把一组数据渲染成一组 DOM 元素」&#xff0c;比如一个列表。下面是我写的一个最小示例&#xff0c;目的是搞清楚它到底是怎么工作的。 示例代码 // 定义一个静态数组&#xff0c;模拟后续要渲染的数据源 // 每个对象代表一个前端框架&#xf…

NHANES指标推荐:CMI

文章题目&#xff1a;Association between cardiometabolic index and biological ageing among adults: a population-based study DOI&#xff1a;10.1186/s12889-025-22053-3 中文标题&#xff1a;成年人心脏代谢指数与生物衰老之间的关系&#xff1a;一项基于人群的研究 发…

QT调用ffmpeg库实现视频录制

可以通过QProcess调用ffmpeg命令行,也可以直接调用ffmpeg库,方便。 调用库 安装ffmpeg ffmpeg -version 没装就装 sudo apt-get update sudo apt-get install ffmpeg sudo apt-get install ffmpeg libavdevice-dev .pro引入库路径,引入库 LIBS += -L/usr/lib/aarch64-l…

消息中间件——RocketMQ(二)

前言&#xff1a;此篇文章系本人学习过程中记录下来的笔记&#xff0c;里面难免会有不少欠缺的地方&#xff0c;诚心期待大家多多给予指教。 RocketMQ&#xff08;一&#xff09; 接上期内容&#xff1a;上期完成了RocketMQ单机部署知识。下面学习RocketMQ集群相关知识&#xf…

pyqt环境配置

文章目录 1 概述2 PyQt6和PySide6区别3 环境配置4 配置PySide65 配置PyQt66 配置外部工具7 添加模板8 使用pyside6-project构建工程9 常见错误10 相关地址 更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;Qt开发 &#x1f448;&#x1f449;python开发 &#x1…

金融数据库转型实战读后感

荣幸收到老友太保科技有限公司数智研究院首席专家林春的签名赠书。 这是国内第一本关于OceanBase数据库实际替换过程总结的的实战书。打个比方可以说是从战场上下来分享战斗经验。读后感受颇深。我在这里讲讲我的感受。 第三章中提到的应用改造如何降本。应用改造是国产化替换…

旅游资源网站登录(jsp+ssm+mysql5.x)

旅游资源网站登录(jspssmmysql5.x) 旅游资源网站是一个为旅游爱好者提供全面服务的平台。网站登录界面简洁明了&#xff0c;用户可以选择以管理员或普通用户身份登录。成功登录后&#xff0c;用户可以访问个人中心&#xff0c;进行修改密码和个人信息管理。用户管理模块允许管…

STM32 HAL库之WDG示例代码

独立看门狗&#xff08;IWDG&#xff09; 在规定时间内按按键喂狗并将LED关闭&#xff0c;若产生看门狗复位则LED打开 初始化独立看门狗&#xff0c;在main.c中的 MX_IWDG_Init();&#xff0c;也就是iwdg.c中的初始化代码 void MX_IWDG_Init(void) {/* USER CODE BEGIN IWDG…

【第47节】windows程序的其他反调试手段下篇

目录 一、利用Hardware Breakpoints Detection 二、PatchingDetection - CodeChecksumCalculation 补丁检测&#xff0c;代码检验和 三、block input 封锁键盘、鼠标输入 四、使用EnableWindow 禁用窗口 五、利用ThreadHideFromDebugger 六、使用Disabling Breakpoints 禁…

【笔记ing】AI大模型-03深度学习基础理论

神经网络&#xff1a;A neural network is a network or circuit of neurons,or in a modern sense,an artificial neural network,composed of artificial neurons or nodes.神经网络是神经元的网络或回路&#xff0c;或者在现在意义上来说&#xff0c;是一个由人工神经元或节…