实战--------部署搭建ELFK+zookeeper+kafka架构

目录

一、部署jdk环境

二、搭建Elasticsearch

三、搭建logstash

四、搭建kibana服务

五、搭建filebeat服务

六、搭建zookeeper与kafka服务

七、部署ELFK+zookeeper+kafka


  1. Filebeat/Fluentd:负责从各服务器节点上实时收集日志数据,Filebeat轻量级,适合大规模部署,Fluentd功能强大,支持丰富的插件和灵活的过滤规则。

  2. Kafka:作为一个分布式消息队列系统,承担起数据缓冲和中转的角色。日志数据先发送到Kafka集群,一方面可以缓解Logstash或Filebeat的压力,另一方面支持多消费者模型,允许数据被多个下游系统并行消费。此外,Kafka的高吞吐量和持久化特性使得系统在面临大量日志输入时仍能保持稳定。

  3. Logstash:从Kafka集群中消费日志数据,进行必要的数据解析、过滤、转换等预处理操作,然后将结构化后的数据发送到Elasticsearch。

  4. ZooKeeper:在某些场景下,ZooKeeper可以用于管理Kafka集群的元数据,例如Broker注册、Topic的分区分配等,确保Kafka集群的稳定性和一致性。同时,对于Logstash或Kafka Connect这类组件,也可以通过ZooKeeper获取集群配置信息。

  5. Elasticsearch:存储经过处理的日志数据,提供全文搜索、聚合分析等功能,便于后期进行日志分析和故障排查。

  6. Kibana:作为前端展示工具,基于Elasticsearch的数据创建可视化图表和仪表盘,为用户提供友好的日志分析界面。

综合起来,ELFK+Zookeeper+Kafka架构结合了日志收集、处理、存储和分析的全链条,极大地提高了日志管理的效率和用户体验。

在ELFK+Zookeeper+Kafka架构中,日志数据的流转路径通常是这样的: 应用程序日志 -> Filebeat -> Kafka -> (Logstash ->) Elasticsearch -> Kibana

环境准备

IP地址主机名安装服务
192.168.83.30node1

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.40node2

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.50logstash

JDK-1.18.0

logstash-6.6.1

httpd

192.168.83.60kibana

JDK-1.18.0

kibana-6.6.1

192.168.83.70filebeat

JDK-1.18.0

filebeat-6.6.1

192.168.83.80zk-ka1

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.90zk-ka2

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.100zk-ka3

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

一、部署jdk环境

在所有机器上使用脚本安装jdk环境

#!/bin/bash
JAVA=`find  / -name  *jdk*tar*  -exec dirname {} \;`
#找到jdk的压缩包所在目录,并将设置为变量JAVA
JDK=`find  /  -name  *jdk*tar* 2>>/dev/null |awk  -F/ '{print $NF}'`
#找打jdk压缩包的名字,设置为变量JDK
cd ${JAVA}
tar xf ${JDK} -C /usr/local/
#切换到压缩包所在目录,并指定解压到/usr/local/目录下
JDKAPP=`find  /usr/local/   -maxdepth 1  -type  d  |grep  jdk|awk  -F/  '{print  $NF}'`
#找到解压后的目录名,并设置为变量名JDKAPP
ln  -s  /usr/local/${JDKAPP}  /usr/local/jdk
#做软链接,便于shell环境识别命令
cat > /etc/profile.d/jdk.sh  <<EOF
export JAVA_HOME=/usr/local/jdk
export PATH=\$JAVA_HOME/bin:\$PATH
export JRE_HOME=\$JAVA_HOME/jre 
export CLASSPATH=\$JAVA_HOME/lib/:\$JRE_HOME/lib/
EOF
#修改环境变量
echo "请执行  source /etc/profile.d/jdk.sh 命令,刷新文件

小结

'-----------------------------脚本安装JDK-----------------------------'
[root@node1 opt]#cat  jdk.sh
#!/bin/bash
JAVA=`find  / -name  *jdk*tar*  -exec dirname {} \;`
JDK=`find  /  -name  *jdk*tar* 2>>/dev/null |awk  -F/ '{print $NF}'`cd ${JAVA}
tar xf ${JDK} -C /usr/local/
JDKAPP=`find  /usr/local/   -maxdepth 1  -type  d  |grep  jdk|awk  -F/  '{print  $NF}'`
ln  -s  /usr/local/${JDKAPP}  /usr/local/jdk
cat > /etc/profile.d/jdk.sh  <<EOF
export JAVA_HOME=/usr/local/jdk
export PATH=\$JAVA_HOME/bin:\$PATH
export JRE_HOME=\$JAVA_HOME/jre 
export CLASSPATH=\$JAVA_HOME/lib/:\$JRE_HOME/lib/
EOF
echo "请执行  source /etc/profile.d/jdk.sh   命令,刷新文件"
[root@node1 opt]#bash jdk.sh
[root@node1 opt]#source /etc/profile.d/jdk.sh

