项目收获总结--Redis的知识收获

一、概述

最近几天公司项目开发上线完成,做个收获总结吧~ 今天记录Redis的收获和提升。
在这里插入图片描述

二、Redis异步队列

Redis做异步队列一般使用 list 结构作为队列,rpush 生产消息,lpop 消费消息。当 lpop 没有消息的时候,要适当sleep再重试。若不用 sleep,list 还有个指令叫 blpop,在没有消息的时候,它会阻塞队列直到消息到来。

若要生产一次消费多次的需求,可以使用 pub/sub 主题订阅者模式,可以实现 1:N 的消息队列。但是缺点在消费者下线的情况下,生产的消息会丢失,还得使用专业的消息队列如 RabbitMQ等。

若要redis实现延时队列,使用 sortedset,拿时间戳作为score,消息内容作为 key 调用 zadd 来生产消息,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。

三、Redis分布式锁

核心思路是:先用setnx来争抢锁,抢到之后,再用 expire 给锁加一个过期时间防止锁忘记释放。若在 setnx 之后执行expire之前进程意外崩溃或者要重启维护,导致锁永远得不到释放,就要采用 set 指令配合非常复杂的参数,可以同时把 setnx 和 expire 合成一条指令来用:

SET key value NX EX 10
3.1 Redis分布式锁的实现

pom.xml引入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-redis</artifactId><version>1.4.7.RELEASE</version>
</dependency>

配置Redis:

 
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.password=123456
spring.redis.timeout=10000# 设置jedis连接池
spring.redis.jedis.pool.max-active=50
spring.redis.jedis.pool.min-idle=20

配置RedisConfig属性:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws Exception {RedisTemplate redisTemplate = new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);// 3.创建 序列化类GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(genericToStringSerializer);
//        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());redisTemplate.setDefaultSerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}
}

