logback AbstractLogstashTcpSocketAppender 源码解析

大家好,我是烤鸭:

今天分享下 logback 源码 ,版本是 6.5-SNAPSHOT。

  1. 写这篇的目的

    由于最近项目中一直出现这个日志,而且基本每20秒就会打印一次,也没法关掉,百度上资料也很少,只能自己来了。

    10:04:01,393 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxxx.com:1111: Waiting 19999ms before attempting reconnection.

    正常来说,这个提示就是简单提示下,socket连接断开,可能是网络或者是服务端的原因,然后重连。比如下边这个日志。

    11:17:06,337 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[LOGSTASH] - Log destination xxx.com:1234: Waiting 27476ms before attempting reconnection.
    11:17:13,302 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: connection failed. java.net.ConnectException: Connection refused: connectat java.net.ConnectException: Connection refused: connectat  at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)at  at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)at  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)at  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)at  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)at  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)at  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)at  at java.net.Socket.connect(Socket.java:606)at  at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.openSocket(AbstractLogstashTcpSocketAppender.java:721)at  at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.onStart(AbstractLogstashTcpSocketAppender.java:640)at  at net.logstash.logback.appender.AsyncDisruptorAppender$EventClearingEventHandler.onStart(AsyncDisruptorAppender.java:351)at  at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.notifyStart(BatchEventProcessor.java:224)at  at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:120)at  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at  at java.util.concurrent.FutureTask.run(FutureTask.java:266)at  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at  at java.lang.Thread.run(Thread.java:748)
    11:17:13,303 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: Waiting 27662ms before attempting reconnection.

    异常的日志,连接成功后,每10s断开连接,然后过20s重试后连接成功,一直反复,乐此不疲...

    11:48:54,484 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established.
    11:49:04,524 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19949ms before attempting reconnection.
    11:49:24,477 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established.
    11:49:34,478 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19995ms before attempting reconnection.
  2. 源码:AbstractLogstashTcpSocketAppender(嫌麻烦的直接看3)

    由于 这个用到 com.lmax.disruptor 这个包,推荐看一下美团的这篇 https://tech.meituan.com/2016/11/18/disruptor.html

    AbstractLogstashTcpSocketAppender ,一般是用于发送日志内容,比如将日志内容发送到 logstash/flume 等。

    具体的配置可以参考下 https://www.cnblogs.com/zhyg/p/6994314.html

    内部类

    TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware

    负责执行TCP传输的事件处理器,这个类内部还有3个线程内部类 分别是

    KeepAliveRunnable(用于和socet 保持连接)、ReaderCallable(接收socket的流信息)、WriteTimeoutRunnable(检测写入超时,如果超时了就关闭连接)

    UnconnectedConfigurableSSLSocketFactory extends ConfigurableSSLSocketFactory (创建链接,使用自定义配置参数)

    TcpSendingEventHandler 重点看下这个类,处理tcp事务都处理些啥,方法如下:

    onEvent ,对 EventHandler.onEvent 的实现,有事件就去处理。代码不长,而且注释特别清晰。

    接受到事件后循环5次,判断socket的读取流的线程是否结束或者socket是否为空,调用 reopenSocket 方法,否则调用下面的writeEvent。如果 readerFuture.isDone() 是服务器关闭了连接,如果是 socket为空,是写入超时。

    if (readerFuture.isDone() || socket == null) {/** If readerFuture.isDone(), then the destination has shut down its output (our input),* and the destination is probably no longer listening to its input (our output).* This will be the case for Amazon's Elastic Load Balancers (ELB)* when an instance behind the ELB becomes unhealthy while we're connected to it.** If socket == null here, it means that a write timed out,* and the socket was closed by the WriteTimeoutRunnable.* * Therefore, attempt reconnection.*/addInfo(peerId + "destination terminated the connection. Reconnecting.");reopenSocket();try {readerFuture.get();sendFailureException = NOT_CONNECTED_EXCEPTION;} catch (Exception e) {sendFailureException = e;}continue;
    }

    writeEvent,tcp 往服务器写数据。由于keepalive 也会触发事件,但是event 为null。所以这时候判断是 keepalive还是其他事件。

    其他事件的写入还要 兼容 logbakck1.x版本,keepalive 写入的话,写入 换行符。还有个属性 endOfBatch,如果是的话,会执行 outputStream.flush()

    onStart , 启动方法。

    初始化 destinationAttemptStartTimes 数组,目的是为了存放每个连接目标最后尝试连接的时间。openSocket(建立 socket连接),scheduleKeepAlive (定时线程 触发keepAlive 事件),scheduleWriteTimeout(定时检查写超时的话,就关闭连接(这个在5.x是没有的方法))

    onShutdown 就不说了

    reopenSocket 调了两个方法,关闭连接,打开连接。

    openSocket ,是被 synchronized 修饰的。方法注释说的是反复打开socket,直到线程被打断了。

    /*** Repeatedly tries to open a socket until it is successful,* or the hander is stopped, or the handler thread is interrupted.** If the socket is non-null when this method returns,* then it should be able to be used to send.*/

    方法比较长,一点点看

    while (isStarted() && !Thread.currentThread().isInterrupted()) {// 获取下一个连接的index,多个链接地址的时候多个<destination>标签,默认主从,还有轮询获取和随机destinationIndex = connectionStrategy.selectNextDestinationIndex(destinationIndex, destinations.size());long startWallTime = System.currentTimeMillis();Socket tempSocket = null;OutputStream tempOutputStream = null;/** Choose next server*/InetSocketAddress currentDestination = destinations.get(destinationIndex);try {/** Update peerId (for status message)*/peerId = "Log destination " + currentDestination + ": ";/** Delay the connection attempt if the last attempt to the selected destination* was less than the reconnectionDelay.* 判断最后一次尝试连接的时间和延迟重连比较,如果上一次重试的时间小于30s,会提示并在30减去重试时间后,发起重连*/final long millisSinceLastAttempt = startWallTime - destinationAttemptStartTimes[destinationIndex];if (millisSinceLastAttempt < reconnectionDelay.getMilliseconds()) {final long sleepTime = reconnectionDelay.getMilliseconds() - millisSinceLastAttempt;if (errorCount < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId + "Waiting " + sleepTime + "ms before attempting reconnection.");}try {shutdownLatch.await(sleepTime, TimeUnit.MILLISECONDS);if (!isStarted()) {return;}} catch (InterruptedException ie) {Thread.currentThread().interrupt();addWarn(peerId + "connection interrupted. Will no longer attempt reconnection.");return;}// reset the start time to be after the wait period.startWallTime = System.currentTimeMillis();}// 更新当前index的最后重试时间destinationAttemptStartTimes[destinationIndex] = startWallTime;/** Set the SO_TIMEOUT so that SSL handshakes will timeout if they take too long.** Note that SO_TIMEOUT only applies to reads (which occur during the handshake process).*/tempSocket = socketFactory.createSocket();tempSocket.setSoTimeout(acceptConnectionTimeout);/** currentDestination is unresolved, so a new InetSocketAddress* must be created to resolve the hostname.*/tempSocket.connect(new InetSocketAddress(getHostString(currentDestination), currentDestination.getPort()), acceptConnectionTimeout);/** Trigger SSL handshake immediately and declare the socket unconnected if it fails*/if (tempSocket instanceof SSLSocket) {((SSLSocket)tempSocket).startHandshake();}/** Issue #218, make buffering the output stream optional.*/tempOutputStream = writeBufferSize > 0? new BufferedOutputStream(tempSocket.getOutputStream(), writeBufferSize): tempSocket.getOutputStream();if (getLogback11Support().isLogback11OrBefore()) {getLogback11Support().init(encoder, tempOutputStream);}addInfo(peerId + "connection established.");this.socket = tempSocket;this.outputStream = tempOutputStream;boolean shouldUpdateThreadName = (destinationIndex != connectedDestinationIndex);connectedDestinationIndex = destinationIndex;connectedDestination = currentDestination;connectionStrategy.connectSuccess(startWallTime, destinationIndex, destinations.size());if (shouldUpdateThreadName) {/** destination has changed, so update the thread name*/updateCurrentThreadName();}// 默认的schedule线程池,每10s触发一次,读取server的返回this.readerFuture = scheduleReaderCallable(new ReaderCallable(tempSocket.getInputStream()));fireConnectionOpened(this.socket);return;} catch (Exception e) {CloseUtil.closeQuietly(tempOutputStream);CloseUtil.closeQuietly(tempSocket);connectionStrategy.connectFailed(startWallTime, destinationIndex, destinations.size());fireConnectionFailed(currentDestination, e);/** Avoid spamming status messages by checking the MAX_REPEAT_CONNECTION_ERROR_LOG.*/if (errorCount++ < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId + "connection failed.", e);}}
    }

    scheduleKeepAlive 维持连接的,需要在xml中配置 <keepAliveDuration>,默认不触发这个方法

    scheduleWriteTimeout 监测写入超时的

    ReaderCallable.call 读取服务器的流,没有返回空。但是! 触发定时线程池往 Disruptor 中触发一个空事件。

    其实作者也说了触发空事件就是为了 keepalive,触发的时候会判断 future是否结束,结束的话重新打开socket。

    如果没有这个方法,会在下次触发 onEvent时重新连接,所以为了尽快打开socket,作者加了这个折中的方案。

    @Override
    public Void call() throws Exception {updateCurrentThreadName();try {while (true) {try {if (inputStream.read() == -1) {/** End of stream reached, so we're done.*/return null;}} catch (SocketTimeoutException e) {/** ignore, and try again*/} catch (Exception e) {/** Something else bad happened, so we're done.*/throw e;}}} finally {if (!Thread.currentThread().isInterrupted()) {getExecutorService().submit(() -> {/** https://github.com/logstash/logstash-logback-encoder/issues/341** Pro-actively trigger the event handler's onEvent method in the handler thread* by publishing a null event (which usually means a keepAlive event).** When onEvent handles the event in the handler thread,* it will detect that readerFuture.isDone() and reopen the socket.** Without this, onEvent would not be called until the next event,* which might not occur for a while.* So, this is really just an optimization to reopen the socket as soon as possible.** We can't reopen the socket from this thread,* since all socket open/close must be done from the event handler thread.** There is a potential race condition here as well, since* onEvent could be triggered before the readerFuture completes.* We reduce (but not eliminate) the chance of that happening by* scheduling this task on the executorService.*/getDisruptor().getRingBuffer().tryPublishEvent(getEventTranslator(), null);});}}
    }

    其实看完这块代码,我的问题就破案了。

    每隔10s发起重连是来源于这个地方触发的空事件,也是正常的。期间很有可能是服务器断开了连接,之后发起了重连。

  3. 对上面的流程梳理下

    启动的时候:创建连接、定时心跳维护连接(默认关闭)、定时监测写入超时(默认100ms)

    创建连接:看上次尝试连接时间是否超过30s,没超过的话,等待20s后重连,超过的话立即重连。定时10s的单个线程读取socket的输入流,读取完毕后触发 Disruptor 一个空事件。

    触发事件的时候:循环5次,判断当前的连接状态(线程状态和socket状态),关闭:调用关闭连接和创建连接。开启:调用写入方法。

    写入方法:判断是心跳维护还是正常事件,心跳维护写换行符,正常事件写入事件值。如果是批量终结,调用 flush ,刷新流。

  4. 解决方案

    虽然问题找到了,由于猜测是服务端释放连接导致的这个问题,所以没什么好的解决方案,粗暴一点,直接改了 logback-encoder 的日志级别,改为ERROR,看不到WARN日志了,有点骗自己的意思...

    当我写完整个的时候发现了真正的问题所在... logstash-logback-encoder 版本问题,改成5.3 可以了。

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412604.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL 、springboot 、spring data jpa 集成

