基于Kafka的日志采集

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/messages         ###要监控的日志文件
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: 'topic_test1'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/httpd/access_log         ###要监控的日志文件fields:kafka_topic: httpd_access
- type: logenabled: truepaths:- /var/log/httpd/error_log         ###要监控的日志文件fields:kafka_topic: httpd_error
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: '%{[fields.kafka_topic]}'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOFcat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOFsource /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOFsed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.dcat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"topic_test1" type =>"topic_test1"codec =>"json" } 
}
output { if [type] == "topic_test1" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"kafka-system-%{+YYYY.MM.dd}" } }
}EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_access" type =>"httpd_access"codec =>"json" } kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_error" type =>"httpd_error"codec =>"json" }
}
output { if [type] == "httpd_access" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-access-%{+YYYY.MM.dd}" } }if [type] == "httpd_error" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-error-%{+YYYY.MM.dd}" } }
}EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/15031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

21-信号集处理函数

屏蔽信号集 屏蔽某些信号 手动自动 未处理信号集 信号如果被屏蔽&#xff0c;则记录在未处理信号集中 非实时信号&#xff08;1~31&#xff09;&#xff0c;不排队&#xff0c;只留一个实时信号&#xff08;34~64&#xff09;&#xff0c;排队&#xff0c;保留全部 信号集…

springmvc中HandlerMapping是干什么用的

HandlerMapping处理器映射器 作用是根据request找到相应的处理器Handler和Interceptors&#xff0c;然后封装成HandlerExecutionChain对象返回 HandlerExecutionChain getHandler(HttpServletRequest request) throws Exception; 实现类 HandlerMapping帮助DispatcherServlet进…

【AD21】BOM表文件的输出

BOM表文件通常发给采购&#xff0c;采购可以购买BOM表中所需的元器件。 在菜单栏中点击报告->Bill of Materials。 在以下界面&#xff0c;点击Columns&#xff0c;对输出的BOM表中一些说明进行不显示&#xff0c;可以不显示Description和LibRef。值&#xff0c;位号&#…

文心智能体【焦虑粉碎机】——帮你赶走“坏”情绪

目录&#xff1a; 引言1.登录 文心智能体平台2.创建智能体3.配置智能体&#x1f337; 头像设置&#x1f337; 名称设置&#x1f337; 简介设置&#x1f337;指令设置&#x1f337; 开场白设置&#x1f337; 引导示例设置 4.使用智能体 引言 随着ChatGPT的爆火&#xff0c;人工智…

【Python】 掌握 Flask 请求数据获取的艺术

基本原理 在Web开发中&#xff0c;Flask是一个用Python编写的轻量级Web应用框架。它被广泛用于快速开发简单的Web应用。当用户通过浏览器或其他客户端向服务器发送请求时&#xff0c;Flask需要能够接收和解析这些请求中的数据。这些数据可以是GET请求的查询字符串、POST请求的…

【算法】分治 - 快速排序

快乐的流畅&#xff1a;个人主页 个人专栏&#xff1a;《算法神殿》《数据结构世界》《进击的C》 远方有一堆篝火&#xff0c;在为久候之人燃烧&#xff01; 文章目录 引言一、颜色分类二、排序数组三、数组中的第k个数四、最小的k个数总结 引言 本节主要介绍快速排序&#xf…

swust oj 1012: 哈希表(链地址法处理冲突)

直接采用二维数组模拟实现 #include <iostream> using namespace std; const int N 100; int arr[N][N]; int point[N];//计数int main() {int m, n,data;cin >> m >> n;for (int i 0; i < n; i){cin >> data;int key data % m;arr[key][point[…

对于高速信号完整性,一块聊聊啊(10)

本文包含的主要内容有: 过孔设计概述:从前面的各种基础知识到过孔设计,逐步对信号完整性有了初步了解,在过孔设计这里稍微做一个概述,也是个人的一些理解,算是一个小结。 过孔设计的必要性。 过孔结构的基础知识 实例:过孔设计仿真HFSS实例 过孔设计概述 通过前面…

题目----力扣--回文链表

题目 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为 回文链表 。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true示例 2&#xff1a; 输入&#xff1a;…

Kafka-生产者(producer)发送信息流程详解

