ELK 企业级日志分析系统(四)

ELK

  • 一、部署Kafka集群
  • 二、Kafka的命令行操作
  • 三、Kafka架构深入
  • 四、Filebeat+Kafka+ELK部署

一、部署Kafka集群

1.下载安装包
官方下载地址:http://kafka.apache.org/downloads.html

cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

2.安装 Kafka
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

//修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}

vim server.properties
broker.id=0 ●21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.80.10:9092 ●31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量
log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
zookeeper.connect=192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 ●123行,配置连接Zookeeper集群地址

//修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH= P A T H : PATH: PATH:KAFKA_HOME/bin

source /etc/profile

//配置 Zookeeper 启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME=‘/usr/local/kafka’
case $1 in
start)
echo “---------- Kafka 启动 ------------”
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo “---------- Kafka 停止 ------------”
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
0 s t a r t ; ; s t a t u s ) e c h o " − − − − − − − − − − K a f k a 状态 − − − − − − − − − − − − " c o u n t = 0 start ;; status) echo "---------- Kafka 状态 ------------" count= 0start;;status)echo"Kafka状态"count=(ps -ef | grep kafka | egrep -cv “grep|$$”)
if [ “$count” -eq 0 ];then
echo “kafka is not running”
else
echo “kafka is running”
fi
;;
*)
echo “Usage: $0 {start|stop|restart|status}”
esac

//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka

//分别启动 Kafka
service kafka start

二、Kafka的命令行操作

3.Kafka 命令行操作
//创建topic
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
在这里插入图片描述

–zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
–replication-factor:定义分区副本数,1 代表单副本,建议为 2
–partitions:定义分区数
–topic:定义 topic 名称

//查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
在这里插入图片描述
//查看某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
在这里插入图片描述
//发布消息
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test
在这里插入图片描述
//消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
在这里插入图片描述
–from-beginning:会把主题中以往所有的数据都读取出来

//修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6
在这里插入图片描述
在这里插入图片描述
这里的修改只能是增加,不能减少,减少只能删除这个topic

//删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
在这里插入图片描述

三、Kafka架构深入

//Kafka 工作流程及文件存储机制
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

//数据可靠性保证
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

//数据一致性问题
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。

(1)follower 故障
follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

//ack 应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

四、Filebeat+Kafka+ELK部署

1.部署 Zookeeper+Kafka 集群

2.部署 Filebeat
cd /usr/local/filebeat

vim filebeat.yml
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
tags: [“access”]

- type: log
enabled: true
paths:
- /var/log/httpd/error_log
tags: [“error”]
在这里插入图片描述


#添加输出到 Kafka 的配置
output.kafka:
enabled: true
hosts: [“192.168.80.10:9092”,“192.168.80.11:9092”,“192.168.80.12:9092”] #指定 Kafka 集群配置
topic: “httpd” #指定 Kafka 的 topic
在这里插入图片描述

#启动 filebeat
./filebeat -e -c filebeat.yml

3.部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d/

vim kafka.conf
input {
kafka {
bootstrap_servers => “192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092” #kafka集群地址
topics => “httpd” #拉取的kafka的指定topic
type => “httpd_kafka” #指定 type 字段
codec => “json” #解析json格式的日志数据
auto_offset_reset => “latest” #拉取最近数据,earliest为从头开始拉取
decorate_events => true #传递给elasticsearch的数据额外增加kafka的属性数据
}
}

output {
if “access” in [tags] {
elasticsearch {
hosts => [“192.168.80.30:9200”]
index => “httpd_access-%{+YYYY.MM.dd}”
}
}

if “error” in [tags] {
elasticsearch {
hosts => [“192.168.80.30:9200”]
index => “httpd_error-%{+YYYY.MM.dd}”
}
}

stdout { codec => rubydebug }
}

#启动 logstash
logstash -f kafka.conf

4.浏览器访问 http://192.168.80.30:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

铰接式车辆的横向动力学仿真提供车辆模型研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

【关于C++中----特殊类设计和单例模式】