RedisLock工具类:

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class RedisLock {@Resourceprivate RedisTemplate redisTemplate;private static Map<String, LockInfo> lockInfoMap = new ConcurrentHashMap<>();private static final Long SUCCESS = 1L;@Datapublic static class LockInfo {private String key;private String value;private int expireTime;//更新时间private long renewalTime;//更新间隔private long renewalInterval;public static LockInfo getLockInfo(String key, String value, int expireTime) {LockInfo lockInfo = new LockInfo();lockInfo.setKey(key);lockInfo.setValue(value);lockInfo.setExpireTime(expireTime);lockInfo.setRenewalTime(System.currentTimeMillis());lockInfo.setRenewalInterval(expireTime * 2000 / 3);return lockInfo;}}/*** Lua脚本* // 加锁* if*     redis.call('setNx',KEYS[1],ARGV[1])*   then*     if redis.call('get',KEYS[1])==ARGV[1]*     return redis.call('expire',KEYS[1],ARGV[2])*   else*     return 0*   end* end** // 解锁*   redis.call('get', KEYS[1]) == ARGV[1]* then*   return redis.call('del', KEYS[1])* else*   return 0**   //更新时间*   if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end*//*** 使用lua脚本加锁** @param lockKey    锁* @param value      身份标识(保证锁不会被其他人释放)* @param expireTime 锁的过期时间(单位:秒)* @Desc 注意事项,redisConfig配置里面必须使用 genericToStringSerializer序列化,否则获取不了返回值*/public boolean tryLock(String lockKey, String value, int expireTime) {String luaScript = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);List<String> keys = new ArrayList<>();keys.add(lockKey);//Object result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey),value,expireTime + "");// Object result = redisTemplate.execute(redisScript, new StringRedisSerializer(), new StringRedisSerializer(), Collections.singletonList(lockKey), identity, expireTime);Object result = redisTemplate.execute(redisScript, keys, value, expireTime);log.info("已获取到{}对应的锁!", lockKey);if (expireTime >= 10) {lockInfoMap.put(lockKey + value, LockInfo.getLockInfo(lockKey, value, expireTime));}return (boolean) result;}/*** 使用lua脚本释放锁** @param lockKey* @param value* @return 成功返回true, 失败返回false*/public boolean unlock(String lockKey, String value) {lockInfoMap.remove(lockKey + value);String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);List<String> keys = new ArrayList<>();keys.add(lockKey);Object result = redisTemplate.execute(redisScript, keys, value);log.info("解锁成功:{}", result);return (boolean) result;}/*** 使用lua脚本更新redis锁的过期时间** @param lockKey* @param value* @return 成功返回true, 失败返回false*/public boolean renewal(String lockKey, String value, int expireTime) {String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();redisScript.setResultType(Boolean.class);redisScript.setScriptText(luaScript);List<String> keys = new ArrayList<>();keys.add(lockKey);Object result = redisTemplate.execute(redisScript, keys, value, expireTime);log.info("更新redis锁的过期时间:{}", result);return (boolean) result;}/*** redisTemplate加锁** @param lockKey    锁* @param value      身份标识(保证锁不会被其他人释放)* @param expireTime 锁的过期时间(单位:秒)* @return 成功返回true, 失败返回false*/public boolean lock(String lockKey, String value, long expireTime) {return redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);}/*** redisTemplate解锁** @param key* @param value* @return 成功返回true, 失败返回false*/public boolean unlock2(String key, String value) {Object currentValue = redisTemplate.opsForValue().get(key);boolean result = false;if (StringUtils.isNotEmpty(String.valueOf(currentValue)) && currentValue.equals(value)) {result = redisTemplate.opsForValue().getOperations().delete(key);}return result;}/*** 定时去检查redis锁的过期时间* @Param* @Return*/@Scheduled(fixedRate = 5000L)@Async("redisExecutor")public void renewal() {long now = System.currentTimeMillis();for (Map.Entry<String, LockInfo> lockInfoEntry : lockInfoMap.entrySet()) {LockInfo lockInfo = lockInfoEntry.getValue();if (lockInfo.getRenewalTime() + lockInfo.getRenewalInterval() < now) {renewal(lockInfo.getKey(), lockInfo.getValue(), lockInfo.getExpireTime());lockInfo.setRenewalTime(now);log.info("lockInfo {}", JSON.toJSONString(lockInfo));}}}/*** 分布式锁设置单独线程池* @Param* @Return*/@Bean("redisExecutor")public Executor redisExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(1);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("redis-renewal-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());return executor;}
}

开启定时任务:在启动类上加 @EnableScheduling 注解

@SpringBootApplication
@MapperScan(value = "com.example.recordlog.mapper")
@EnableAspectJAutoProxy(proxyTargetClass = true)
//开启定时任务
@EnableScheduling
public class RecordLogApplication {public static void main(String[] args) {SpringApplication.run(RecordLogApplication.class, args);}
}

Controller测试接口:

 import com.example.recordlog.tools.RedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.util.List;
import java.util.UUID;@RestController
@RequestMapping("/api")
public class OperateController {@Autowired
private RedisLock redisLock;@RequestMapping(value = "/locked", method = {RequestMethod.GET})
public void lockedTest() {String key = String.format("data-mining:task_statistic:%d", System.currentTimeMillis());String requestId = UUID.randomUUID().toString();boolean locked = false;try {locked = redisLock.tryLock(key, requestId, 30);if (!locked) {return;}//执行业务逻辑System.out.println("---------------->>>执行业务逻辑");} finally {if (locked) {redisLock.unlock(key, requestId);}}}
}
3.2 锁过期问题

上述代码提到使用lua脚本处理锁更新,就顺带记录当Redis 分布式锁过期但任务未完成时的三种处理方案。

锁过期问题是:在使用Redis分布式锁时锁恰好到过期时间,但业务逻辑还没有处理完毕,这可能导致多个进程同时进入临界区,造成数据不一致或业务逻辑冲突、资源浪费等其他问题。
假设分布式锁使用的简单逻辑如下:

获取锁:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;public class RedisLock {private Jedis jedis;private String lockKey;private int lockExpire;public RedisLock(Jedis jedis, String lockKey, int lockExpire) {this.jedis = jedis;this.lockKey = lockKey;this.lockExpire = lockExpire;}public boolean tryLock(String requestId) {SetParams params = new SetParams();params.nx().px(lockExpire);String result = jedis.set(lockKey, requestId, params);return "OK".equals(result);}public void unlock(String requestId) {String script ="if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else return 0 end";jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));}
}

使用锁:


Jedis jedis = new Jedis("localhost");
RedisLock redisLock = new RedisLock(jedis, "my_lock", 5000);String requestId = UUID.randomUUID().toString();
if (redisLock.tryLock(requestId)) {try {// 业务逻辑Thread.sleep(6000); // 模拟业务逻辑处理时间超过锁过期时间} catch (InterruptedException e) {e.printStackTrace();} finally {redisLock.unlock(requestId);}
} else {System.out.println("获取锁失败");
}

解决方案:
(1)自动续期:当锁接近过期时,自动延长锁的过期时间,确保业务逻辑在锁持有期间不会被其他进程获取。使用 ScheduledExecutorService 定期检查并延长锁的过期时间,确保锁在业务逻辑处理完之前不会过期。提供简单代码思路:

public class RedisLockWithRenewal extends RedisLock {private ScheduledExecutorService scheduler;public RedisLockWithRenewal(Jedis jedis, String lockKey, int lockExpire) {super(jedis, lockKey, lockExpire);scheduler = Executors.newScheduledThreadPool(1);}@Overridepublic boolean tryLock(String requestId) {boolean locked = super.tryLock(requestId);if (locked) {startAutoRenewal(requestId);}return locked;}private void startAutoRenewal(String requestId) {scheduler.scheduleAtFixedRate(() -> {String script ="if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else return 0 end";jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(lockExpire)));}, lockExpire / 3, lockExpire / 3, TimeUnit.MILLISECONDS);}@Overridepublic void unlock(String requestId) {scheduler.shutdown();super.unlock(requestId);}
}