二、搭建Elasticsearch

环境准备

IP地址主机名安装服务
192.168.83.30node1

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.40node2

JDK-1.18.0

Elasticsearch-6.6.1

修改配置文件


 

cluster.name: elk-cluster
#设置Elasticsearch集群的名称为 "elk-cluster"。
#这意味着所有带有相同cluster.name配置的Elasticsearch节点将会尝试加入同一个集群。node.name: node1
#指定当前节点的名字为 "node1",进行区分,便于集群内部管理和监控集群中的各个节点。
'另一台服务器一般指定为node2。也可以设置IP地址'path.data: /data/elk_data
#设置Elasticsearch数据存储路径,这是Elasticsearch存放索引数据的地方。
'需要自己手动创建,并修改属主与属组'path.logs: /var/log/elasticsearch
#设置Elasticsearch日志文件的存放路径,Elasticsearch的所有日志都会写入到这个目录下的文件bootstrap.memory_lock: false
#设置为 "false" 表示不锁定内存。
#若设为 true,Elasticsearch将尝试锁定全部分配给它的内存,防止在操作系统层面被交换到磁盘。
#在生产环境中,通常建议开启内存锁定以获得更好的性能和稳定性,但需要有足够的权限,并确保物理内存充足。network.host: 0.0.0.0
#设置为 "0.0.0.0",表示Elasticsearch节点监听所有可用网络接口上的连接请求,对外提供服务。http.port: 9200
#设置HTTP协议的服务端口为 "9200"discovery.zen.ping.unicast.hosts: ["node1", "node2"]
#设置Elasticsearch集群发现机制的初始节点列表,这里包含了 "node1" 和 "node2" 两个节点地址。
#集群中的节点通过互相ping这些地址来发现彼此并组建集群。
#在启动过程中,节点会尝试联系这个列表中的其他节点来参与集群。

启动服务

登录web界面查看信息

小结

'-----------------------------安装elasticsearch-----------------------------'
[root@node1 opt]#ls
elasticsearch-6.6.1.rpm  jdk-8u291-linux-x64.tar.gz  jdk.sh 
[root@node1 opt]#rpm -ivh elasticsearch-6.6.1.rpm 
[root@node1 opt]#systemctl daemon-reload
[root@node1 opt]#systemctl enable elasticsearch.service
'-----------------------------修改配置文件-----------------------------'
[root@node1 opt]#cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
[root@node1 opt]#vim /etc/elasticsearch/elasticsearch.yml
17	cluster.name: my-esh
23	node.name: node1
33	path.data: /data/elk_data
37	path.logs: /var/log/elasticsearch
43	bootstrap.memory_lock: false
55	network.host: 0.0.0.0
59	http.port: 9200
68	discovery.zen.ping.unicast.hosts: ["node1", "node2"]
[root@node1 opt]#cat  /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.83.30 node1
192.168.83.40 node2
[root@node1 opt]#mkdir  -p /data/elk_data
[root@node1 opt]#chown elasticsearch.elasticsearch /data/elk_data

三、搭建logstash

IP地址主机名安装服务
192.168.83.50logstash

JDK-1.18.0

logstash-6.6.1

httpd

测试服务情况

收到返回信息后在elasticsearch服务器上查看

小结

'-----------------------------安装logstash服务-----------------------------'
[root@logstash opt]#rpm -ivh logstash-6.6.1.rpm 
[root@logstash opt]#systemctl enable --now logstash.service
[root@logstash opt]#ln -s /usr/share/logstash/bin/logstash  /usr/local/bin'-----------------------------测试logstash服务-----------------------------'
[root@logstash opt]#logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["192.168.83.30:9200"] } }'
www.baidu.com
...........

四、搭建kibana服务

环境准备

IP地址主机名安装服务
192.168.83.60kibana

JDK-1.18.0

kibana-6.6.1

使用rpm包直接安装

修改配置文件

