布隆过滤器
https://pkg.go.dev/github.com/bits-and-blooms/bloom/v3
- 作用:
- 判断一个元素是不是在集合中
- 工作原理:
- 一个位数组(bit array),初始全为0。
- 多个哈希函数,运算输入,从而映射到位数组的不同位索引上,对应值改为1。
- 布隆过滤器是在redis外层的,对redis的请求先走布隆,布隆判断查询的数据是缓存命中的,那么走redis,否则拦截。通过这样来处理缓存穿透问题。
一些值得注意的点
- 同一输入用hash运算得来的位数组上的多个对应位置是可能相同的,即不同输入,可能得到同一输出。所以布隆过滤器有误判的风险,不过用来处理缓存穿透是合适的。
- 假如输入是“hello”,经过hash后对应0、1索引上的值变为1,现在又输入“你好”,hash后是1、2索引上的值变为1,如果我要删除hello,就会导致你好也被破坏。所以(基础布隆过滤器)无法删除元素。
- 输入“hello”和“你好”经过hash后的对应位可能相同,这就是误判的情况,如果实际缓存中只有“hello”那么查询“你好”也会被引导到redis。
- 假如现在要查“hello”但是0、1上的预期值不为1,那么“hello”一定不在缓存。
- 总结:布隆过滤器可以判断“可能存在”和“一定不存在”
实现细节梳理:
- 可以弄一个布隆预热函数,运行时先从redis读取所有缓存id运算好对应二进制数组的位置,这样就相当于把当前所有的缓存的”特征值“都存到布隆过滤器了,(也可以开个定期触发的协程,不断调用)
package mainimport ("context""fmt""log""sync""time""github.com/bits-and-blooms/bloom/v3""github.com/go-redis/redis/v8"
)var (bloomFilter *bloom.BloomFiltercache sync.MapredisClient *redis.Client // Redis客户端filterLock sync.Mutexctx = context.Background()
)func init() {// 初始化Redis连接redisClient = redis.NewClient(&redis.Options{Addr: "localhost:6379", // Redis地址Password: "", // 密码DB: 0, // 数据库})// 初始化布隆过滤器bloomFilter = bloom.NewWithEstimates(1_000_000, 0.001)// 从Redis预热布隆过滤器if err := preheatBloomFilter(); err != nil {log.Fatalf("Failed to preheat bloom filter: %v", err)}
}// preheatBloomFilter 从Redis加载存在的key
func preheatBloomFilter() error {start := time.Now()log.Println("Starting bloom filter preheating...")// 1. 使用SCAN迭代所有product key(生产环境建议使用特定前缀)var cursor uint64var keys []stringfor {var err error// 假设product key的格式为 product:1001keys, cursor, err = redisClient.Scan(ctx, cursor, "product:*", 1000).Result()if err != nil {return fmt.Errorf("Redis SCAN failed: %w", err)}// 将找到的key添加到布隆过滤器for _, key := range keys {// 提取纯ID(假设key格式为product:{id})id := key[8:] // 跳过"product:"前缀bloomFilter.AddString(id)}if cursor == 0 { // 迭代结束break}}// 2. 或者如果使用Set存储所有ID(更推荐的方式)// 假设所有产品ID存储在product:ids集合中ids, err := redisClient.SMembers(ctx, "product:ids").Result()if err != nil {return fmt.Errorf("Failed to get product IDs: %w", err)}for _, id := range ids {bloomFilter.AddString(id)}log.Printf("Bloom filter preheated. Total keys: %d, Duration: %v", len(ids)+len(keys), time.Since(start))return nil
}// 定期重建布隆过滤器(可选)
func startBloomFilterRebuildJob() {ticker := time.NewTicker(1 * time.Hour)go func() {for range ticker.C {filterLock.Lock()if err := preheatBloomFilter(); err != nil {log.Printf("Failed to rebuild bloom filter: %v", err)}filterLock.Unlock()}}()
}// getProduct 获取商品信息(带Redis缓存)
func getProduct(ctx context.Context, productID string) (string, error) {// 1. 布隆过滤器检查if !bloomFilter.TestString(productID) {return "", fmt.Errorf("商品不存在")}// 2. 检查Redis缓存cacheKey := "product:" + productIDval, err := redisClient.Get(ctx, cacheKey).Result()if err == nil {return val, nil}// 3. 查询数据库(这里演示直接返回)// 实际应该查询真实数据库,这里返回模拟数据productData := "商品详情数据"// 4. 将新数据写入Redisif err := redisClient.Set(ctx, cacheKey, productData, randomExpiration(30*time.Minute, 5*time.Minute)).Err(); err != nil {log.Printf("Failed to set Redis cache: %v", err)}// 5. 更新布隆过滤器(如果确认是新key)filterLock.Lock()bloomFilter.AddString(productID)filterLock.Unlock()return productData, nil
}// 生成随机过期时间(防雪崩)
func randomExpiration(base, randomRange time.Duration) time.Duration {return base + time.Duration(rand.Int63n(int64(randomRange)))
}
代码
bloom.go
package cacheimport ("context""errors""github.com/bits-and-blooms/bloom/v3""github.com/redis/go-redis/v9"pkgredis "shorturl/pkg/db/redis"
)// BloomFilter 布隆过滤器接口
type BloomFilter interface {// Add 添加元素到布隆过滤器Add(key string, value string) error// Exists 检查元素是否可能存在于布隆过滤器中Exists(key string, value string) (bool, error)
}// RedisBloomFilter 基于Redis的布隆过滤器实现
type RedisBloomFilter struct {redisClient *redis.Clientdestroy func()// 布隆过滤器参数filter *bloom.BloomFilterkey string // 布隆过滤器在Redis中的键名
}// NewRedisBloomFilter 创建一个新的Redis布隆过滤器
func NewRedisBloomFilter(client *redis.Client, key string, expectedItems int, errorRate float64, destroy func()) BloomFilter {// 使用bits-and-blooms库创建布隆过滤器filter := bloom.NewWithEstimates(uint(expectedItems), errorRate)return &RedisBloomFilter{redisClient: client,destroy: destroy,filter: filter,key: key,}
}// Add 添加元素到布隆过滤器
func (bf *RedisBloomFilter) Add(key string, value string) error {// 添加到内存中的布隆过滤器bf.filter.AddString(value)// 将布隆过滤器的位数组序列化并存储到Redisbits, err := bf.filter.MarshalBinary()if err != nil {return err}// 存储到Redisreturn bf.redisClient.Set(context.Background(), bf.key, bits, 0).Err()
}// Exists 检查元素是否可能存在于布隆过滤器中
func (bf *RedisBloomFilter) Exists(key string, value string) (bool, error) {// 从Redis获取布隆过滤器的位数组bits, err := bf.redisClient.Get(context.Background(), bf.key).Bytes()if err != nil {if errors.Is(err, redis.Nil) {// 如果布隆过滤器不存在,则元素一定不存在return false, nil}return false, err}// 反序列化布隆过滤器if err := bf.filter.UnmarshalBinary(bits); err != nil {return false, err}// 检查元素是否可能存在return bf.filter.TestString(value), nil
}// BloomFilterFactory 布隆过滤器工厂接口
type BloomFilterFactory interface {// NewBloomFilter 创建一个新的布隆过滤器实例NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter
}// RedisBloomFilterFactory 基于Redis的布隆过滤器工厂
type RedisBloomFilterFactory struct {redisPool pkgredis.RedisPool
}// NewRedisBloomFilterFactory 创建一个新的Redis布隆过滤器工厂
func NewRedisBloomFilterFactory(redisPool pkgredis.RedisPool) BloomFilterFactory {return &RedisBloomFilterFactory{redisPool: redisPool,}
}// NewBloomFilter 创建一个新的布隆过滤器实例
func (f *RedisBloomFilterFactory) NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter {client := f.redisPool.Get()return NewRedisBloomFilter(client, key, expectedItems, errorRate, func() {f.redisPool.Put(client)})
}
bloom.go梳理和功能总结:
1. 核心功能
该文件实现了一个基于 Redis 的布隆过滤器(Bloom Filter),并提供了工厂模式来创建布隆过滤器实例。
2. 主要接口与结构
(1) BloomFilter 接口
定义了布隆过滤器的基本操作:
Add(key string, value string) error
:将元素添加到布隆过滤器。Exists(key string, value string) (bool, error)
:检查元素是否可能存在于布隆过滤器中。
(2) RedisBloomFilter 结构
实现了 BloomFilter
接口,基于 Redis 存储布隆过滤器的位数组:
- 字段:
redisClient *redis.Client
:Redis 客户端。destroy func()
:释放 Redis 连接的回调函数。filter *bloom.BloomFilter
:内存中的布隆过滤器实例。key string
:布隆过滤器在 Redis 中的键名。
- 方法:
Add
:将元素添加到内存中的布隆过滤器,并将位数组序列化后存储到 Redis。Exists
:从 Redis 获取布隆过滤器的位数组,反序列化后检查元素是否存在。
(3) BloomFilterFactory 接口
定义了布隆过滤器工厂的基本操作:
NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter
:创建一个新的布隆过滤器实例。
(4) RedisBloomFilterFactory 结构
实现了 BloomFilterFactory
接口,用于创建基于 Redis 的布隆过滤器实例:
- 字段:
redisPool pkgredis.RedisPool
:Redis 连接池。
- 方法:
NewBloomFilter
:从连接池获取 Redis 客户端,创建一个新的布隆过滤器实例,并在销毁时释放 Redis 连接。
3. 关键逻辑
(1) 布隆过滤器的初始化
-
使用
github.com/bits-and-blooms/bloom/v3
库创建布隆过滤器实例:filter := bloom.NewWithEstimates(uint(expectedItems), errorRate)
-
参数说明:
expectedItems
:预计插入的元素数量。errorRate
:允许的误报率。
(2) 元素的添加
-
将元素添加到内存中的布隆过滤器:
bf.filter.AddString(value)
-
将布隆过滤器的位数组序列化后存储到 Redis:
bits, err := bf.filter.MarshalBinary() if err != nil {return err } return bf.redisClient.Set(context.Background(), bf.key, bits, 0).Err()
(3) 元素的存在性检查
-
从 Redis 获取布隆过滤器的位数组:
bits, err := bf.redisClient.Get(context.Background(), bf.key).Bytes()
-
如果 Redis 中不存在该键,则返回
false
表示元素一定不存在。 -
反序列化布隆过滤器并检查元素是否存在:
if err := bf.filter.UnmarshalBinary(bits); err != nil {return false, err } return bf.filter.TestString(value), nil
(4) 工厂模式
- 工厂模式用于管理 Redis 连接池,确保每个布隆过滤器实例使用独立的 Redis 连接,并在销毁时释放连接:
client := f.redisPool.Get() return NewRedisBloomFilter(client, key, expectedItems, errorRate, func() {f.redisPool.Put(client) })
4. 依赖库
github.com/bits-and-blooms/bloom/v3
:布隆过滤器的核心实现。github.com/redis/go-redis/v9
:Redis 客户端。shorturl/pkg/db/redis
:自定义的 Redis 连接池封装。
https://github.com/0voice