项目地址&#xff1a;https://gitee.com/zhxs_code/PostgreSQL_springboot_jpa_demo.git 增删查改都已经实现。 重点部分&#xff1a; 1.定义自己的方言。 1 package com.zxl.postgrespringdemo.config.dialect;2 3 import org.hibernate.dialect.PostgreSQL94Dialect;4 import…

[css] 有用过Flex吗?简要说下你对它的了解

[css] 有用过Flex吗&#xff1f;简要说下你对它的了解 给我感触最深的只有两点方便/好用子元素超出的bug关于bug, 大家有什么好的解决方案, 除了overflow: hidden个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大…

RedisTemplate value序列化导致的问题

大家好&#xff0c;我是烤鸭&#xff1a; ​ 今天分享一个redisTemplate 使用时&#xff0c;value 序列化的问题。 1. 问题描述 其实我最开始遇到的问题是&#xff1a; stringRedisTemplate.opsForSet().isMember(key,value)一直返回false问题&#xff0c;下边博客给出了…

41、OrthoMCL和mcl软件进行基因家族分析

转载&#xff1a;http://www.realbio.cn/news/124.html https://blog.csdn.net/seallama/article/details/43820763 http://www.cnblogs.com/huangying78/p/8638506.html 1. 数据库的配置 OrthoMCL的分析需要先行建立mysql账户并建立相应的数据库。关于mysql用户的创建我们不在…