server.port: 5601
#此行配置了Kibana服务监听的端口号为5601server.host: "0.0.0.0"
#该配置指示Kibana服务器绑定到所有可用的网络接口elasticsearch.hosts: ["http://192.168.83.30:9200"]
#配置了Kibana连接的Elasticsearch集群地址和端口kibana.index: ".kibana"
#Kibana使用此配置来指定在Elasticsearch中存储其自身配置和状态的索引名称

验证kibana服务

在logstash服务器上开启httpd服务:systemctl start httpd

创建收集httpd服务日志信息的配置文件

#input字段#Logstash使用两个file输入插件,分别配置了两个日志文件路径
'第一个file插件配置了Apache服务器的访问日志路径/etc/httpd/logs/access_log,
并将日志类型标记为"type"字段的"access"。'
'第二个file插件配置了Apache服务器的错误日志路径/etc/httpd/logs/error_log,并将日志类型标记为"type"字段的"error"'
'start_position => "beginning" 表示从日志文件的开始位置读取数据'
#这对于初次启动或重新配置Logstash时很有用,可以确保从头开始处理日志。#output字段'根据[type]字段的值,Logstash通过elasticsearch输出插件将处理后的数据发送到Elasticsearch集群''当[type]字段等于"access"时,数据将被发送到Elasticsearch,并按日期格式创建索引,
例如apache_access-2024.04.14,每天都会创建一个新的索引。''当[type]字段等于"error"时,数据同样被发送到相同的Elasticsearch集群
但是存储在按日期格式命名的另一个索引中,例如apache_error-2024.04.14'

创建索引访问

小结

'-----------------------------安装kibana服务-----------------------------'
[root@kibana opt]#rpm  -ivh kibana-6.6.1-x86_64.rpm
警告:kibana-6.6.1-x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID d88e42b4: NOKEY
准备中...                          ################################# [100%]
正在升级/安装...1:kibana-6.6.1-1                   ################################# [100%]
[root@kibana opt]#vim /etc/kibana/kibana.yml 
第2行	server.port: 5601
第7行	server.host: "0.0.0.0"
第28行	elasticsearch.hosts: ["http://192.168.83.30:9200"]
第37行	kibana.index: ".kibana"
[root@kibana opt]#systemctl enable --now kibana.service
[root@kibana opt]#netstat -natp | grep 5601
tcp        0      0 0.0.0.0:5601            0.0.0.0:*               LISTEN      60998/node  #浏览192.168.83.50:5601进行访问测试

五、搭建filebeat服务

IP地址主机名安装服务
192.168.83.70filebeat

JDK-1.18.0

filebeat-6.6.1

修改配置文件

    21	- type: log
#定义了一个名为"log"类型的输入处理器,这是一种用于从日志文件中收集数据的模块24	  enabled: true
#启用该模块27	  paths:28	    - /var/log/*.log29	    - /var/log/messages
#指定了需要监控的日志文件路径46	  fields:47	    server_name: fb48	    log_type: log49	    server_id: 192.168.83.70
#这些字段会被附加到从日志文件中提取的每条事件上。152	#output.elasticsearch:153	  # Array of hosts to connect to.154	 # hosts: ["localhost:9200"]
#将输出到elasticsearch模块的信息注释掉165	output.logstash:167	  hosts: ["192.168.83.50:5044"]
#将信息输出到logstash服务器上,并指定服务器地址与端口号

在logstash服务器上添加接收filebeat服务器传输的信息的配置文件

input {beats {port => "5044"}
}
#定义logstash的5044端口,接收来自beats工具的所有数据
output {elasticsearch {hosts => ["192.168.83.30:9200"]index => "%{[fields][server_name]}-%{+YYYY.MM.dd}"}
}
#将接收的数据传输到elasticsearch服务器上
#并将索引名称定义为filebeat配置文件中,事件字段fields中server_name的值,并以时间结尾

在另一个终端上查看端口号是否开启

确认logstash服务器的5044端口还在监听状态后,在filebeat服务器上启动filebeat服务并加载配置文件

filebeat -e -c /etc/filebeat/filebeat.yml

登录kibana服务web界面

小结

