Flink日志采集-ELK可视化实现

一、各组件版本

组件版本
Flink1.16.1
kafka2.0.0
Logstash6.5.4
Elasticseach6.3.1
Kibana6.3.1

  针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执行完毕或者任务报错后container会被回收从而导致日志丢失,为了方便排查问题可以把⽇志⽂件通过KafkaAppender写⼊到kafka中,然后通过ELK等进⾏⽇志搜索甚⾄是分析告警。

二、Flink配置将日志写入Kafka

2.1 flink-conf.yaml增加下面两行配置信息

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

2.2 log4j.properties配置案例如下

##################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
##################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30# This affects logging for both user code and Flink
#rootLogger.appenderRef.file.ref = MainAppender
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.appenderRef.file.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 500MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10#appender.main.name = MainAppender
#appender.main.type = RollingFile
#appender.main.append = true
#appender.main.fileName = ${sys:log.file}
#appender.main.filePattern = ${sys:log.file}.%i
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#appender.main.policies.type = Policies
#appender.main.policies.size.type = SizeBasedTriggeringPolicy
#appender.main.policies.size.size = 100MB
#appender.main.policies.startup.type = OnStartupTriggeringPolicy
#appender.main.strategy.type = DefaultRolloverStrategy
#appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.syncSend = true
appender.kafka.ignoreExceptions = false
appender.kafka.topic = flink_logs
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = xxx1:9092,xxx2:9092,xxx3:9092
appender.kafka.layout.type = JSONLayout
apender.kafka.layout.value = net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact = true
appender.kafka.layout.complete = false# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF#通过 flink on yarn 模式还可以添加⾃定义字段
# 日志路径
appender.kafka.layout.additionalField1.type = KeyValuePair
appender.kafka.layout.additionalField1.key = logdir
appender.kafka.layout.additionalField1.value = ${sys:log.file}
# flink-job-name
appender.kafka.layout.additionalField2.type = KeyValuePair
appender.kafka.layout.additionalField2.key = flinkJobName
appender.kafka.layout.additionalField2.value = ${sys:flinkJobName}
# 提交到yarn的containerId
appender.kafka.layout.additionalField3.type = KeyValuePair
appender.kafka.layout.additionalField3.key = yarnContainerId
appender.kafka.layout.additionalField3.value = ${sys:yarnContainerId}

  上⾯的 appender.kafka.layout.type 可以使⽤ JSONLayout ,也可以⾃定义。

  ⾃定义需要将上⾯的appender.kafka.layout.type 和 appender.kafka.layout.value 修改成如下:

appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern ={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F","l
og_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink
_job_name}"}%n

2.3 基于Flink on yarn模式提交任务前期准备

2.3.1 需要根据kafka的版本在flink/lib⽬录下放⼊kafka-clients的jar包

在这里插入图片描述

2.3.2 kafka处于启动状态

2.3.3 Flink Standalone集群

# 根据kafka的版本放⼊kafka-clients
kafka-clients-3.1.0.jar
# jackson对应的jar包
jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar

2.4 Flink on yarn任务提交案例

/root/software/flink-1.16.1/bin/flink run-application \
-t yarn-application \
-D yarn.application.name=TopSpeedWindowing \
-D parallelism.default=3 \
-D jobmanager.memory.process.size=2g \
-D taskmanager.memory.process.size=2g \
-D env.java.opts="-DflinkJobName=TopSpeedWindowing" \
/root/software/flink-1.16.1/examples/streaming/TopSpeedWindowing.jar

【注意】启动脚本需要加入这个参数,日志才能采集到任务名称(-D env.java.opts="-DflinkJobName=xxx")

  消费flink_logs案例

{instant: {epochSecond: 1698723428,nanoOfSecond: 544000000,},thread: 'flink-akka.actor.default-dispatcher-17',level: 'INFO',loggerName: 'org.apache.flink.runtime.rpc.akka.AkkaRpcService',message: 'Stopped Akka RPC service.',endOfBatch: false,loggerFqcn: 'org.apache.logging.slf4j.Log4jLogger',threadId: 68,threadPriority: 5,logdir: '/yarn/container-logs/application_1697779774806_0046/container_1697779774806_0046_01_000002/taskmanager.log',flinkJobName: 'flink-log-collect-test',yarnContainerId: 'container_1697779774806_0046_01_000002',
}

  ⽇志写⼊Kafka之后可以通过Logstash接⼊elasticsearch,然后通过kibana进⾏查询或搜索

