Redis学习笔记5:基于springboot的lettuce redis客户端断线重连ConnectionWatchdog

lettuce默认采用共享本地连接的模式和redis服务器端交互,如果连接断开如何及时发现并且重新建立连接呢?通过翻阅源码发现有两种方案,方案一:开启连接有效性检测;方案二:通过ConnectionWatchdog监视器

一个对springboot redis框架进行重写,支持lettuce、jedis、连接池、同时连接多个集群、多个redis数据库、开发自定义属性配置的开源SDK

<dependency><groupId>io.github.mingyang66</groupId><artifactId>emily-spring-boot-redis</artifactId><version>4.3.9</version>
</dependency>

GitHub地址:https://github.com/mingyang66/spring-parent

一、开启连接有效性validateConnection检测
  • LettuceConnectionFactory.SharedConnection#getConnection获取连接
		@NullableStatefulConnection<E, E> getConnection() {synchronized (this.connectionMonitor) {if (this.connection == null) {//建立本地连接this.connection = getNativeConnection();}//开启连接有效性检测if (getValidateConnection()) {//对获取到的连接进行ping有效性检测,如果无效则重置,重新建立连接validateConnection();}return this.connection;}}//连接有效性ping检查void validateConnection() {synchronized (this.connectionMonitor) {boolean valid = false;if (connection != null && connection.isOpen()) {try {if (connection instanceof StatefulRedisConnection) {//连接有效性ping检查((StatefulRedisConnection) connection).sync().ping();}if (connection instanceof StatefulRedisClusterConnection) {//连接有效性ping检查((StatefulRedisClusterConnection) connection).sync().ping();}valid = true;} catch (Exception e) {log.debug("Validation failed", e);}}//连接无效,则重置连接,并重新建立本地连接if (!valid) {log.info("Validation of shared connection failed; Creating a new connection.");resetConnection();this.connection = getNativeConnection();}}}
二、ConnectionWatchdog断线重连监视器通过RedisClient初始化
  • RedisClient#connectStatefulAsync方法开始对监视器初始化
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {ConnectionBuilder connectionBuilder;if (redisURI.isSsl()) {SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();sslConnectionBuilder.ssl(redisURI);connectionBuilder = sslConnectionBuilder;} else {connectionBuilder = ConnectionBuilder.connectionBuilder();}ConnectionState state = connection.getConnectionState();state.apply(redisURI);state.setDb(redisURI.getDatabase());connectionBuilder.connection(connection);connectionBuilder.clientOptions(getOptions());connectionBuilder.clientResources(getResources());connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);connectionBuilder.connectionInitializer(createHandshake(state));//监视器初始化入口ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);return future.thenApply(channelHandler -> (S) connection);}
  • io.lettuce.core.AbstractRedisClient#initializeChannelAsync添加异常监听
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(ConnectionBuilder connectionBuilder) {Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();if (clientResources.eventExecutorGroup().isShuttingDown()) {throw new IllegalStateException("Cannot connect, Event executor group is terminated.");}CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();String uriString = connectionBuilder.getRedisURI().toString();EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));channelReadyFuture.whenComplete((channel, throwable) -> {event.record();});// 通过对tcp建立异常监听        socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete).subscribe(redisAddress -> {if (channelReadyFuture.isCancelled()) {return;}//初始化入口initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);}, channelReadyFuture::completeExceptionally);return new DefaultConnectionFuture<>(socketAddressFuture,channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));}
  • io.lettuce.core.AbstractRedisClient#initializeChannelAsync0

  •   private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,SocketAddress redisAddress) {logger.debug("Connecting to Redis at {}", redisAddress);Bootstrap redisBootstrap = connectionBuilder.bootstrap();//初始化入口ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);redisBootstrap.handler(initializer);clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);channelReadyFuture.whenComplete((c, t) -> {if (t instanceof CancellationException) {connectFuture.cancel(true);}});connectFuture.addListener(future -> {Channel channel = connectFuture.channel();if (!future.isSuccess()) {Throwable cause = future.cause();Throwable detail = channel.attr(ConnectionBuilder.INIT_FAILURE).get();if (detail != null) {detail.addSuppressed(cause);cause = detail;}logger.debug("Connecting to Redis at {}: {}", redisAddress, cause);connectionBuilder.endpoint().initialState();channelReadyFuture.completeExceptionally(cause);return;}RedisHandshakeHandler handshakeHandler = channel.pipeline().get(RedisHandshakeHandler.class);if (handshakeHandler == null) {channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered"));return;}handshakeHandler.channelInitialized().whenComplete((success, throwable) -> {if (throwable == null) {logger.debug("Connecting to Redis at {}: Success", redisAddress);RedisChannelHandler<?, ?> connection = connectionBuilder.connection();connection.registerCloseables(closeableResources, connection);channelReadyFuture.complete(channel);return;}logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);connectionBuilder.endpoint().initialState();channelReadyFuture.completeExceptionally(throwable);});});}
    • io.lettuce.core.ConnectionBuilder#build
        public ChannelInitializer<Channel> build(SocketAddress socketAddress) {//初始化入口return new PlainChannelInitializer(this::buildHandlers, clientResources);}
    
    • io.lettuce.core.ConnectionBuilder#buildHandlers
        protected List<ChannelHandler> buildHandlers() {LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set");LettuceAssert.assertState(connection != null, "Connection must be set");LettuceAssert.assertState(clientResources != null, "ClientResources must be set");LettuceAssert.assertState(endpoint != null, "Endpoint must be set");LettuceAssert.assertState(connectionInitializer != null, "ConnectionInitializer must be set");List<ChannelHandler> handlers = new ArrayList<>();connection.setOptions(clientOptions);handlers.add(new ChannelGroupListener(channelGroup, clientResources.eventBus()));handlers.add(new CommandEncoder());handlers.add(getHandshakeHandler());handlers.add(commandHandlerSupplier.get());handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));//如果自动重连机制打开if (clientOptions.isAutoReconnect()) {//新建连接监视器handlers.add(createConnectionWatchdog());}return handlers;}
    
    • ConnectionBuilder#createConnectionWatchdog创建连接监视器
        protected ConnectionWatchdog createConnectionWatchdog() {//如果连接监视器已经存在,则直接返回if (connectionWatchdog != null) {return connectionWatchdog;}LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");//创建连接监视器ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,connection, clientResources.eventBus(), endpoint);//将连接监视器注入到DefaultEndpoint对象中endpoint.registerConnectionWatchdog(watchdog);connectionWatchdog = watchdog;return watchdog;}
    

    到这里我们已经成功的将监视器对象ConnectionWatchdog在本地连接建立的时候注入到了DefaultEndpoint对象之中,DefaultEndpoint会开启对非活跃连接的监视。

    三、DefaultEndpoint#notifyChannelActive方法启用ConnectionWatchdog监听断开连接事件
    @Overridepublic void notifyChannelActive(Channel channel) {this.logPrefix = null;this.channel = channel;this.connectionError = null;if (isClosed()) {logger.info("{} Closing channel because endpoint is already closed", logPrefix());channel.close();return;}//监视器对象不为空,则启用监视器if (connectionWatchdog != null) {connectionWatchdog.arm();}...});}
    
  • ConnectionWatchdog#arm方法开启断连事件监听器

    void arm() {//标记启用ConnectionWatchdog监视器this.armed = true;//设置标记在通道不活跃(连接断开)时监听事件setListenOnChannelInactive(true);}
  • ConnectionWatchdog#channelInactive方法在连接的状态从活跃变为不活跃时会被调用,通常标识连接已经断开
    @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("{} channelInactive()", logPrefix());if (!armed) {logger.debug("{} ConnectionWatchdog not armed", logPrefix());return;}channel = null;if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {//开启重连接调度scheduleReconnect();} else {logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);}super.channelInactive(ctx);}
  • ConnectionWatchdog#scheduleReconnect方法表示如果通道不可用或已经断开连接,则进行重连接
    public void scheduleReconnect() {logger.debug("{} scheduleReconnect()", logPrefix());if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {//重试次数attempts++;final int attempt = attempts;//延迟时间,根据具体策略来生成等待时间,最大等待30sDuration delay = reconnectDelay.createDelay(attempt);int timeout = (int) delay.toMillis();logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);this.reconnectScheduleTimeout = timer.newTimeout(it -> {reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");return;}reconnectWorkers.submit(() -> {//开启连接重建ConnectionWatchdog.this.run(attempt, delay);return null;});}, timeout, TimeUnit.MILLISECONDS);// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.if (!reconnectSchedulerSync.get()) {reconnectScheduleTimeout = null;}} else {logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());}}

重试等待延迟时间规则如下:

第1次:PT0.001S
第2次:PT0.002S
第3次:PT0.004S
第4次:PT0.008S
第5次:PT0.016S
第6次:PT0.032S
第7次:PT0.064S
第8次:PT0.128S
第9次:PT0.256S
第10次:PT0.512S
第11次:PT1.024S
第12次:PT2.048S
第13次:PT4.096S
第14次:PT8.192S
第15次:PT16.384S
第16次:PT30S
第17次:PT30S
第18次:PT30S
第19次:PT30S
  • ConnectionWatchdog#run连接重建
private void run(int attempt, Duration delay) throws Exception {reconnectSchedulerSync.set(false);reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if (isReconnectSuspended()) {logger.debug("Skip reconnect scheduling, reconnect is suspended");return;}boolean shouldLog = shouldLog();InternalLogLevel infoLevel = InternalLogLevel.INFO;InternalLogLevel warnLevel = InternalLogLevel.WARN;if (shouldLog) {lastReconnectionLogging = System.currentTimeMillis();} else {warnLevel = InternalLogLevel.DEBUG;infoLevel = InternalLogLevel.DEBUG;}InternalLogLevel warnLevelToUse = warnLevel;try {reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay));logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);//真正建立连接ReconnectionHandlerTuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();CompletableFuture<Channel> future = tuple.getT1();future.whenComplete((c, t) -> {if (c != null && t == null) {return;}CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();SocketAddress remote = remoteAddress;if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally()&& !remoteAddressFuture.isCancelled()) {remote = remoteAddressFuture.join();}String message = String.format("Cannot reconnect to [%s]: %s", remote,t.getMessage() != null ? t.getMessage() : t.toString());if (ReconnectionHandler.isExecutionException(t)) {if (logger.isDebugEnabled()) {logger.debug(message, t);} else {logger.log(warnLevelToUse, message);}} else {logger.log(warnLevelToUse, message, t);}//发送重连接失败事件eventBus.publish(new ReconnectFailedEvent(redisUri, epid, LocalAddress.ANY, remote, t, attempt));if (!isReconnectSuspended()) {scheduleReconnect();}});} catch (Exception e) {logger.log(warnLevel, "Cannot reconnect: {}", e.toString());//发送重连接失败事件eventBus.publish(new ReconnectFailedEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, e, attempt));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/121314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为什么网上的流量卡都有禁发地区呢?流量卡管控地区整理!

在网上购买过流量卡的朋友应该都知道&#xff0c;但凡是运营商推出的大流量优惠套餐&#xff0c;在套餐详情中都是有禁发地区&#xff0c;只不过每张卡的禁发地区不同而已。 设置禁发地区的主要目的还是为了防止一些电信诈骗案件的发生&#xff0c;或者违法违规利用电话卡的情…

C语言数据结构之数据结构入门

目录 数据结构介绍 数据结构发展史 何为算法 数据结构基础 基本概念和术语 四大逻辑结构&#xff08;Logic Structure&#xff09; 数据类型 理解复杂度概念 时间空间复杂度定义 度量时间复杂度的方法 程序运行时的内存与地址 编程预备 数据结构介绍 数据结构发展…

CVPR2023新作:基于组合空时位移的视频修复

Title: A Simple Baseline for Video Restoration With Grouped Spatial-Temporal Shift (视频修复的简单基准&#xff1a;组合空时位移) Affiliation: CUHK MMLab (香港中文大学多媒体实验室) Authors: Dasong Li, Xiaoyu Shi, Yi Zhang, Ka Chun Cheung, Simon See, Xiaoga…

苏宁一面复盘

技术问题&#xff08;顺序有错乱&#xff09;&#xff1a; 1.抽象类与接口&#xff0c;什么时候用抽象类&#xff0c;什么时候用接口&#xff0c;区别是什么。 答&#xff1a;抽象类内可以有非抽象方法&#xff0c;方法的具体实现&#xff0c;接口内只能有抽象方法&#xff0…

python:多波段遥感影像分离成单波段影像

作者:CSDN @ _养乐多_ 在遥感图像处理中,我们经常需要将多波段遥感影像拆分成多个单波段图像,以便进行各种分析和后续处理。本篇博客将介绍一个用Python编写的程序,该程序可以读取多波段遥感影像,将其拆分为单波段图像,并保存为单独的文件。本程序使用GDAL库来处理遥感影…

一个简单的注册页面,如有错误请指正(2.css)

这段CSS代码定义了页面的样式&#xff0c;让我逐个解释其功能&#xff1a; 1. * {}&#xff1a;通配符选择器&#xff0c;用于将页面中的所有元素设置统一的样式。这里将margins和paddings设置为0&#xff0c;以去除默认的边距。 2. div img {}&#xff1a;选择页面中所有div…

LMFLOSS:专治解决不平衡医学图像分类的新型混合损失函数 (附代码)

论文地址&#xff1a;https://arxiv.org/pdf/2212.12741.pdf 代码地址&#xff1a;https://github.com/SanaNazari/LMFLoss 1.是什么&#xff1f; LMFLOSS是一种用于不平衡医学图像分类的混合损失函数。它是由Focal Loss和LDAM Loss的线性组合构成的&#xff0c;旨在更好地处…

c语言 判断两个文件是否相同

使用strcmp比较&#xff1a; #include <stdio.h> #include <string.h>int Compare(const char * file1, const char* file2) {FILE* f1, * f2;int size1, size2;unsigned char buffer1[1024], buffer2[1024];f1 fopen(file1, "rb");f2 fopen(file2, &…

SpringSecurity 认证实战

一. 项目数据准备 1.1 添加依赖 <dependencies><!--spring security--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><!--web起步依赖-…

语雀故障事件——P0级别事故启示录 发生肾么事了? 怎么回事?

前言 最近&#xff0c;阿里系的语雀出了一个大瓜&#xff0c;知名在线文档编辑与协同工具语雀发生故障&#xff0c;崩溃近10小时。。。。最后&#xff0c;官方发布了一则公告&#xff0c;我们一起来看看这篇公告&#xff0c;能不能有所启发。 目录 前言引出一、语雀P0故障回顾…

Centos 7 安装 Docker Enginee

文章目录 Centos 安装 Docker Enginee系统要求卸载旧版本使用 RPM 仓库设置 Docker 仓库安装 Docker Enginee升级 Docker Enginee 卸载 Docker Centos 安装 Docker Enginee 要在 Centos 安装 Docker Enginee&#xff0c;需要满足以下要求&#xff1a; 系统要求 CentOS 7Cent…

重复控制器的性能优化

前言 重复控制器在控制系统中是比较优秀的控制器&#xff0c;在整流逆变等周期性输入信号时&#xff0c;会有很好的跟随行&#xff0c;通常可以单独使用&#xff0c;也可以与其他补偿器串联并联使用。 这里我来分析一下重复控制器的重复控制器的应用工况以及其的优缺点。 分析…

Mybatis-Plus(企业实际开发应用)

一、Mybatis-Plus简介 MyBatis-Plus是MyBatis框架的一个增强工具&#xff0c;可以简化持久层代码开发MyBatis-Plus&#xff08;简称 MP&#xff09;是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 官网&a…

Python深度学习实战-基于class类搭建BP神经网络实现分类任务(附源码和实现效果)

实现功能 上篇文章介绍了用Squential搭建BP神经网络&#xff0c;Squential可以搭建出上层输出就是下层输入的顺序神经网络结构&#xff0c;无法搭出一些带有跳连的非顺序网络结构&#xff0c;这个时候我们可以选择类class搭建封装神经网络结构。 第一步&#xff1a;import ten…

基于情感词典的情感分析方法

计算用户情绪强弱性&#xff0c;对于每一个文本都可以得到一个情感分值&#xff0c;以情感分值的正负性表示情感极性&#xff0c;大于0为积极情绪&#xff0c;小于0反之&#xff0c;绝对值越大情绪越强烈。 基于情感词典的情感分析方法主要思路&#xff1a; 1、对文本进行分词…

影响光源的因素

影响光源的因素 对比度 1.对比度 均匀性 2.均匀性 色彩还原性 3.色彩还原性 其他因素&#xff1a; 4. 亮度 &#xff1a; 光源 亮度是光源选择时的重要参考&#xff0c;尽量选择亮度高的光源。 5. 鲁棒性 &#xff1a; 鲁棒性是指光源是否对部件的位置敏感度最小 。 6. 光…

不同设备的请求头信息UserAgent,Headers

一、电脑端 【设备名称】&#xff1a;电脑 Win10 【应用名称】&#xff1a;win10 Edge 【浏览器信息】&#xff1a;名称:(Chrome)&#xff1b;版本:(70.0) 【请求头信息】&#xff1a;Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Ch…

easyExcel按模板填充数据,处理模板列表合并问题等,并导出为html,pdf,png等格式文件demo

1.工具类 /*** excle模板填充并转换html* * @param response 返回* @param order 主体内容* @param goods 配件列表* @param pro 项目列表* @throws IOException*/public static void moudleExcleToHtml(HttpServletResponse response, String moudleUrl, Object o…

点云从入门到精通技术详解100篇-基于尺度统一的三维激光点云与高清影像配准(续)

目录 以三维激光点云为基准的影像点云尺度估计 3.1 基于 Scale ratio ICP 的尺度估计 3.1.1 Spin image 介绍