'-----------------------------安装启动filebeat服务-----------------------------'
[root@filebeat opt]#rpm  -ivh filebeat-6.6.1-x86_64.rpm 
[root@filebeat opt]#cp  /etc/filebeat/filebeat.yml  /etc/filebeat/filebeat.yml_bak
[root@filebeat opt]#vim  /etc/filebeat/filebeat.yml21	- type: log24	  enabled: true27	  paths:28	    - /var/log/*.log29	    - /var/log/messages46	  fields:47	    server_name: fb48	    log_type: log49	    server_id: 192.168.83.70152	#output.elasticsearch:153	  # Array of hosts to connect to.154	 # hosts: ["localhost:9200"]165	output.logstash:167	  hosts: ["192.168.83.50:5044"]
[root@filebeat opt]#filebeat -e -c /etc/filebeat/filebeat.yml
.......'-----------------------------添加logstash服务配置文件-----------------------------'
[root@logstash ~]#vim  /etc/logstash/conf.d/fb.conf
input {beats {port => "5044"}
}
output {elasticsearch {hosts => ["192.168.83.30:9200"]index => "%{[fields][server_name]}-%{+YYYY.MM.dd}"}
}[root@logstash ~]#logstash -f /etc/logstash/conf.d/fb.conf
............

六、搭建zookeeper与kafka服务

IP地址主机名安装服务
192.168.83.80zk-ka1

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.90zk-ka2

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.100zk-ka3

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

在三台机器上安装zookeeper服务与Kafka服务

修改zookeeper服务配置文件

三台服务器配置文件相同

tickTime=2000
#基本时间单元,所有超时和心跳时间间隔都以tickTime的倍数来表示。这里设置为2000毫秒,即2秒initLimit=10
#初始化连接时的最大时间限制,单位为tickTime。
#当follower启动并试图连接leader时,follower在10*2秒(即20秒)内必须完成与leader初始同步syncLimit=5
#leader与follower之间发送消息的同步确认的最大时间限制,同样单位为tickTime。
#这意味着follower必须在5*2秒(即10秒)内响应leader的心跳或同步请求。dataDir=/usr/local/zookeeper-3.5.7/data
#ZooKeeper存放持久化数据的目录,如事务日志、快照等。需要手动创建dataLogDir=/usr/local/zookeeper-3.5.7/logs
#ZooKeeper专门存放事务日志的目录,分离数据和日志存储可以优化磁盘I/O性能。需要手动创建clientPort=2181
#ZooKeeper服务监听客户端连接的端口号,客户端通过这个端口与ZooKeeper集群进行通信。server.1=192.168.83.70:3188:3288
server.2=192.168.83.80:3188:3288server.3=192.168.83.90:3188:3288
#这三行配置描述了ZooKeeper集群中的三个服务器节点。格式为server.id=hostname:port1:port2
#其中:
#id(1、2、3)是集群中服务器的唯一标识。
#hostname(192.168.83.70、192.168.83.80、192.168.83.90)是服务器的IP地址。
#port1(3188)是集群内部通信的端口,用于follower和observer与其他服务器通信。
#port2(3288)是选举leader时服务器之间通信的端口。

修改kafka服务配置文件

三台服务器的kafka配置文件中只有broker.id、listeners字段值不同

broker.id=0
#定义当前Kafka Broker的唯一标识ID。在集群中,每个Broker的ID都应是唯一的。
#其它两台服务器设置为1,2listeners=PLAINTEXT://192.168.83.70:9092
#指定Broker监听客户端连接的地址和端口,这里是使用PLAINTEXT协议
#三台服务器分别监听本机的IP地址num.network.threads=3
#设置网络IO线程数量,用于处理网络请求,比如接收生产者的消息和响应消费者的请求。num.io.threads=8
#设置磁盘IO线程数量,用于处理磁盘读写操作,如写入日志文件和从磁盘读取消息。socket.send.buffer.bytes=102400 和 socket.receive.buffer.bytes=102400
#分别设置Socket发送和接收缓冲区的大小(单位为字节),影响TCP层的数据传输效率。socket.request.max.bytes=104857600
#设置单个请求允许的最大字节数,超过这个大小的请求会被拒绝。log.dirs=/usr/local/kafka/logs
#设置Kafka日志数据的存储目录,即Kafka消息持久化的路径。num.partitions=1
#默认每个主题的分区数量,这里设定为每个主题初始化时只有一个分区。num.recovery.threads.per.data.dir=1
#指定每个数据目录(日志目录)下用于恢复数据的线程数量。offsets.topic.replication.factor=1 和 transaction.state.log.replication.factor=1
#这两个配置分别指定了Kafka内部主题(__consumer_offsets)和事务状态日志的复制因子,
#这里均为1,表示没有数据复制,所有数据仅在一个Broker上存储。transaction.state.log.min.isr=1
#设置事务状态日志的最小ISR大小,这里为1,意味着只要有一个副本是同步的,就满足要求。log.retention.hours=168
#设置日志保留时间,这里是168小时(7天),超过这个时间的数据将被删除。log.segment.bytes=1073741824
#每个日志分段的大小,当达到这个阈值时,Kafka会创建新的日志分段。log.retention.check.interval.ms=300000
#日志清理检查间隔,每隔300000毫秒(5分钟)检查一次日志是否需要删除。zookeeper.connect=192.168.83.70:2181,192.168.83.80:2181,192.168.83.90:2181
#设置连接ZooKeeper集群的地址和端口,此处配置了一个由三个节点构成的ZooKeeper集群。zookeeper.connection.timeout.ms=18000
#ZooKeeper连接超时时间,设置为18000毫秒(18秒)。group.initial.rebalance.delay.ms=0
#消费者组在启动时的初始再平衡延迟时间,设置为0表示消费者在加入组时立即开始再平衡。