[css] 如何实现换肤功能?

[css] 如何实现换肤功能&#xff1f; css 换肤常见方案 是通过 less/sass/postcss 等css 预处理器&#xff0c;通过它们自身的变量用法&#xff0c;设置不同变量&#xff0c;生成不同的主题样式&#xff0c;但是这些样式都是会被打包成常量&#xff0c;我们只能在编译之前修改…

beego——模板处理

beego的模板处理引擎采用的是Go内置的html/template包进行处理&#xff0c;而且beego的模板处理逻辑是采用了缓存编译方式&#xff0c; 也就是所有的模板会在beego应用启动的时候全部编译然后缓存在map里面。 1.模板目录 beego中默认的模板目录是views&#xff0c;用户可以把模…

日志 中文乱码、nacos 中文乱码、saltstack 中文乱码、docker中文乱码

大家好&#xff0c;我是烤鸭&#xff1a; ​ 今天分享一个 saltstack 中文乱码 的问题。 问题说明 由于项目之前没有接入公司的发布系统&#xff0c;今天接入之后发现日志乱码&#xff0c;不仅如此&#xff0c;从nacos获取到的中文参数也是乱码。于是猜想是发布系统遗留了一…

[css] 列举CSS优化、提高性能的方法

[css] 列举CSS优化、提高性能的方法 加载性能压缩CSS通过link方式加载&#xff0c;而不是import复合属性其实分开写&#xff0c;执行效率更高&#xff0c;因为CSS最终也还是要去解析如 margin-left: left;选择器性能尽量少的使用嵌套&#xff0c;可以采用BEM的方式来解决命名冲…