Kafka概述 在 Kafka 消息发送的过程中&#xff0c;涉及到了两个重要的线程&#xff1a;主线程&#xff08;main thread&#xff09;和发送线程&#xff08;Sender thread&#xff09;。 1.主线程&#xff08;main thread&#xff09;&#xff1a; 应用程序在主线程中创建 Kaf…

详解CSS(三)及案例实现

目录 1.弹性布局 1.1 弹性布局案例 1.2flex 布局基本概念 1.3常用属性 1.3.1justify-content 1.3.2align-items 2.案例实现&#xff1a;小广告 3.案例实现&#xff1a;百度热榜 1.弹性布局 弹性布局&#xff08;Flex布局&#xff09;是一种用于创建自适应和响应式布局的…

“AIGC行业投资时机分析:评估当前市场发展阶段与未来需求趋势“

文章目录 每日一句正能量前言行业前景当前发展前景相关领域的发展趋势行业潜力竞争情况结论 市场需求人才需求情况机会挑战结论 选择与规划自我评估行业调研职业规划风险管理个人陈述示例 后记 每日一句正能量 胖了就减&#xff0c;没钱就赚&#xff0c;不会就学&#xff0c;不…

男士内裤什么材质的好?推荐男士内裤的注意事项

天气已经逐渐热了起来&#xff0c;广大男士们在夏天难免会出一身的汗&#xff0c;不少男士朋友都觉得一些吸湿性、透气性不好的内裤会在夏天穿着很不适&#xff0c;想挑选一些比较适合夏天的男士内裤&#xff0c;但现在的男士内裤品牌和材质分类却比较多&#xff0c;看得大家眼…

Python游戏编程:一步步用Python打造经典贪吃蛇小游戏

贪吃蛇作为一款极其经典且广受欢迎的小游戏&#xff0c;是早期 Windows 电脑和功能手机&#xff08;特别是诺基亚手机&#xff09;流行度极高的小游戏&#xff0c;是当时功能手机时代最具代表性的游戏之一。游戏的基本规则和目标十分简单&#xff0c;但却极具吸引力&#xff0c…

共享单车(八):数据库

实现后台数据库访问模块的框架&#xff0c;能够实现验证请求并响应&#xff08;支持数据库操作&#xff09;。 数据库设计 class SqlTabel //负责数据库表的创建 { public:SqlTabel(std::shared_ptr<MysqlConnection> sqlconn) :sqlconn_(sqlconn) {}bool CreateUserI…

详细分析crontab定时执行任务(附Demo | 定时清空Tomcat的实战)

目录 前言1. 基本知识2. Demo3. 实战3.1 错误版本3.2 正确版本 前言 由于用户量大&#xff0c;且导出的日志以及缓存特别多&#xff0c;急需定期删除文件 1. 基本知识 crontab 是一个用于定时执行任务的命令行工具&#xff0c;通常在 Unix 和类 Unix 系统中可用&#xff0c;表…

【微信小程序开发】小程序前后端交互--发送网络请求实战解析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

三、自定义信号和槽函数(无参和有参)

需求&#xff1a; 下班后&#xff0c;小明说请小红吃好吃的&#xff0c;随便吃&#xff0c;吃啥买啥 无参&#xff1a;小红没有提出吃啥 有参&#xff1a;小红提出自己想吃的东西&#xff0c;吃啥取决于一时兴起&#xff08;emit触发&#xff09; 思路&#xff1a; 1&#xff…

Unreal Engine5 Landscape地形材质无法显示加载

UE5系列文章目录 文章目录 UE5系列文章目录前言一、解决办法 前言 在使用ue5做地形编辑的时候&#xff0c;明明刚才就保存的Landscape地形完全消失不见&#xff0c;或者是地形的材质不见了。重新打开UE5发现有时候能解决&#xff0c;但大多数时候还是没有解决&#xff0c;我下…

如何在 ASP.NET Core 中实现中间件管道

概述:借助 ASP.NET Core,中间件流水线可以作为一种轻量级、灵活的机制,使开发人员能够在请求流水线的不同阶段插入功能。这些中间件组件可以执行各种任务,例如日志记录、身份验证、授权、异常处理等。它们提供了一种封装和组织代码的方法,促进了更简洁、更易于维护的应用程…