创建zookeeper启动脚本,三台服务器脚本文件相同

#!/bin/bash
声明该脚本使用bash shell进行解释执行。#chkconfig:2345 20 90
这行注释是针对Red Hat家族(如CentOS、Fedora等)Linux发行版的chkconfig工具的指令,
#用于在不同运行级别(2、3、4、5)下设置服务的启动优先级(20,较高)和停止优先级(90,较低)。#description:Zookeeper Service Control Script
描述脚本的功能,即ZooKeeper服务的控制脚本。ZK_HOME='/usr/local/zookeeper-3.5.7'
#定义ZooKeeper的安装目录,便于在脚本中引用。脚本主体部分使用case $1 in结构,根据传入的第一个参数($1)执行相应的操作:start:启动ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh start命令。
stop:停止ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh stop命令。
restart:重启ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh restart命令。
status:查看ZooKeeper服务的状态,执行$ZK_HOME/bin/zkServer.sh status命令。
*:如果传入的参数不在上述情况中,脚本将打印正确的使用方式。chmod +x /etc/init.d/zookeeper   #添加执行权限
chkconfig --add zookeeper        #设置开机自动进行管理

启动服务

创建kafka启动脚本,三台服务器脚本文件相同,并依次启动

#!/bin/bash:定义脚本使用的解释器为Bash Shell。#chkconfig:2345 22 88
#这是针对Red Hat系列Linux(如CentOS、Fedora等)的 chkconfig 工具的注释行,
#用来设置服务在运行级别2、3、4、5下的启动优先级(22)和关闭优先级(88)。#description:Kafka Service Control Script
#描述脚本功能,即用于控制Apache Kafka服务的启动、停止、重启和查看状态。KAFKA_HOME='/usr/local/kafka'
#定义Kafka的安装目录。case $1 in
#根据用户传入的第一个参数($1)执行不同的操作。start
#启动Kafka服务,
#通过${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties命令执行。
#-daemon选项表示以守护进程方式运行Kafka服务器,server.properties是Kafka的配置文件。stop
#停止Kafka服务,执行${KAFKA_HOME}/bin/kafka-server-stop.sh命令。restart
#先调用 $0 stop 停止服务,然后调用$0start重新启动服务。status
#检查Kafka服务是否正在运行。
#通过ps -ef | grep kafka | egrep -cv "grep|$$"命令统计Kafka进程的数量,
#如果不为0,则说明Kafka正在运行;否则,Kafka未运行。*
#如果传入的参数不在上述情况中,脚本将打印正确的使用方式

小结