(2)锁重入机制:锁重入机制允许持有锁的线程可以再次获取锁而不被阻塞,这可以通过记录锁持有者的信息来实现。同一线程可以多次获取锁,直到所有业务逻辑执行完毕,最后一次调用 unlock 时才真正释放锁。

import java.util.concurrent.ConcurrentHashMap;public class ReentrantRedisLock extends RedisLock {private ConcurrentHashMap<String, Integer> lockHolderMap = new ConcurrentHashMap<>();public ReentrantRedisLock(Jedis jedis, String lockKey, int lockExpire) {super(jedis, lockKey, lockExpire);}@Overridepublic synchronized boolean tryLock(String requestId) {if (lockHolderMap.containsKey(requestId)) {lockHolderMap.put(requestId, lockHolderMap.get(requestId) + 1);return true;} else {boolean locked = super.tryLock(requestId);if (locked) {lockHolderMap.put(requestId, 1);}return locked;}}@Overridepublic synchronized void unlock(String requestId) {if (lockHolderMap.containsKey(requestId)) {int count = lockHolderMap.get(requestId);if (count > 1) {lockHolderMap.put(requestId, count - 1);} else {lockHolderMap.remove(requestId);super.unlock(requestId);}}}
}

(3)使用 Redisson 库:Redisson是Redis客户端,提供分布式锁的实现,支持自动续期、锁重入等高级特性,简化分布式锁的使用。

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.util.concurrent.TimeUnit;public class RedissonLockExample {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redissonClient = Redisson.create(config);RLock lock = redissonClient.getLock("my_lock");try {if (lock.tryLock(0, 10, TimeUnit.SECONDS)) {try {// 业务逻辑Thread.sleep(6000); // 模拟业务逻辑处理时间} finally {lock.unlock();}} else {System.out.println("获取锁失败");}} catch (InterruptedException e) {e.printStackTrace();} finally {redissonClient.shutdown();}}
}
四、Redis不立刻删除已过期数据的原因

比较有印象的经历,曾经尝试监听redis key过期来实现延迟消息发送,后来发现这个延迟非常不稳定,明明key已经过期,但还是没有触发,后来了解到过期key不会立马删除…这个问题需要对Redis 内存管理有了解,下面通过四个问题来考虑吧!

4.1 Redis 给缓存数据设置过期时间有什么用?

内存是有限且珍贵的,如果不对缓存数据设置过期时间,那内存占用就会一直增长,最终可能会导致 OOM 问题。通过设置合理的过期时间,Redis 会自动删除暂时不需要的数据,为新的缓存数据腾出空间。

Redis 自带给缓存数据设置过期时间的功能,比如:

127.0.0.1:6379> expire key 60 # 数据在 60s 后过期
(integer) 1
127.0.0.1:6379> setex key 60 value # 数据在 60s 后过期 (setex:[set] + [ex]pire)
OK
127.0.0.1:6379> ttl key # 查看数据还有多久过期
(integer) 56

Redis 中除字符串类型是独有设置过期时间的命令 setex 外,其他类型都需要依靠 expire 命令来设置过期时间 。另外, persist 命令可以移除一个键的过期时间。

当然,除缓解内存的消耗,还有其他用途,很多时候,业务场景就是需要某个数据只在某一时间段内存在,比如短信验证码可能只在 1 分钟内有效,用户登录的 Token 可能只在 1 天内有效,秒杀系统等。

若用传统的数据库来处理的话,基本是SQL判断过期,更麻烦且性能很差。

4.2 Redis 如何判断数据是否过期?

Redis 通过过期字典(可看作是 hash 表)来保存数据过期的时间。过期字典的键指向 Redis 数据库中的某个 key(键),过期字典的值是一个 long long 类型的整数,这个整数保存 key 所指向的数据库键的过期时间(毫秒精度的 UNIX 时间戳)。
在这里插入图片描述
过期字典是存储在 redisDb 这个结构里的:

typedef struct redisDb {...dict *dict;     //数据库键空间,保存着数据库中所有键值对dict *expires   // 过期字典,保存着键的过期时间...
} redisDb;

在查询一个 key 的时候,Redis 首先检查该 key 是否存在于过期字典中(时间复杂度为 O(1)),如果不在就直接返回,在的话需要判断一下这个 key 是否过期,过期直接删除 key 然后返回 null。

4.3 Redis 过期 key 删除策略?

如果设置一批 key 只能存活 1 分钟,那1分钟后,Redis 对这批 key 进行删除,就需要遵循删除策略。常用的过期数据的删除策略有四种:

1)惰性删除:只会在取出/查询 key 的时候才对数据进行过期检查。这种方式对CPU最友好,但是可能会造成太多过期key没有被删除。
(2)定期删除:周期性地随机从设置过期时间的 key 中抽查一批,然后逐个检查这些 key 是否过期,过期就删除 key。相比于惰性删除,定期删除对内存更友好,对 CPU 不太友好。
(3)延迟队列:把设置过期时间的 key 放到一个延迟队列里,到期之后就删除 key。这种方式可以保证每个过期 key 都能被删除,但维护延迟队列太麻烦,队列本身也要占用资源。
(4)定时删除:每个设置过期时间的 key 都会在设置的时间到达时立即被删除。这种方法可以确保内存中不会有过期的键,但是它对 CPU 的压力最大,因为它需要为每个键都设置一个定时器。

Redis 采用的是 定期删除+惰性/懒汉式删除 结合的策略,这也是大部分缓存框架的选择。定期删除对内存更加友好,惰性删除对 CPU 更加友好。两者各有优势,结合起来使用既能兼顾 CPU 友好,又能兼顾内存友好。

Redis 的定期删除过程是随机的(周期性地随机从设置过期时间的 key 中抽查一批),也因此并不保证所有过期键都会被立即删除。这也就解释为什么有的 key 已经过期,但并没有被删除。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。
另外,定期删除还受到执行时间和过期 key 的比例的影响:

  • 执行时间已经超过阈值,那就中断本次定期删除循环,避免使用过多占用CPU 。
  • 如果这批过期 key比例超过一个比例,就会重复执行此删除流程,更积极地清理过期 key。相应地,若过期的 key 比例低于这个比例,就会中断这次定期删除循环,避免做过多的工作而获得很少的内存回收。

查看源码所知:Redis7.2版本的执行时间阈值是25ms,过期key比例设定值是 10%

#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we do extra efforts. */

每次随机抽查数量在expire.c中定义,Redis 7.2 版本为 20 ,即每次会随机选择 20 个已设置过期时间的 key 判断是否过期。

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */

那如何控制定期删除的执行频率?

在 Redis 中,定期删除的频率是由 hz 参数控制的。hz 默认为 10,代表每秒执行 10 次,也就是每秒钟进行 10 次尝试来查找并删除过期的 key

hz 的取值范围为 1~500。增大 hz 参数的值会提升定期删除的频率。如果想要更频繁地执行定期删除任务,可以适当增加 hz 的值,但这会加 CPU 的使用率。根据 Redis 官方建议,hz 的值不建议超过100,其实使用默认的 10 就已足够。

下面是 hz 参数的官方注释(Redis 7.2 版本redis.conf中)的重要信息。
在这里插入图片描述
类似的参数还有 “dynamic-hz”(redis.conf中),这个参数开启之后 Redis 就会在 hz 的基础上动态计算一个值。Redis 提供并默认启用使用自适应 hz 值的能力:

# 默认为 10
hz 10
# 默认开启
dynamic-hz yes

另外:hz 参数控制着定期删除过期 key 的定期任务之外,还控制一些其他定期任务例如关闭超时的客户端连接、更新统计信息等。

最后,定期删除不是把所有过期 key 都删除, 若key 数量非常庞大的话,挨个遍历检查是非常耗时的,会严重影响性能。Redis 设计这种策略的目的就是平衡内存和性能。
而 key 过期之后不立马删除是因为成本太高不太好办到,就算使用延迟队列作为删除策略,这样存在问题:

1)队列本身的开销可能很大:key 多的情况下,一个延迟队列可能无法容纳。
(2)维护延迟队列太麻烦:修改 key 的过期时间就需要调整期在延迟队列中的位置,并且,还需要引入并发控制。
4.4 如何防止大量 key 集中过期?

如果存在大量 key 集中过期的问题,可能会使 Redis 的请求延迟变高。可选方案:

1)尽量避免 key 集中过期,在设置键的过期时间时尽量随机一点。
(2)对过期的 key 开启 lazyfree 机制(修改 redis.conf 中的 lazyfree-lazy-expire参数即可),这样会在后台异步删除过期的 key,不会阻塞主线程的运行。
五、Redis bigkey处理