三、LogStash部署

  部署过程略,网上都有

  需要注意Logstash内部kafka-clients和Kafka版本兼容问题,需要根据Kafka版本选择合适的Logstash版本

  将以下内容写⼊config/logstash-sample.conf ⽂件中

input {kafka {bootstrap_servers => ["xxx1:9092,xxx2:9092,xxx3:9092"] group_id => "logstash-group"topics => ["flink_logs"] consumer_threads => 3 type => "flink-logs" codec => "json"auto_offset_reset => "latest"}
}output {elasticsearch {hosts => ["192.168.1.249:9200"] index => "flink-log-%{+YYYY-MM-dd}"}
}

  Logstash启动:

logstash-6.5.4/bin/logstash -f logstash-6.5.4/config/logstash-sample.conf 2>&1 >logstash-6.5.4/logs/logstash.log &

四、Elasticsearch部署

  部署过程略,网上都有

  注意需要用root用户以外的用户启动Elasticsearch

  启动脚本:

Su elasticsearchlogtestelasticsearch-6.3.1/bin/elasticsearch

在这里插入图片描述

  Windows访问ES客户端推荐使用ElasticHD,本地运行后可以直连ES
在这里插入图片描述

五、Kibana部署

  部署过程略,网上都有

  启动脚本:

  kibana-6.3.1-linux-x86_64/bin/kibana

5.1 配置规则

在这里插入图片描述
在这里插入图片描述

5.2 日志分析

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3使用ts封装axios

以下是使用 TypeScript 封装 Axios 的示例代码&#xff1a; //axios.tsimport axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, AxiosError } from axios;// 自定义 API 响应类型 interface ApiResponse<T> {code: number;message: string;data: T; }// 自…

中文大语言模型汇总

推荐一篇非常棒的github&#xff1a;Awesome-Chinese-LLM 另附语言模型排行榜&#xff1a;FastChat 里面总结了几乎所有目前主流的中文大语言模型。在此记录一下&#xff0c;方便以后慢慢学习。

单例模式下双重校验锁 DCL 的灵魂三问

文章目录 前言如何实现一个双重校验锁 DCL定义一个单例变量定义一个获取单例的方法性能优化性能优化带来的一点点问题什么是指令重排&#xff1f; 总结如何理解文章开篇理解的三个问题1、为什么需要使用两个 if 语句&#xff1f;2、为什么使用了 synchronized 关键字还需要使用…

AI 引擎系列 4 - 首次运行 AI 引擎编译器和 x86simulator(2022.1 更新)

AI 引擎系列 4 - 首次运行 AI 引擎编译器和 x86simulator&#xff08;2022.1 更新&#xff09; 简介 在 AI 引擎系列的前 3 篇博文中&#xff0c;我们探讨了 AI 引擎应用所需的不同文件。在本篇中&#xff0c;我们将为 X86 目标运行 AI 引擎编译器&#xff0c;观察它生成的不…

二进制搭建 Kubernetes(K8s)

目录 1、环境 1.1 操作系统初始化配置 1.2 部署 docker引擎 1.3 部署 etcd 集群 1.4 准备签发证书环境 1.5 部署 Master 组件 1.6 部署 Worker Node 组件 1.7 部署 CNI 网络组件 1.7.1 部署 flannel 1.7.2 部署 Calico 1.7.3 node02 节点部署 1.7.4 部署 CoreDNS 1…

0-1背包 完全背包 + 至多/恰好/至少 + 空间优化 + 常见变形题(实战力扣题)

&#xff08;一&#xff09;01背包 1.回溯三问 # capacity:背包容量 # w[i]: 第 i 个物品的体积 # v[i]: 第 i 个物品的价值 # 返回:所选物品体积和不超过 capacity 的前提下&#xff0c;所能得到的最大价值和 def zero_one_knapsack(capacity:int,w:List[int],v:List[int])…

【Linux】第八站:gcc和g++的使用

文章目录 一、解决sudo命令的问题二、Linux编译器-gcc/g1.gcc的使用2.g的使用 三、gcc编译链接过程1.预处理2.编译&#xff08;生成汇编&#xff09;3.汇编&#xff08;生成机器可识别代码&#xff09;4.链接&#xff08;生成可执行文件或库文件&#xff09;5.一些选项的意义 四…

决策树算法的实现

决策树是一种机器学习算法&#xff0c;它类似于人脑思考问题的过程。我们可以通过问一系列的问题来逐步缩小答案的范围&#xff0c;最终得到最终的答案。 比如说&#xff0c;我们想要预测一个人是否会购买某个产品&#xff0c;我们可以通过一系列的问题来缩小预测范围。例如&a…