'-----------------------------安装zookeeper服务--------------------------------'
[root@kibana opt]#tar xf apache-zookeeper-3.5.7-bin.tar.gz
[root@kibana opt]#mv  apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
[root@kibana opt]#cd  /usr/local/zookeeper-3.5.7/conf/
[root@kibana opt]#cp  zoo_sample.cfg zoo.cfg
[root@kibana opt]#vim  zoo.cfg
2	tickTime=2000
5	initLimit=10
8	syncLimit=5
12	dataDir=/usr/local/zookeeper-3.5.7/data
13	dataLogDir=/usr/local/zookeeper-3.5.7/logs
15	clientPort=2181
30	server.1=192.168.83.80:3188:3288
31	server.2=192.168.83.90:3188:3288
32	server.3=192.168.83.100:3188:3288
[root@kibana opt]#mkdif -p /usr/local/zookeeper-3.5.7/data
[root@kibana opt]#mkdif -p /usr/local/zookeeper-3.5.7/logs'---------------------------创建zookeeper服务启动脚本------------------------------'
[root@kibana opt]#vim  /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 状态 ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac
[root@kibana opt]#chmod +x /etc/init.d/zookeeper
[root@kibana opt]#chkconfig --add zookeeper
[root@kibana opt]#service zookeeper start'-----------------------------安装zookeeper服务--------------------------------'
[root@kibana opt]#tar xf kafka_2.13-2.7.1.tgz 
[root@kibana opt]#mv kafka_2.13-2.7.1  /usr/local/kafka
[root@kibana opt]#cd  /usr/local/kafka/config/
[root@kibana opt]#cp server.properties server.properties_bak
[root@kibana opt]#vim  server.properties
21	broker.id=0                                #每台服务器的唯一标识不一样
31	listeners=PLAINTEXT://192.168.83.80:9092   #监听本机的IP地址的9092的端口号
42	num.network.threads=3
45	num.io.threads=8
48	socket.send.buffer.bytes=102400
51	socket.receive.buffer.bytes=102400
54	socket.request.max.bytes=104857600
60	log.dirs=/usr/local/kafka/logs
65	num.partitions=1
69	num.recovery.threads.per.data.dir=1
74	offsets.topic.replication.factor=1
75	transaction.state.log.replication.factor=1
76	transaction.state.log.min.isr=1
103	log.retention.hours=168
110	log.segment.bytes=1073741824
114	log.retention.check.interval.ms=300000
123	zookeeper.connect=192.168.83.80:2181,192.168.83.90:2181,192.168.83.100:2181
126	zookeeper.connection.timeout.ms=18000
136	group.initial.rebalance.delay.ms=0'---------------------------创建kafka服务启动脚本------------------------------'
[root@kibana opt]#vim  /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac
[root@kibana opt]#chmod +x /etc/init.d/kafka
[root@kibana opt]#chkconfig --add kafka
[root@kibana opt]#service kafka start

七、部署ELFK+zookeeper+kafka

(一)修改filebeat服务器配置文件

filebeat.inputs
#定义了Filebeat应该监控哪些日志文件,并接收日志信息。- type: log
#定义输入类型为日志文件。enabled: true
#启用此输入源paths:
#指定日志文件的路径列表,指定/var/log/nginx/access_log,表示监控nginx服务器的访问日志tags: ["access"]
#给收集到的数据打上标签,这里打上了"access"标签,可以在后续处理和分析时用于筛选或分类'第二段信息含义与第一段一致'

输入:filebeat  -e -c /etc/filebeat/filebeat.yml   加载配置文件

(二)在logstash服务器上添加配置文件