若key 对应的 value 所占用的内存比较大,那这个 key 就可以看作是 bigkey。
在这里插入图片描述

String 类型的 value 超过 1MB
复合类型(List、Hash、Set、Sorted Set 等)的 value 包含的元素超过 5000

bigkey会引发:客户端超时阻塞、网络阻塞、工作线程阻塞

1.使用 Redis 自带的 --bigkeys 参数来查找

# redis-cli -p 6379 --bigkeys# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).[00.00%] Biggest string found so far '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' with 4437 bytes
[00.00%] Biggest list   found so far '"my-list"' with 17 items-------- summary -------Sampled 5 keys in the keyspace!
Total key length in bytes is 264 (avg len 52.80)Biggest   list found '"my-list"' has 17 items
Biggest string found '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' has 4437 bytes1 lists with 17 items (20.00% of keys, avg size 17.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
4 strings with 4831 bytes (80.00% of keys, avg size 1207.75)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
0 sets with 0 members (00.00% of keys, avg size 0.00)
0 zsets with 0 members (00.00% of keys, avg size 0.00

2.使用 Redis 自带的 SCAN 命令
3.借助开源工具分析 RDB 文件
4.借助公有云的 Redis 分析服务

处理方案:

(1)分割 bigkey:将一个 bigkey 分割为多个小 key。例如,将一个含有上万字段数量的 Hash 按照一定策略(比如二次哈希)拆分为多个 Hash。
(2)手动清理:Redis 4.0+ 可以使用 UNLINK 命令来异步删除一个或多个指定的 key。Redis 4.0 以下可以考虑使用 SCAN 命令结合 DEL 命令来分批次删除。
(3)采用合适的数据结构:例如,文件二进制数据不使用 String 保存、使用 HyperLogLog 统计页面 UV、Bitmap 保存状态信息(0/1)。
(4)开启 lazy-free(惰性删除/延迟释放) :lazy-free 特性是 Redis 4.0 开始引入的,指的是让 Redis 采用异步方式延迟释放 key 使用的内存,将该操作交给单独的子线程处理,避免阻塞主线程。
六、Redis持久化机制

使用缓存的时候,需要对内存中的数据进行持久化,也就是将内存中的数据写入到硬盘中。主要是为之后重用数据(比如重启机器、机器故障之后恢复数据),或者是为做数据同步(比如 Redis 集群的主从节点通过 RDB 文件同步数据)。
Redis 支持持久化方式:

快照(snapshotting,RDB)
只追加文件(append-only file, AOF)
RDB和AOF混合持久化(Redis4.0新增,通过配置项 aof-use-rdb-preamble 开启,AOF 重写的时候就直接把 RDB 的内容写到 AOF 文件开头) 
6.1 RDB持久化

通过创建快照来获得存储在内存里面的数据在 某个时间点 上的副本,然后将快照复制到其他服务器从而创建具有相同数据的服务器副本(Redis 主从结构,主要用来提高 Redis 性能),也可将快照留存本地以便重启服务器时使用。

快照持久化是 Redis 默认采用的持久化方式,在 redis.conf 配置文件中配置:

save 900 1           #在900(15分钟)之后,如果至少有1个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 300 10          #在300(5分钟)之后,如果至少有10个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 60 10000        #在60(1分钟)之后,如果至少有10000个key发生变化,Redis就会自动触发bgsave命令创建快照。

Redis 提供两个命令来生成 RDB 快照文件:

save : 同步保存操作,会阻塞 Redis 主线程(Redis启动后是通过单线程的方式工作);
bgsave : fork 出一个子进程,子进程执行,不会阻塞 Redis 主线程,默认选项。
6.2 AOF持久化

AOF持久化的实时性比RDB好,通过 appendonly 参数开启:

appendonly yes

开启 AOF 持久化后每执行一条会更改 Redis 中的数据的命令,Redis 就会将该命令写入到 AOF 缓冲区 server.aof_buf 中,然后再写入到 AOF 文件中(此时还在系统内核缓存区未同步到磁盘,依然存在数据丢失的风险),最后再根据持久化方式( fsync策略)的配置来决定何时将系统内核缓存区的数据同步到硬盘中完成持久化保存。

AOF 文件的保存位置和 RDB 文件的位置相同,都通过 dir 参数设置,默认的文件名是 appendonly.aof。

6.3 AOF工作流程

AOF 持久化功能的实现可以简单分为 5 步:

1)命令追加(append):所有的写命令会追加到 AOF 缓冲区中。
(2)文件写入(write):将 AOF 缓冲区的数据写入到 AOF 文件中。这一步需要调用write函数(系统调用),write将数据写入系统内核缓冲区之后直接返回(延迟写)。注意此时并没有同步到磁盘。
(3)文件同步(fsync):AOF 缓冲区根据对应的持久化方式( fsync 策略)向硬盘做同步操作。这一步需要调用 fsync 函数(系统调用), fsync 针对单个文件操作,对其进行强制硬盘同步,fsync 将阻塞直到写入磁盘完成后返回,保证了数据持久化。
(4)文件重写(rewrite):随着 AOF 文件越来越大,需要定期对 AOF 文件进行重写,达到压缩的目的。
(5)重启加载(load):当 Redis 重启时,可以加载 AOF 文件进行数据恢复。

名词解释:

1)系统调用(syscall):Linux系统直接提供用于对文件和设备进行访问和控制的函数;
(2)write:写入系统内核缓冲区之后直接返回(仅是写到缓冲区),不会立即同步到硬盘。虽然提高效率,但也有数据丢失的风险。同步硬盘操作通常依赖于系统调度机制,Linux 内核通常为 30s 同步一次,具体值取决于写出的数据量和 I/O 缓冲区的状态。
(3)fsync:fsync用于强制刷新系统内核缓冲区(同步到到磁盘),确保写磁盘操作结束才会返回。

