要实现一个 SQLiteCache 也是很简单的只需要创建一个 cacheManager
Bean 即可
// 如果配置文件中 spring.cache.sqlite.enable = false 则不启用
@Bean("cacheManager")
@ConditionalOnProperty(name = ["spring.cache.sqlite.enable"], havingValue = "true", matchIfMissing = false)
fun cacheManager(sqliteMemoryConnection: Connection): CacheManager {// TODO 返回 CacheManager
}
同样的还需要 SQLite 这里 SQLite 的 url 设置为 jdbc:sqlite::memory:
@Bean
fun sqliteMemoryConnection(): Connection {val dataSource = SQLiteDataSource()dataSource.url = urllogger.info("SQLite cache 创建连接: $url")return dataSource.connection
}
配置代码
该代码仅仅作为整活使用
package io.github.zimoyin.ra3.configimport io.github.zimoyin.ra3.config.SQLiteCacheConfig.EvictionPolicy.*
import kotlinx.coroutines.*
import org.bouncycastle.asn1.x500.style.RFC4519Style.name
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.cache.Cache
import org.springframework.cache.CacheManager
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.validation.annotation.Validated
import org.sqlite.SQLiteDataSource
import java.sql.Connection
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import javax.sql.DataSource
import kotlin.math.log
import kotlin.time.Duration.Companion.minutes@Configuration
@Validated
@ConfigurationProperties(prefix = "spring.cache.sqlite")
class SQLiteCacheConfig(var url: String = "jdbc:sqlite::memory:",var enable: Boolean = false,var tableCacheSize: Int = 100,var expirationMilliseconds: Long = 60000L,var evictionPolicy: EvictionPolicy = FIFO,
) {private val logger = LoggerFactory.getLogger(SQLiteCacheConfig::class.java)@Beanfun sqliteDataSource(): DataSource {val dataSource = SQLiteDataSource()dataSource.url = urllogger.info("初始化 SQLiteCache 数据库地址: $url")return dataSource}@Bean("sqliteCacheManager")@ConditionalOnProperty(name = ["spring.cache.sqlite.enable"], havingValue = "true")fun cacheManager(sqliteDataSource: DataSource): CacheManager = SQLiteCacheManager(dataSource = sqliteDataSource,maxSize = tableCacheSize,expirationMs = expirationMilliseconds,evictionPolicy = evictionPolicy)enum class EvictionPolicy {/*** First In First Out (FIFO)*/FIFO,/*** Least Recently Used (LRU)*/LRU,/*** Least Frequently Used (LFU)*/LFU}class SQLiteCacheManager(private val dataSource: DataSource,private val maxSize: Int,private val expirationMs: Long,private val evictionPolicy: EvictionPolicy,) : CacheManager {private val cacheMap = ConcurrentHashMap<String, SQLiteCache>()private val logger = LoggerFactory.getLogger(SQLiteCacheManager::class.java)private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)init {// 启动定时清理任务(首次延迟1分钟,之后每分钟执行)CoroutineScope(SupervisorJob() + Dispatchers.IO).launch {while (isActive) {delay(1.minutes)cleanupExpiredEntries()}}logger.info("初始化 SQLiteCacheManager: dataSource=$dataSource, maxSize=$maxSize, expirationMs=$expirationMs, evictionPolicy=$evictionPolicy")}override fun getCache(name: String): Cache {return cacheMap.computeIfAbsent(name) {SQLiteCache(name = it,dataSource = dataSource,maxSize = maxSize,expirationMs = expirationMs,evictionPolicy = evictionPolicy).also { cache ->logger.info("创建缓存表 $name")cache.initialize()}}}override fun getCacheNames(): MutableCollection<String> = cacheMap.keysprivate fun cleanupExpiredEntries() {cacheMap.values.forEach { cache ->try {logger.debug("缓存表 ${cache.name} 命中率: ${cache.getHitRate()}")cache.evictExpiredItems()} catch (e: Exception) {logger.error("Error cleaning expired entries in cache ${cache.name}", e)}}}@Synchronizedfun shutdown() {scope.cancel()cacheMap.values.forEach { it.close() }}}class SQLiteCache(private val name: String,val dataSource: DataSource,val maxSize: Int,val expirationMs: Long,val evictionPolicy: EvictionPolicy,) : Cache {private val logger = LoggerFactory.getLogger(SQLiteCache::class.java)private val connection: Connection = dataSource.connection.apply {autoCommit = false}private val lock = ReentrantReadWriteLock()private val hitCount = AtomicLong()private val missCount = AtomicLong()fun initialize() {createTableIfNotExists()createIndexes()}fun close() {connection.close()}override fun getName(): String = nameoverride fun getNativeCache(): Any = connectionoverride fun get(key: Any): Cache.ValueWrapper? {return try {lock.readLock().lock()getInternal(key.toString()).also {if (it != null) hitCount.incrementAndGet() else missCount.incrementAndGet()}} finally {lock.readLock().unlock()}}override fun <T : Any?> get(key: Any, type: Class<T>?): T? {return get(key)?.get() as? T}override fun <T : Any?> get(key: Any, valueLoader: Callable<T>): T {return try {lock.writeLock().lock()get(key)?.get() as? T ?: run {val value = valueLoader.call()put(key, value)value}} finally {lock.writeLock().unlock()}}override fun put(key: Any, value: Any?) {try {lock.writeLock().lock()executeInTransaction {evictIfNecessary()upsertEntry(key.toString(), value)}} finally {lock.writeLock().unlock()}}override fun evict(key: Any) {executeInTransaction {deleteEntry(key.toString())}}override fun clear() {executeInTransaction {connection.createStatement().executeUpdate("DELETE FROM $name")}}fun getHitRate(): Double {val total = hitCount.get() + missCount.get()return if (total == 0L) 0.0 else hitCount.get().toDouble() / total}internal fun evictExpiredItems() {executeInTransaction {val currentTime = System.currentTimeMillis()connection.prepareStatement("DELETE FROM $name WHERE expires_at < ?").use { ps ->ps.setLong(1, currentTime)ps.executeUpdate()}}}private fun getInternal(key: String): Cache.ValueWrapper? {return connection.prepareStatement("SELECT value, expires_at FROM $name WHERE key = ?").use { ps ->ps.setString(1, key)ps.executeQuery().use { rs ->if (rs.next()) {val expiresAt = rs.getLong("expires_at")if (System.currentTimeMillis() > expiresAt) {deleteEntry(key)null} else {updateAccessMetrics(key)Cache.ValueWrapper { rs.getObject("value") }}} else {null}}}}/*** 更新访问指标*/private fun updateAccessMetrics(key: String) {when (evictionPolicy) {LRU -> updateLastAccessed(key)LFU -> incrementAccessCount(key)FIFO -> run { /*不需要更新*/ }}}private fun upsertEntry(key: String, value: Any?) {val now = System.currentTimeMillis()connection.prepareStatement("""INSERT INTO $name (key, value, expires_at, created_at, last_accessed, access_count)VALUES (?, ?, ?, ?, ?, 1)ON CONFLICT(key) DO UPDATE SETvalue = excluded.value,expires_at = excluded.expires_at,last_accessed = excluded.last_accessed,access_count = access_count + 1""").use { ps ->ps.setString(1, key)ps.setObject(2, value)ps.setLong(3, now + expirationMs)ps.setLong(4, now)ps.setLong(5, now)ps.executeUpdate()}}private fun deleteEntry(key: String) {connection.prepareStatement("DELETE FROM $name WHERE key = ?").use { ps ->ps.setString(1, key)ps.executeUpdate()}}private fun evictIfNecessary() {if (currentSize() >= maxSize) {when (evictionPolicy) {FIFO -> evictByCreatedTime()LRU -> evictByLastAccessed()LFU -> evictByAccessCount()}}}private fun currentSize(): Int {return connection.createStatement().use { stmt ->stmt.executeQuery("SELECT COUNT(*) FROM $name ").use { rs ->if (rs.next()) rs.getInt(1) else 0}}}private fun evictByCreatedTime() {evictOldest("created_at")}private fun evictByLastAccessed() {evictOldest("last_accessed")}private fun evictByAccessCount() {connection.createStatement().use { stmt ->stmt.executeQuery("SELECT key FROM $name ORDER BY access_count ASC, created_at ASC LIMIT 1").use { rs ->if (rs.next()) deleteEntry(rs.getString(1))}}}private fun evictOldest(column: String) {connection.createStatement().use { stmt ->stmt.executeQuery("SELECT key FROM $name ORDER BY $column ASC LIMIT 1").use { rs ->if (rs.next()) deleteEntry(rs.getString(1))}}}private fun updateLastAccessed(key: String) {connection.prepareStatement("UPDATE $name SET last_accessed = ? WHERE key = ?").use { ps ->ps.setLong(1, System.currentTimeMillis())ps.setString(2, key)ps.executeUpdate()}}private fun incrementAccessCount(key: String) {connection.createStatement().executeUpdate("UPDATE $name SET access_count = access_count + 1 WHERE key = '$key'")}private fun createTableIfNotExists() {connection.createStatement().execute("""CREATE TABLE IF NOT EXISTS $name (key TEXT PRIMARY KEY,value BLOB,expires_at INTEGER,created_at INTEGER,last_accessed INTEGER,access_count INTEGER DEFAULT 0)""")}/*** 创建索引*/private fun createIndexes() {arrayOf("CREATE INDEX IF NOT EXISTS idx_${name}_expires ON $name (expires_at)","CREATE INDEX IF NOT EXISTS idx_${name}_created ON $name (created_at)","CREATE INDEX IF NOT EXISTS idx_${name}_last_access ON $name (last_accessed)","CREATE INDEX IF NOT EXISTS idx_${name}_access_count ON $name (access_count)").forEach { indexSql ->connection.createStatement().execute(indexSql)}}/*** 执行在事务中运行的代码块,并返回结果。如果代码块执行成功,则提交事务;如果代码块执行失败,则回滚事务。*/private inline fun <T> executeInTransaction(block: () -> T): T {return try {val result = block()connection.commit()result} catch (ex: Exception) {connection.rollback()throw ex}}}
}
配置文件
spring:cache:sqlite:enable: falseexpiration-milliseconds: 60000table-cache-size: 10000eviction-policy: lru