Elk+Filebeat+Kafka实现日志收集(本机nginx)
部署Zookeeper
1.实验组件
#准备3台服务器做Zookeeper集群
20.0.0.10
20.0.0.20
20.0.0.30
2.安装前准备
#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0#安装JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version#将apache-zookeeper-3.5.7-bin.tar.gz压缩包上传至/opt目录
3.安装Zookeeper
#三台服务器一齐操作
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper#修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--2--
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒--5--
initLimit=10
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s--8--
syncLimit=5
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer--12--修改
dataDir=/opt/zookeeper/data
#指定保存Zookeeper中的数据的目录,目录需要单独创建--添加--
dataLogDir=/opt/zookeeper/logs
#指定存放日志的目录,目录需要单独创建--15--
clientPort=2181
#客户端连接端口--末尾添加集群信息--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288
#在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs#在每个节点的dataDir指定的目录下创建一个 myid 的文件,不同节点分配1、2、3
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid#配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 状态 ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac#设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper#分别启动 Zookeeper
service zookeeper start#查看当前状态
service zookeeper status
部署Kafka(3.4.1版本)
1.安装Kafka
cd /opt
--上传kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--24--
broker.id=1
#broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=2、broker.id=3--34--
listeners=PLAINTEXT://20.0.0.10:9092
#每台服务器分别为10、20、30,不用加地址映射--62--
log.dirs=/var/log/kafka
#kafka运行日志存放的路径,也是数据存放的路径--125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181
#配置连接Zookeeper集群地址#修改全局配置
vim /etc/profile
--添加--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile
#配置Zookeeper启动脚本
vim /etc/init.d/kafka#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka#分别启动Kafka
service kafka start
2.命令行测试
#创建topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1#查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092#发布消息
kafka-console-producer.sh --broker-list 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1#消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning#修改分区数
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6#删除 topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1
部署Filebeat
1.安装Filebeat
#10
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
vim /etc/logstash/logstash.yml
--64--
path.config: /opt/logsystemctl restart logstash
2.时间同步
#所有节点
yum -y install ntpdate
ntpdate ntp.aliyun.com
date
3.配置filebeat
#给nginx日志文件赋权
cd /var/log/nginx/
chmod 777 access.log error.log
#配置filebeat
cd /usr/local/filebeat
vim filebeat.yml
filebeat.inputs:- type: logenabled: true paths:- /var/log/nginx/access.log- /var/log/nginx/error.logtags: ["nginx"]fields:service_name: 20.0.0.10_nginxlog_type: nginxfrom: 20.0.0.10output.kafka:enabled: truehosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]topic: "nginx"--------------Elasticsearch output-------------------
(全部注释掉)----------------Logstash output---------------------
(全部注释掉)nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat
4.配置logstash
cd /opt/log/
vim kafka.confinput {kafka {bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"topics => "nginx"type => "nginx_kafka"codec => "json"auto_offset_reset => "earliest"decorate_events => true}
}output {if "nginx" in [tags] {elasticsearch {hosts => ["20.0.0.20:9200","20.0.0.30:9200"]index => "nginx_access-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}logstash -f kafka.conf --path.data /opt/test1
日志收集(远程Apache+Mysql)
部署Filebeat
1.安装配置filebeat
#收集81服务器上的mysql和apache日志
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.ymlfilebeat.inputs:- type: logenabled: truepaths:- /etc/httpd/logs/access_log- /etc/httpd/logs/error_logtags: ["httpd_81"]fields:service_name: 20.0.0.81_httpdlog_type: httpdfrom: 20.0.0.81- type: logenabled: truepaths:- /usr/local/mysql/data/mysql_general.logtags: ["mysql_81"]fields:service_name: 20.0.0.81_mysqllog_type: mysqlfrom: 20.0.0.81output.kafka:enabled: truehosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]topic: "httpdmysql"--------------Elasticsearch output-------------------
(全部注释掉)----------------Logstash output---------------------
(全部注释掉)nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat
2.配置logstash
10:
cd /opt/log/
vim 81_a+m.conf input {kafka {bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"topics => "httpdmysql"type => "httpd+mysql_kafka"codec => "json"auto_offset_reset => "earliest"decorate_events => true}
}output {if "httpd_81" in [tags] {elasticsearch {hosts => ["20.0.0.20:9200","20.0.0.30:9200"]index => "nginx_access-%{+YYYY.MM.dd}"}}if "mysql_81" in [tags] {elasticsearch {hosts => ["20.0.0.20:9200","20.0.0.30:9200"]index => "nginx_access-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}logstash -f 81_a+m.conf --path.data /opt/test2