input 字段:kafka {
#表示从Kafka集群中读取数据:
bootstrap_servers => "192.168.10.17:9092,192.168.10.21:9092,192.168.10.22:9092"
#指定了Kafka集群中Broker服务器的地址列表,Logstash将尝试连接这些地址以获取消息。
topics => "nginx"
#指定了要订阅的主题名称,这里是"nginx"。
type
#设置了输入数据的类型为"nginx_kafka"
codec => "json"
#配置了数据编码解码器为"json",这意味着Logstash将自动解析从Kafka接收到的消息内容为JSON格式。
auto_offset_reset => "earliest"
#设置为"earliest",从最初的消息开始读取。
decorate_events => true
#表示Logstash将在传递给Elasticsearch的数据中添加Kafka相关的元数据信息。output字段:根据数据中的tags字段进行逻辑判断,分别将数据输出到不同的Elasticsearch索引:#若tags字段包含"access",则将数据写入名为"nginx_access-%{+YYYY.MM.dd}"的索引
#若tags字段包含"error",则将数据写入名为"nginx_error-%{+YYYY.MM.dd}"的索引

在elasticsearch服务器上查看所有索引

在kibana服务器上建立索引并查看信息

'-----------------------------修改filebeat服务配置文件--------------------------------'
[root@filebeat opt]#vim  /etc/filebeat/filebeat.yml15	filebeat.inputs:16	- type: log17	  enabled: true18	  paths:19	    - /var/log/nginx/access.log20	  tags: ["access"]21	- type: log22	  enabled: true23	  paths:24	    - /var/log/nginx/error.log25	  tags: ["error"]160	#----------------------------- Kafka output --------------------------------161	output.kafka:162	  enabled: true163	  hosts: ["192.168.83.80:9092","192.168.83.90:9092","192.168.83.100:9092"]164	  topic: "nginx"165	#----------------------------- Logstash output --------------------------------166	#output.logstash:167	  # The Logstash hosts168	 # hosts: ["192.168.83.50:5044"]
[root@filebeat opt]#filebeat -e -c /etc/filebeat/filebeat.yml'-----------------------------添加logstash服务配置文件--------------------------------'
[root@logstash ~]#vim /etc/logstash/conf.d/kafka.conf
input {kafka {bootstrap_servers => "192.168.83.80:9092,192.168.83.90:9092,192.168.83.100:9092"topics  => "nginx"type => "nginx_kafka"codec => "json"auto_offset_reset => "earliest"decorate_events => true}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.83.30:9200"]index => "nginx_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.83.30:9200"]index => "nginx_error-%{+YYYY.MM.dd}"}}
}
[root@logstash ~]#logstash -f  /etc/logstash/conf.d/kafka.conf#登录kibana服务web界面创建索引进行查看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/815187.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试八股——Spring——AOP与事务

AOP的定义 事务的实现 事务的失效场景 异常捕获处理 下图中由于②导致异常&#xff1a; 原因&#xff1a; 解决办法&#xff1a;自己抛出一个非检查异常&#xff08;具体原因看“抛出检查异常”&#xff09;。 抛出检查异常 由于①出错&#xff0c;导致抛出了检查异常 原因&…

[通俗易懂:Linux标准输入/输出和重定向]Shell脚本之 > /dev/null 2>1命令详解

目录标题 一、> /dev/null 2>&1 命令解析二、/dev/null 文件浅显理解三、标准输入、标准输出、标准错误输出四、输入重定向、输出重定向五、命令作用与应用场景 如果想看命令意义&#xff0c;可以直接跳到第五部分 一、> /dev/null 2>&1 命令解析 我们在别…

计算机服务器中了360后缀勒索病毒怎么办?360后缀勒索病毒解密步骤

网络技术的不断应用与发展&#xff0c;为企业的生产运营提供了极大便利&#xff0c;利用网络可以开展各项工作业务&#xff0c;可以大大提高企业的生产效率&#xff0c;然而&#xff0c;网络是一把双刃剑&#xff0c;在为企业提供便利的同时&#xff0c;也为企业的数据安全带来…

常见的Linux命令

linux操作系统 ctrl鼠标滚动 放大缩小字体 cd /目录进入目录下 ls查看当前目录下的所有内容 tar -zxvf 压缩包名字 对压缩包进行解压 sync将数据由内存同步到硬盘上 shutdown关机指令 shutdown -h 10 /10 表示十分钟后关机 shutdown -h now 表示现在关机 shutdown -h…

英语新概念2-回译法-lesson6

我刚刚搬家去柏林大街的房子里。昨天一个流浪汉敲我的门,他想我寻求一顿饭和一杯啤酒。未拒绝了这个请求之后,这个流浪汉倒立着唱歌,我给他了一顿饭,他吃了食物并且喝了啤酒,然后他把一片奶酪放到他的口袋里然后走开了。过了一会儿,一个领居告诉我关于这个流浪汉的事情。…

Centos7 K8S 集群 - kubeadm搭建方式

机器准备 搭建环境是centos7, 四核心4G内存四台机器 一个master节点&#xff0c;一个etcd&#xff0c;两台node 机器名称IP 地址master192.168.1.127node1192.168.1.129node2192.168.1.130node3192.168.1.131 机器时间同步 各节点时间要求精确同步&#xff0c;可以直接联网…

算法设计与分析(超详解!) 第二节 递归与分治

1.递归定义 直接或间接地调用自身的算法称为递归算法。用函数自身给出定义的函数称为递归函数。 由分治法产生的子问题往往是原问题的较小模式&#xff0c;这就为使用递归技术提供了方便。在这种情况下&#xff0c;反复应用分治手段&#xff0c;可以使子问题与原问题类型一致…

LeetCode-热题100:226. 翻转二叉树

题目描述 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a; root [4,2,7,1,3,6,9] 输出&#xff1a; [4,7,2,9,6,3,1] 示例 2&#xff1a; 输入&#xff1a; root [2,1,3] 输出&#xff1a; […

GlusterFS 分布式文件系统 搭建及使用

一、GlusterFS GlusterFS 是一个开源的分布式文件系统&#xff0c;旨在提供高性能、可扩展性和可靠性&#xff0c;适用于现代数据中心和云环境。它以横向扩展的方式设计&#xff0c;可以在多台服务器之间共享文件系统&#xff0c;为应用程序提供统一的文件存储服务。 Gluster…

【C 数据结构】线性表

文章目录 【 1. 线性表 】【 2. 顺序存储结构、链式存储结构 】【 3. 前驱、后继 】 【 1. 线性表 】 线性表&#xff0c;全名为线性存储结构&#xff0c;线性表结构存储的数据往往是可以依次排列的&#xff08;不考虑数值大小顺序&#xff09;。 例如&#xff0c;存储类似 {1…

蓝桥杯-数组分割

问题描述 小蓝有一个长度为 N 的数组 A 「Ao,A1,…,A~-1]。现在小蓝想要从 A 对应的数组下标所构成的集合I 0,1,2,… N-1 中找出一个子集 民1&#xff0c;那么 民」在I中的补集为Rz。记S∑reR 4&#xff0c;S2∑rERA,&#xff0c;我们要求S、和 S,均为偶数&#xff0c;请问在这…