AOF 工作流程表示为:
在这里插入图片描述

6.4 AOF持久化方式

Redis的配置文件中存在三种不同的 AOF 持久化方式( fsync策略),主要区别在于 fsync 同步 AOF 文件的时机(刷盘)。

1)appendfsync always:主线程调用 write 执行写操作后,后台线程(aof_fsync线程)立即调用fsyn 函数同步AOF文件(刷盘),fsync完成后线程返回,这样会严重降低Redis的性能(write+fsync).2)appendfsync everysec:主线程调用write执行写操作后立即返回,由后台线程(aof_fsync线程)每秒钟调用fsync函数(系统调用)同步一次 AOF文件(write+fsync,fsync间隔为1秒)(3)appendfsync no:主线程调用write执行写操作后立即返回,让操作系统决定何时进行同步,Linux下一般为30秒一次(write但不fsync,fsync的时机由操作系统决定)

因此要兼顾数据和写入性能,可以考虑 appendfsync everysec 策略 ,让Redis每秒同步一次 AOF 文件,Redis 性能受到的影响较小。而且这样即使出现系统崩溃,最多只会丢失一秒之内产生的数据。当硬盘忙于执行写入操作时,Redis 还会适当放慢速度以适应硬盘的最大写入速度。

ps:有个了解知识点,详情参考:《Redis 7.0 Multi Part AOF 的设计和实现》

