目录
前言
Lettuce
什么是事件总线EventBus?
Connected
Connection activated
Disconnected
Connection deactivated
Reconnect failed
使用
一种另类方法—AOP
具体实现
前言
在上一篇深入浅出,SpringBoot整合Quartz实现定时任务与Redis健康检测(二)_往事如烟隔多年的博客-CSDN博客
文章中,通过在SpringBoot中使用Quartz实现了Redis健康状态的检测,虽然一定程度上解决了Redis的连接问题,但仍存在一些问题,比如当Redis在定时检测的间隔时间内断开连接时,此时有用户请求进入时仍然会出现服务不可用的状态,那么有办法能够在Redis断开时通知到SpringBoot程序,进而实现Redis到MySQL的切换呢?本文将在之前的基础上使用Redis的事件总线机制来解决该问题。
Lettuce
在SpringBoot 2.x版本中默认使用的Redis客户端为Lettuce,与早期的Jedis不同,Lettuce底层由Netty构建,异步IO驱动提升效率,支持高并发,且线程安全。由下图可以看出项目中引入了Redis依赖后使用的客户端为Lettuce。
什么是事件总线EventBus?
虽然引入了Lettuce,但是面临的问题应该如何作以解决呢?这里就得讲一下Redis的EventBus了,可以通过Lettuce官方文档了解相关信息,
Connection Events · lettuce-io/lettuce-core Wiki · GitHub
由于客户端使用了lettuce6.1.10,因此我们关注内容如下Connection Events · lettuce-io/lettuce-core Wiki · GitHub
不难看出自lettuce提供了一个用于使用连接事件的实例代码,通过订阅事件总线可以监听到当前Redis的连接事件状态,连接事件状态分为如下五种:
Connected
传输协议连接建立,即TCP三次握手完成,Socket通信建立,事件类型ConnectedEvent。
Connection activated
逻辑连接建立,当前处于活动状态并且可以开始分发Redis命令,此时ping命令已经发送并收到回应,可以正常使用Redis命令。事件类型ConnectionActivatedEvent。
Disconnected
连接关闭,传输协议关闭或重置。该事件发生在定期关闭和连接中断时,事件类型Disconnected。
Connection deactivated
连接停用,逻辑连接停用,内部的处理状态被重置,并且isOpen()标志被设置为false,该事件发生在定期关闭和连接中断时,事件类型Connection deactivated。
Reconnect failed
重连失败(自5.3版起),尝试重连失败。包含重连失败和重连计数器。事件类型RecconectFailedEvent。
如上五种类型中需要重点关注的是Connection activated,当该连接事件发生时,将Redis连接标识置为true即可。
使用
根据前文提到的官方文档给出的代码,使用后会发现并未监听到Redis的连接状态。
RedisClient client = RedisClient.create()
EventBus eventBus = client.getresources().eventBus();eventBus.get().subscribe(e -> System.out.println(event));...
client.shutdown();
此时进入create()方法内部查看情况,可以看到默认无参的构造函数创建了一个RedisClient对象,其中第一个参数为clientResources对象,第二个参数为一个字符串,即空的URI,想必在这里已经猜到为什么无法监听连接了。
由于并没有传入对应的Redis资源或URI路径,导致无法进行正常的事件监听。
在源码中不难发现有其它同名含参的构造方法, 此处使用第三个含有ClientResource对象的方法,因为可以通过Spring注入该对象,从而省去手工构造的繁琐步骤。
具体代码如下
通过创建事件总线对象,进而订阅Redis的连接事件,当Reids处于非连接状态时会实时修改RedisCheckConfig的连接状态标识。即使在Redis定时检测任务的间隔中也能保证服务的正常运行。
@Component
@Slf4j
public class RedisStatusListener {@Autowiredprivate ClientResources clientResources;public RedisClient redisClient() {RedisClient client = RedisClient.create(clientResources);EventBus eventBus = client.getResources().eventBus();eventBus.get().subscribe(e -> {RedisCheckConfig.redisConnected = e instanceof ConnectionActivatedEvent ? true : false;log.info("EventBus获取Redis是否连接 "+RedisCheckConfig.redisConnected);});return client;}
}
一种另类方法——AOP
如果并不想使用事件总线的方式来解决,这里再提供一种其它的思路。可以通过AOP的方式来完成判断。
通过对Redis操作类中的所有操作指令进行检测,即当Redis操作命令执行超过1s(可调节)时则认为此时Redis出现了连接异常。
这里由于需要统计执行时间需要用到线程池,通过AOP的环绕通知来实现业务逻辑拦截,即当Redis命令操作超过指定时间则返回数据为其对应存储数据的默认类型。
具体实现
@Component
@Aspect
@Slf4j
public class RedisOperationAspect {@Around("execution(* com.o2o.shop.util.RedisOperator.*(..))")public Object monitorRedisCommandExecution(ProceedingJoinPoint joinPoint) throws Throwable {// 获取方法信息MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();Method method = methodSignature.getMethod();String methodName = method.getName();// 执行超时时间,单位mslong timeout = 1000;ExecutorService executor = new ThreadPoolExecutor(1, 1,1000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));// 创建Future对象Future<Object> future = executor.submit(() -> {try {return joinPoint.proceed();} catch (Throwable throwable) {log.error("Redis AOP 执行 "+joinPoint+"异常");throw new RuntimeException(throwable);}});// 执行方法并等待结果,设置超时时间try {return future.get(timeout, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {// 超时处理log.warn("Redis command execution timed out");// 中断方法执行future.cancel(true);// 返回特定结果Class<?> returnType = method.getReturnType();if (returnType.isPrimitive()) {if (returnType == boolean.class) {return false;} else if (returnType == char.class) {return '\0';} else if (returnType == byte.class || returnType == short.class ||returnType == int.class || returnType == long.class) {return 0;} else if (returnType == float.class || returnType == double.class) {return 0.0f;}}return null;} finally {// 关闭线程池executor.shutdown();}}}