c语言-----数组知识汇总

前言 本文为我学习数组知识点之后&#xff0c;对c语言的数组部分进行的知识点汇总。 简单数组介绍 简单来说&#xff0c;数组就是一个数据组&#xff0c;像一个箱子&#xff0c;里面放有多个数据。 [1,2,3,4,5] 数组的定义 基础定义 语法&#xff1a; 数据类型 数组名[数组…

代码随想录-算法训练营day12【休息,复习与总结】

代码随想录-035期-算法训练营【博客笔记汇总表】-CSDN博客 ● day 12 周日休息&#xff08;4.14&#xff09; 目录 复习与总结 0417_图论-太平洋大西洋水流问题 0827_图论-最大人工岛 复习与总结 二刷做题速度提升了一大截&#xff0c;ヾ(◍∇◍)&#xff89;&#xff9e;加…

基于SpringBoot实现的在线拍卖系统

系统开发环境 编程语言&#xff1a;Java数据库&#xff1a;MySQL容器&#xff1a;Tomcat工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统实现 管理员功能模块 首页 修改密码 用户管理 商品类型管理 拍卖商品 竞拍公告 轮播图 历史竞拍管理 竞拍订单管理 留言板管理 用户…

多输入多输出 | Matlab实现XGboost多输入多输出预测

多输入多输出 | Matlab实现XGboost多输入多输出预测 目录 多输入多输出 | Matlab实现XGboost多输入多输出预测预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 Matlab实现XGboost多输入多输出预测 1.data为数据集&#xff0c;10个输入特征&#xff0c;3个输出变量…

使用 vue3-sfc-loader 加载远程Vue文件, 在运行时动态加载 .vue 文件。无需 Node.js 环境,无需 (webpack) 构建步骤

加载远程Vue文件 vue3-sfc-loader vue3-sfc-loader &#xff0c;它是Vue3/Vue2 单文件组件加载器。 在运行时从 html/js 动态加载 .vue 文件。无需 Node.js 环境&#xff0c;无需 (webpack) 构建步骤。 主要特征 支持 Vue 3 和 Vue 2&#xff08;参见dist/&#xff09;仅需…

UDP实现Mini版在线聊天室

实现原理 只有当客户端先对服务器发送online消息的时候&#xff0c;服务器才会把客户端加入到在线列表。当在线列表的用户发消息的时候&#xff0c;服务器会把消息广播给在线列表中的所有用户。而当用户输入offline时&#xff0c;表明自己要下线了&#xff0c;此时服务器把该用…

服务器docker应用一览

文章目录 一、需求概况二、业务流程三、运行效果四、实现过程1. 基础前提2. 源码放送3.核心代码4. 项目打包5.部署步骤 一、需求概况 现有某云主机服务器&#xff0c;用来做项目演示用&#xff0c;上面运行了docker应用&#xff0c;现希望有一总览页面&#xff0c;用来展示部署…

HC-SR04(超声波模块)

工具 1.Proteus 8 仿真器 2.keil 5 编辑器 原理图 讲解 简介 HC-SR04超声波模块是一种常用的测距模块&#xff0c;通过不断检测超声波发射后遇到障碍物所反射的回波&#xff0c;从而测出发射和接收回波的时间差&#xff0c;并据此求出距离。它主要由两个压电陶瓷超声传感器…

Centos7查看内存使用情况

Centos7查看内存使用情况 free -b&#xff1a;以字节为单位显示内存使用情况。-k&#xff1a;以KB为单位显示内存使用情况&#xff08;默认选项&#xff09;。-m&#xff1a;以MB为单位显示内存使用情况。-g&#xff1a;以GB为单位显示内存使用情况。-t&#xff1a;在输出的最…