Docker容器技术实战3

8、docker原生网络 Docker原生网络基于Linux桥接技术和虚拟网络接口&#xff0c;使用了Linux内核的网络功能。每个Docker容器都有自己的网络命名空间&#xff0c;这使得容器之间可以使用独立的IP地址&#xff0c;并隔离了容器的网络栈。 当创建一个Docker原生网络时&#xff…

Airtest工具根据App页面文字信息提取坐标进行截图保存在自定义文件夹

Airtest工具根据App页面文字信息提取坐标进行截图保存在自定义文件夹 一、项目背景 在一个项目中&#xff0c;选项被选中和未选中的节点元素的属性值无变化&#xff0c;通过AI识别率达不到百分百&#xff0c;想着通过计算图片的HSV值来判断选择能否被选中。&#xff08;HSV比…

ESP32 for Arduino 分区信息

忘记过去&#xff0c;超越自己 ❤️ 博客主页 单片机菜鸟哥&#xff0c;一个野生非专业硬件IOT爱好者 ❤️❤️ 本篇创建记录 2023-11-04❤️❤️ 本篇更新记录 2023-11-04❤️&#x1f389; 欢迎关注 &#x1f50e;点赞 &#x1f44d;收藏 ⭐️留言&#x1f4dd;&#x1f64f;…

【JAVA学习笔记】59 - JUnit框架使用、本章作业

项目代码 https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter15/src/com/yinhai/homework JUnit测试框架 1.基本介绍 1. JUnit是一个Java语言的单元测试框架 2.多数Java的开发环境都已经集成了JUnit作为单元测试的工具 2.如何使用 创建方法后&#x…

QtC++与QVariant详解

什么是 QVariant&#xff1f; QVariant 是 Qt 中的一个类&#xff0c;允许您以一种通用的方式存储和访问数据&#xff0c;即使这些数据具有不同的数据类型。它在许多情况下非常有用&#xff0c;例如在模型视图编程、数据序列化、设置和配置管理等领域。QVariant 可以包含以下类…

JVM离线分析-使用MAT分析dump堆文件

1. MAT&#xff08;Memory Analyzer Tool&#xff09;的介绍 官方介绍 The Eclipse Memory Analyzer is a fast and feature-rich Java heap analyzer that helps you find memory leaks and reduce memory consumption. Use the Memory Analyzer to analyze productive heap …

Java随机获取某个范围内的随机整数

随机获取某个范围内的随机整数 一、代码 /*** 随机获取某个范围内的随机整数的值* param min 最小值* param max 最大值* return*/public static int randomNum(int min,int max) {// 创建一个Random对象Random random new Random();// 生成指定范围内的随机整数int randomI…

通过@ConfigrationProperties读取配置文件属性并赋值

这种设计使得 Anything 成为 Something 类的静态成员&#xff0c;因此不依赖于外部类的实例。静态内部类通常更独立&#xff0c;它们可以单独存在且无需引用外部类的实例。 如果将 Anything 类定义为非静态的内部类&#xff0c;它将依赖于 Something 类的实例。这意味着每个 S…

牛客周赛 Round 17

A、 题目描述&#xff1a; 游游拿到了一个边长为n的正方形披萨&#xff0c;她准备切k刀&#xff08;每刀只能横着或竖着切开&#xff09;&#xff0c;最终会形成若干个小矩形披萨。游游希望每个小披萨的面积相等&#xff0c;且矩形的长和宽的差的绝对值尽可能小。你能帮游游求出…

vue中的rules表单校验规则使用方法 :rules=“rules“

一、el-form里面必写属性值 :ref"dataForm" // 提交表单时进行校验 :rules"rules" // return 下的校验规则 :model"userForm" // 绑定表单的值 <el-formref"dataForm" // 必写属性值:rules"rules"…

CV论文阅读大合集

YearNameAreamodeldescriptiondrawback2021 ICMLClip &#xff08;Contrastive Language-Image Pre-training&#xff09;contrastive learning、zero-shot learing、mutimodel用文本作为监督信号来训练可迁移的视觉模型CLIP’s zero-shot performance, although comparable to…

Eureka处理流程

1、Eureka Server服务端会做什么 1、服务注册 Client服务提供者可以向Server注册服务&#xff0c;并且内部有二层缓存机制来维护整个注册表&#xff0c;注册表是Eureka Client的服务提供者注册进来的。 2、提供注册表 服务消费者用来获取注册表 3、同步状态 通过注册、心跳机制…