[css] 假如设计稿使用了非标准的字体,你该如何去实现它?

[css] 假如设计稿使用了非标准的字体&#xff0c;你该如何去实现它&#xff1f; 协商解决, 如果是重要信息, 如logo等, 使用图片, iconfont.个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 …

redis出现过多command 慢查询slowlog出现command命令

大家好&#xff0c;我是烤鸭&#xff1a; 今天分享一个问题&#xff0c;一个关于redis slowlog&#xff0c;执行过多 command命令的问题。 问题来源 所有走redis 的接口tp99和平均耗时是原来的两倍不止&#xff0c;运维说redis 的qps也翻倍了。查了下slowlog&#xff0c;发现…

[学习笔记]上下界网络流

有的时候&#xff0c;网络流建模要考虑某些边必须选择若干次&#xff0c;又不能多于若干次&#xff0c;而且不太容易转化成比较好的限制模型&#xff0c; 就简单粗暴地给每条边定一个流量的上下界&#xff0c;求在满足上下界的基础上的一些问题。 大概有以下几种。 基本思路都是…

[css] 你知道全屏滚动的原理是什么吗?它用到了CSS的哪些属性?

[css] 你知道全屏滚动的原理是什么吗&#xff1f;它用到了CSS的哪些属性&#xff1f; 全屏滚动和轮播图类似&#xff0c;都是通过改变元素位置或者显示与隐藏来实现&#xff0c;配合JS的一些交互距离判断&#xff0c;实现类似原生滚动捕获的效果。这里全屏的话就需要将宽高都设…

springcloud gateway 使用nacos 动态过滤器 记一次线上网关升级cpu升高的问题

大家好&#xff0c;我是烤鸭&#xff1a; ​ 网关升级&#xff0c;想使用 springcloud gateway nacos 动态过滤器配置(原来是硬编码的方式)&#xff0c;升级之后出了一些问题(cpu升高&#xff0c;ygc频繁)&#xff0c;记录一下。 关于 springcloud gateway 集成 nacos 可以看…

[css] 你是怎样抽离样式模块的?

[css] 你是怎样抽离样式模块的&#xff1f; 说的是 webpack extract-text-webpack-plugin插件吧&#xff1f; 把样式文件单独打包出来。 webpack4 升级了插件为 mini-css-extract-plugin个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c;…

【1】生产者-消费者模型的三种实现方式

(手写生产者消费者模型&#xff0c;写BlockingQueue较简便 ) 1、背景 生产者生产数据到缓冲区中&#xff0c;消费者从缓冲区中取数据。 如果缓冲区已经满了&#xff0c;则生产者线程阻塞&#xff1b; 如果…

springboot mybatis-plus 配置 yml 、druid 配置 yml 、mybatis-plus 代码生成

大家好&#xff0c;我是烤鸭&#xff1a; 今天分享一下 springboot mybatis-plus 和 druid 的yml 配置文件。 pom <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency…

[css] 说说你对媒体查询的理解

[css] 说说你对媒体查询的理解 当年做响应式布局的时候用过媒介查询&#xff0c;media query。包括现在有的时候为了兼容也会用到一些&#xff0c;查找对应范围使用不同的样式个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定…

Spring Boot 2.1 版本变化[翻译]

大家好&#xff0c;我是烤鸭&#xff1a; ​ 最近在把低版本的springboot项目升级&#xff0c;正好翻译了下springboot 2.1-2.3 版本的更新日志。 ​ Github 原文&#xff1a;https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.1-Release-Notes ​ 2.2 版…

实验吧Web-易-天网管理系统(php弱类型,==号)

打开网页&#xff0c;查看源码&#xff0c;看到 <!-- $test$_GET[username]; $testmd5($test); if($test0) --> 说明用户名需要加密之后为0。 对于PHP的号&#xff0c;在使用 运算符对两个字符串进行松散比较时&#xff0c;PHP会把类数值的字符串转换为数值进行比较&…

[css] 你知道的等高布局有多少种?写出来

[css] 你知道的等高布局有多少种&#xff1f;写出来 flex拉伸display: flex; align-items: stretch;padding margin抵消 然后background-clip默认是border-box所以会在被抵消的位置依然显示背景 造成等高假象.box,.box2{float: left;width: 100px; } .box {background: #cccccc…