从Redis7.0开始,Redis使用Multi Part AOF机制,将原来的单个AOF文件拆分成多个AOF文件。在Multi Part AOF中,AOF文件被分为三种类型,分别为:
(1)BASE:表示基础AOF文件,一般由子进程通过重写产生,该文件最多只有一个。
(2)INCR:表示增量AOF文件,一般会在AOFRW开始执行时被创建,该文件可能存在多个。
(3)HISTORY:表示历史AOF文件,它由BASE和INCR AOF整合,每次AOFRW成功完成时,本次 AOFRW 前对应的BASE和INCR AOF 将变为HISTORY,HISTORY类型的AOF会被Redis自动删除。

相关 issue:Redis 的 AOF 方式 #783

6.5 AOF日志记录

关系型数据库如 MySQL,通常都是执行命令之前记录日志,方便故障恢复,而 Redis AOF 持久化机制是在执行完命令之后再记录日志。
在这里插入图片描述
后记录日志的原因:

避免额外的检查开销,AOF记录日志不会对命令进行语法检查;
在命令执行完之后再记录,不会阻塞当前的命令执行。

当然,前面提到过风险:

如果刚执行完命令 Redis 就宕机会导致对应的修改丢失;
可能会阻塞后续其他命令的执行(AOF 记录日志是在 Redis 主线程中进行的)。
6.6 AOF重写

当AOF存储量太大时,Redis能够在后台自动重写AOF产生一个新AOF文件,这个新AOF文件和原本AOF文件所保存的数据库状态一样,但体积更小。

Redis 将AOF重写程序放到子进程里执行,避免大量的写入操作对Redis正常处理命令请求造成影响,这个重写操作是redis通过读取数据库中的键值对来实现的,与Java描述的重写不同。
在这里插入图片描述
AOF文件重写期间,Redis还会维护一个AO 重写缓冲区,该缓冲区会在子进程创建新 AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新AOF文件末尾,使得新AOF件保存的数据库状态与现有的数据库状态一致。最后,服务器用新AOF文件替换旧的AOF文件,来完成AOF文件重写操作。

开启AOF重写功能,可以调用BGREWRITEAOF命令手动执行,也可设置两个配置项,让程序自动决定触发时机:

auto-aof-rewrite-min-size:如果 AOF 文件大小小于该值,则不会触发 AOF 重写。默认值为 64 MB;
auto-aof-rewrite-percentage:执行 AOF 重写时,当前 AOF 大小(aof_current_size)和上一次重写时 AOF 大小(aof_base_size)的比值。如果当前 AOF 文件大小增加了这个百分比值,将触发 AOF 重写。将此值设置为 0 将禁用自动 AOF 重写。默认值为 100

AOF重写机制优化改进可参考:《从 Redis7.0 发布看 Redis 的过去与未来》

6.7 AOF校验机制

AOF校验机制是Redis在启动时对AOF文件进行检查,判断文件是否完整,是否有损坏或者丢失的数据。通过使用校验和(checksum) 对整个 AOF 文件内容进行 CRC64 算法计算得出的数字来验证 AOF 文件。如果文件内容发生变化,那么校验和也会随之改变。因此,Redis 在启动时会比较计算出的校验和与文件末尾保存的校验和(计算的时候会把最后一行保存校验和的内容忽略),从而判断 AOF 文件是否完整。若发现文件有问题,Redis就会拒绝启动并提供相应的错误信息。AOF校验机制简单有效提高 Redis 数据的可靠性。

类似地,RDB 文件也有类似的校验机制来保证 RDB 文件的正确性。

6.8 如何选择 RDB 和 AOF
1)Redis保存的数据丢失一些也没什么影响的话,可以选择使用RDB。
(2)不建议单独使用AOF,因为随时创建一个RDB快照可以进行数据库备份、更快的重启以及解决AOF引擎错误。
(3)如果保存的数据要求安全性比较高的话,建议同时开启RDB和AOF两种持久化或者开启RDB和AOF混合持久化。

其实,了解计算机底层原理的都知道宕机数据丢失无法避免,要么是以性能很差的方式来解决。没有完美的方案只有均衡的方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/869370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习pytorch多机多卡网络配置桥接方法

1 安装pdsh&#xff08;Parallel Distributed Shell&#xff09; sudo apt install pdsh sudo -s # 切换超级用户身份 …

MATLAB备赛资源库(1)建模指令

一、介绍 MATLAB&#xff08;Matrix Laboratory&#xff09;是一种强大的数值计算环境和编程语言&#xff0c;特别设计用于科学计算、数据分析和工程应用。 二、使用 数学建模使用MATLAB通常涉及以下几个方面&#xff1a; 1. **数据处理与预处理**&#xff1a; - 导入和处理…

Echarts实现github提交记录图

最近改个人博客&#xff0c;看了github的提交记录&#xff0c;是真觉得好看。可以移植到自己的博客上做文章统计 效果如下 代码如下 <!DOCTYPE html> <html lang"en" style"height: 100%"><head><meta charset"utf-8"> …

240709_昇思学习打卡-Day21-文本解码原理--以MindNLP为例

240709_昇思学习打卡-Day21-文本解码原理–以MindNLP为例 今天做根据前文预测下一个单词&#xff0c;仅作简单记录及注释。 一个文本序列的概率分布可以分解为每个词基于其上文的条件概率的乘积 &#x1d44a;_0:初始上下文单词序列&#x1d447;: 时间步当生成EOS标签时&a…

企业级网关设计

tips&#xff1a;本文完全来源于卢泽龙&#xff01;&#xff01;&#xff01; 一、Gateway概述 1.1设计目标 1.2gateway基本功能 中文文档参考&#xff1a;https://cloud.tencent.com/developer/article/1403887?from15425 三大核心&#xff1a; 二、引入依赖和yaml配置…

如何在 PostgreSQL 中确保数据的异地备份安全性?

文章目录 一、备份策略1. 全量备份与增量备份相结合2. 定义合理的备份周期3. 选择合适的备份时间 二、加密备份数据1. 使用 PostgreSQL 的内置加密功能2. 使用第三方加密工具 三、安全的传输方式1. SSH 隧道2. SFTP3. VPN 连接 四、异地存储的安全性1. 云存储服务2. 内部存储设…

人话学Python-基础篇-字符串

一&#xff1a;字符串的定义 在Python中使用引号来定义。不论是单引号还是双引号。 str1 Hello World str2 "Hello World" 二&#xff1a;字符串的访问 如果我们要取出字符串中单独的字符&#xff0c;需要使用方括号来表示取得的位置。如果要取出字符串的子串&…

原创作品—数据可视化大屏

设计数据可视化大屏时&#xff0c;用户体验方面需注重以下几点&#xff1a;首先&#xff0c;确保大屏信息层次分明&#xff0c;主要数据突出显示&#xff0c;次要信息适当弱化&#xff0c;帮助用户快速捕捉关键信息。其次&#xff0c;设计应直观易懂&#xff0c;避免复杂难懂的…

前端javascript中的排序算法之冒泡排序