文章目录 一、设计一个类,不能被拷贝1.1C98的实现方法及其弊端1.2 C11的实现方法 二、设计一个类,只能在堆上创建对象三、设计一个类,只能在栈上创建对象四、设计一个类,不能被继承五、设计一个类,只能创建一个对象(单…

Leecode316: 去除重复字母

下面这里使用有序map——TreeMap来实现Map接口,但是相对顺序是不能改变的!这样会使得后面的跑到前面去,所以有问题 最简单的思想肯定是暴力思想,就是从前往后寻找,一旦遇到存在的情况就. 重点在于明确两点&#xff1a…

【Docker】了解和使用Docker

文章底部有投票活动,赶快参与进来吧😃 相信大家在开发过程中都听说过 Docker 一词,至于 Docker 在开发中扮演的角色,估计好多人都说不上来,今天就让阿Q带大家一起揭开它神秘的面纱! 文章目录 什么是容器&a…

dolphinscheduler伪分布式安装

1、上传安装包 2、安装 #解压 重命名 [rootdatacollection conf]# cd /opt/modules/ [rootdatacollection modules]# tar -zxf apache-dolphinscheduler-2.0.6-bin.tar.gz -C /opt/installs/ [rootdatacollection modules]# cd ../installs/ [rootdatacollection installs]# m…

【广州华锐互动】智慧交通3D可视化交互平台

智慧交通3D可视化交互平台由广州华锐互动开发,是一种基于现代科技的智能交通管理系统,它能够实现对车站内部人员和车辆的实时监控和管理。该平台采用了先进的三维可视化技术,将车站内部的结构和设备以立体、直观的方式呈现在用户面前&#xf…

LangChain大型语言模型(LLM)应用开发(四):QA over Documents

LangChain是一个基于大语言模型(如ChatGPT)用于构建端到端语言模型应用的 Python 框架。它提供了一套工具、组件和接口,可简化创建由大型语言模型 (LLM) 和聊天模型提供支持的应用程序的过程。LangChain 可以轻松管理与语言模型的交互&#x…

springboot整合ehcache和redis实现多级缓存实战案例

一、概述 在实际的工作中,我们通常会使用多级缓存机制,将本地缓存和分布式缓存结合起来,从而提高系统性能和响应速度。本文通过springboot整合ehcache和redis实现多级缓存案例实战,从源码角度分析下多级缓存实现原理。 二、实战案…

赛效:如何将PDF文件免费转换成Word文档

1:在网页上打开wdashi,默认进入PDF转Word页面,点击中间的上传文件图标。 2:将PDF文件添加上去之后,点击右下角的“开始转换”。 3:稍等片刻转换成功后,点击绿色的“立即下载”按钮,将…

做私域选个微还是企微,哪个有优势?

做私域,你必须要有一个,引流新客户及留存老客户的地方。 于是,就有很多人讨论或者纠结:做私域,选择个人微信?还是企业微信? 让我们一起来看看个人微信和企业微信在功能和使用上有哪些区别&…

[SpringBoot]单点登录

关于单点登录 单点登录的基本实现思想: 当客户端提交登录请求时,服务器端在验证登录成功后,将生成此用户对应的JWT数据,并响应到客户端 客户端在后续的访问中,将自行携带JWT数据发起请求,通常&#xff0c…

一篇搞懂steam/csgo搬砖原理

接触csgo游戏搬砖项目三年了,也有在别的论坛交流心得。让我无语的是有些已经游戏搬砖差不多半年,却还告诉我没有赚到钱,又或者说时常到可出售的时候利润少的可怕,总是说这个行业说水太深了!那么请你告诉我,…

快快快快快快快快快快排

作者简介:დ旧言~,目前大一,现在学习Java,c,Python等 座右铭:松树千年终是朽,槿花一日自为荣。 望小伙伴们点赞👍收藏✨加关注哟💕💕 C语言实现快排☺️ ℹ️…

Ceph 块存储系统 RBD 接口

-创建 Ceph 块存储系统 RBD 接口- 1、创建一个名为 rbd-demo 的专门用于 RBD 的存储池 ceph osd pool create rbd-demo 64 642、将存储池转换为 RBD 模式 ceph osd pool application enable rbd-demo rbd3、初始化存储池 rbd pool init -p rbd-demo # -p 等同于 --pool4、…

jenkins手把手教你从入门到放弃01-jenkins简介(详解)

一、简介 jenkins是一个可扩展的持续集成引擎。持续集成,也就是通常所说的CI(Continues Integration),可以说是现代软件技术开发的基础。持续集成是一种软件开发实践, 即团队开发成员经常集成他们的工作,通…

STM32 Proteus仿真LCD12864火灾检测烟雾火焰温度报警器MQ2 -0064

STM32 Proteus仿真LCD12864火灾检测烟雾火焰温度报警器MQ2 -0064 Proteus仿真小实验: STM32 Proteus仿真LCD12864火灾检测烟雾火焰温度报警器MQ2 -0064 功能: 硬件组成:STM32F103R6单片机 LCD12864 液晶显示DS18B20 温度传感器多个按键电位…

单例模式:懒汉式和饿汉式

目录 懒汉模式和饿汉模式 区别 示例 懒汉模式线程不安全 懒汉模式线程安全 懒汉模式内部静态变量线程安全 饿汉式线程安全 指的是在系统生命周期内,只产生一个实例。 懒汉模式和饿汉模式 分为懒汉式和饿汉式 区别 创建时机和线程安全 线程安全&#xff1…

高时空分辨率、高精度一体化预测技术的风、光、水自动化预测技术的应用

第一章 预测平台讲解及安装 一、高精度气象预测基础理论介绍 综合气象观测数值模拟模式; 全球预测模式、中尺度数值模式; 二、自动化预测平台介绍 Linux系统 Crontab定时任务执行机制 Bash脚本自动化编程 硬件需求简介 软件系统安装 …

分享一个加载按钮动画

先看效果&#xff1a; 再看代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>加载动画按钮</title><script src"https://cdnjs.cloudflare.com/ajax/libs/animejs/3.2…

flutter开发实战-卡片翻转动画效果Transform+IndexedStack+rotateAnimation

flutter开发实战-实现卡片翻转动画效果 之前开发中遇到了商品卡片翻转&#xff0c;商品正面是商品图片、商品名称&#xff1b;背面是商品价格&#xff0c;需要做卡片翻转动画。 动画实现即&#xff1a;在一段时间内&#xff0c;快速地多次改变UI外观&#xff1b;由于人眼会产生…