您是否曾经对运行了几个小时的Spark作业感到沮丧,但由于基础设施问题而失败了。
您会很晚才知道此故障,并浪费了数小时的时间,当Spark UI日志也无法用于事后检查时,它会更加痛苦。
你不是一个人!
在这篇文章中,我将介绍如何启用与Spark logger搭配使用的自定义记录器。
该定制记录器将收集从被动监视到主动监视所需的所有信息。
无需为此设置额外的日志记录。
Spark 2.X基于Slf4j抽象,并且使用了logback绑定。
让我们从基本的日志记录开始,即如何在Spark作业或应用程序中获取记录器实例。
val _LOG = LoggerFactory.getLogger(this.getClass.getName)
就是这么简单,现在您的应用程序使用的是与Spark基于相同的日志库和设置。
现在要做一些更有意义的事情,我们必须注入自定义记录器,该记录器将收集信息并将其写入弹性搜索或发布到某些REST端点或发送警报。
让我们一步一步去做
构建自定义日志附加程序
由于spark 2.X是基于logback的,因此我们必须编写logback logger。
自定义登录记录器的代码段
class MetricsLogbackAppender extends UnsynchronizedAppenderBase[ILoggingEvent] {override def append(e: ILoggingEvent) = {//Send this message to elastic search or REST end pointmessageCount.compute(Thread.currentThread().getName, mergeValue)System.out.println(messageCount + " " + e)}val messageCount = new ConcurrentHashMap[String, AtomicInteger]()val mergeValue = new BiFunction[String, AtomicInteger, AtomicInteger] {def apply(key: String, currentValue: AtomicInteger) = {val nextValue = currentValue match {case null => new AtomicInteger(0)case _ => currentValue}nextValue.incrementAndGet()nextValue}}}
这是一个非常简单的记录器,它按线程统计消息,您要做的所有事情都将覆盖附加函数。
这种类型的记录器可以执行任何操作,例如写入数据库或发送到REST端点或发出警报。
启用记录器
要使用新的记录器,请创建logback.xml文件并为新的记录器添加条目。
该文件可以打包在Shaded jar中,也可以指定为运行时参数。
样本logback.xml
<configuration><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!-- encoders are assigned the typech.qos.logback.classic.encoder.PatternLayoutEncoder by default --><encoder><pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/><root level="info"><appender-ref ref="STDOUT" /></root><logger level="info" name="micro" additivity="true"><appender-ref ref="METRICS" /></logger><logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true"><appender-ref ref="METRICS" /></logger></configuration>
此配置文件将MetricsLogbackAppender添加为METRICS
<appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/>
接下来为应使用此功能的包/类启用它
<logger level="info" name="micro" additivity="true"> <appender-ref ref="METRICS" /></logger>
<logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true"> <appender-ref ref="METRICS" /></logger
大功告成!
从'micro'包或DAGScheduler类记录的任何消息都将使用new logger。
使用这种技术,执行者日志也可以被捕获,当Spark作业在成百上千的执行者上运行时,这变得非常有用。
现在,它提供了许多让BI实时显示所有这些消息的选项,允许团队提出一些有趣的问题或在情况不佳时订阅变更。
警告:请确保此新记录器减慢了应用程序的执行速度,建议使其异步。
在正确的时间获取见解并将其付诸实践
此博客中使用的代码在github中的@sparkmicroservices回购中可用。
我有兴趣知道您正在为Spark使用哪种日志记录模式。
翻译自: https://www.javacodegeeks.com/2018/05/custom-logs-in-apache-spark.html