冒泡排序&#xff08;Bubble Sort&#xff09;基本思想&#xff1a; 经过多次迭代&#xff0c;通过相邻元素之间的比较与交换&#xff0c;使值较小的元素逐步从后面移到前面&#xff0c;值较大的元素从前面移到后面。 大数据往上冒泡&#xff0c;小数据往下沉&#xff0c;也就是…

大语言模型垂直化训练技术与应用

在人工智能领域&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已经成为推动技术进步的关键力量&#xff0c;垂直化训练技术逐渐成为研究的热点&#xff0c;它使得大模型能够更精准地服务于特定行业和应用场景。本文结合达观数据的分享&#xff0c…

tomcat 项目迁移,无法将项目作为服务service启动

背景 测试服务器需要迁移到正式服务器上&#xff0c;为了方便省事&#xff0c;将测试服务器上的一些文件直接复制到正式服务器 问题 使用startup启动项目之后&#xff0c;可以直接使用使用tomcat9w启动&#xff0c;或者作为服务service启动的时候&#xff0c;显示无法访问到资源…

AGE Cypher 查询格式

使用 ag_catalog 中的名为 cypher 的函数构建 Cypher 查询&#xff0c;该函数返回 Postgres 的记录集合。 Cypher() Cypher() 函数执行作为参数传递的 Cypher 查询。 语法&#xff1a;cypher(graph_name, query_string, parameters) 返回&#xff1a; A SETOF records 参…

自动驾驶事故频发,安全痛点在哪里?

大数据产业创新服务媒体 ——聚焦数据 改变商业 近日&#xff0c;武汉城市留言板上出现了多条关于萝卜快跑的投诉&#xff0c;多名市民反映萝卜快跑出现无故停在马路中间、高架上占最左道低速行驶、转弯卡着不动等情况&#xff0c;导致早晚高峰时段出现拥堵。萝卜快跑是百度 A…

YOLOv5、v7、v8如何修改检测框文字颜色和大小

YOLOv5和YOLOv8默认的标签文字颜色为白色&#xff0c;但是在亮度较大的图片中文字不明显&#xff0c;就需要对标签文字的颜色进行修改 一、YOLOv5 打开X:\Anaconda\envs\your-env\Lib\site-packages\ultralytics\utils\plotting.py X代表你的anaconda安装的盘&#xff0c;yo…

随笔(一)

1.即时通信软件原理&#xff08;发展&#xff09; 即时通信软件实现原理_即时通讯原理-CSDN博客 笔记&#xff1a; 2.泛洪算法&#xff1a; 算法介绍 | 泛洪算法&#xff08;Flood fill Algorithm&#xff09;-CSDN博客 漫水填充算法实现最常见有四邻域像素填充法&#xf…

最全windows提权总结(建议收藏)

当以低权用户进去一个陌生的windows机器后&#xff0c;无论是提权还是后续做什么&#xff0c;第一步肯定要尽可能的搜集信息。知己知彼&#xff0c;才百战不殆。 常规信息搜集 systeminfo 查询系统信息hostname 主机名net user 查看用户信息netstat -ano|find "3389&quo…

论文 | Chain-of-Thought Prompting Elicits Reasoningin Large Language Models 思维链

这篇论文研究了如何通过生成一系列中间推理步骤&#xff08;即思维链&#xff09;来显著提高大型语言模型进行复杂推理的能力。论文展示了一种简单的方法&#xff0c;称为思维链提示&#xff0c;通过在提示中提供几个思维链示例来自然地激发这种推理能力。 主要发现&#xff1…

SDIO CMD 数据部分 CRC 计算规则

使用的在线 crc 计算工具网址&#xff1a;http://www.ip33.com/crc.html CMD CRC7 计算 如下图为使用逻辑分析仪获取的SDIO读写SD卡时&#xff0c;CMD16指令发送的格式&#xff0c;通过逻辑分析仪总线分析&#xff0c;可以看到&#xff0c;该部分的CRC7校验值得0x05,大多数情况…

MySQL之基本查询(上)-表的增删查改

目录 Create(创建) 案例建表 插入 单行数据 指定列插入 单行数据 全列插入 多行数据 全列插入 插入是否更新 插入时更新 替换 Retrieve(读取) 建表插入 select列 全列查询 指定列查询 查询字段为表达式 为查询结果指定别名 结果去重 where条件 比较运算符 逻辑运…

昇腾APN最佳伙伴—英码科技AI算力计算产品亮相WAIC 2024

2024年7月4日-7日&#xff0c; “以共商促共享&#xff0c;以善治促善智”为主题的2024世界人工智能大会暨人工智能全球治理高级别会议&#xff08;WAIC&#xff09;在上海世博展览中心隆重举行。国务院总理李强出席开幕式并致辞。来自50多个国家和地区的1300位全球领军人物、展…