文章目录
- Kafka
- Kafka 概述
- 使用消息队列的好处
- Kafka 的特性
- Kafka 系统架构
- Kafka 的应用场景
- Kafka 的优缺点
- Kafka 集群部署
- 下载安装包
- 安装 Kafka
- Kafka 命令行操作
- Kafka 架构深入
- Filebeat+Kafka+ELK 部署指南~
- 部署 Zookeeper+Kafka 集群
- 部署 Filebeat
- 部署 ELK(Logstash 配置)
- Kibana 配置与查看日志
Kafka
Kafka 概述
Kafka 是一个分布式、基于发布/订阅模式的消息队列系统,由 Linkedin 开发并贡献给 Apache 基金会,现已成为顶级开源项目。它主要应用于大数据领域的实时计算以及日志收集,具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发的特性。
使用消息队列的好处
- 解耦:允许独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 可恢复性:系统的一部分组件失效时,不会影响到整个系统。
- 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性 & 峰值处理能力:使用消息队列能够使关键组件顶住突发的访问压力。
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它。
Kafka 的特性
- 高吞吐量、低延迟:每秒可以处理几十万条消息,延迟最低只有几毫秒。
- 可扩展性:Kafka 集群支持热扩展。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败。
- 高并发:支持数千个客户端同时读写。
Kafka 系统架构
- Broker:
- 一台 Kafka 服务器就是一个 broker。
- 一个集群由多个 broker 组成。
- 一个 broker 可以容纳多个 topic。
- Topic:
- 可以理解为一个队列,生产者和消费者面向的都是一个 topic。
- 类似于数据库的表名或者 ES 的 index。
- 物理上不同 topic 的消息分开存储。
- Partition:
- 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上。
- 一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。
- Kafka 只保证 partition 内的记录是有序的。
- 数据路由规则:指定了 partition 则直接使用;未指定但指定 key,通过对 key 的 value 进行 hash 取模选出一个 partition;都未指定,使用轮询选出一个 partition。
- 每个 partition 中的数据使用多个 segment 文件存储。
- Replica:
- 副本机制,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作。
- 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
- Leader:
- 当前负责数据的读写的 partition。
- Follower:
- 跟随 Leader,所有写请求都通过 Leader 路由。
- 数据变更会广播给所有 Follower,与 Leader 保持数据同步。
- 只负责备份,不负责数据的读写。
- 如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
- Producer:
- 数据的发布者,将消息 push 发布到 Kafka 的 topic 中。
- Consumer:
- 从 broker 中 pull 拉取数据。
- 可以消费多个 topic 中的数据。
- Consumer Group(CG):
- 由多个 consumer 组成。
- 所有的消费者都属于某个消费者组。
- 消费者组内每个消费者负责消费不同分区的数据,防止数据被重复读取。
- 消费者组之间互不影响。
- Offset 偏移量:
- 唯一标识一条消息。
- 决定读取数据的位置。
- 消费者通过偏移量来决定下次读取的消息。
- 消息被消费之后,并不被马上删除。
- 某一个业务也可以通过修改偏移量达到重新读取消息的目的。
- 消息默认生命周期为 1 周(7*24小时)。
- Zookeeper:
- 在 Kafka 中,ZooKeeper 负责维护 Kafka 集群的一些元数据和 leader 选举等协调工作。
- 元数据存储:存储主题、分区、Broker 节点等信息。
- Leader 选举:参与领导者选举的过程。
- 健康监控:进行集群的健康监控。
- 消费者组协调:协调和追踪消费者的位置信息。
Kafka 的应用场景
- 日志收集:Kafka 可以被用作日志收集系统,将各种应用的日志数据集中收集起来,方便后续的处理和分析。
- 实时计算:Kafka 可以作为实时计算系统的数据源,如 Spark Streaming、Flink 等,用于实时数据处理和分析。
- 消息通讯:Kafka 可以作为消息通讯系统,实现不同系统之间的数据交换和通信。
- 流量削峰:在高并发场景下,Kafka 可以作为流量削峰的工具,将大量的请求缓存到 Kafka 中,然后按照一定的速率进行处理,避免系统崩溃。
Kafka 的优缺点
优点:
- 高吞吐量、低延迟。
- 可扩展性强。
- 持久性、可靠性高。
- 支持多副本、容错性强。
- 社区活跃、生态丰富。
缺点:
- 依赖 Zookeeper,如果 Zookeeper 出现故障,会影响 Kafka 的正常运行。
- 数据一致性方面,虽然 Kafka 提供了多副本机制,但是在极端情况下,仍然可能存在数据丢失的风险。
- 消息顺序问题,如果生产者发送消息到多个分区,那么消费者消费时可能无法保证消息的顺序性。
Kafka 集群部署
下载安装包
- 官方下载地址:Apache Kafka 下载页面
- 步骤:
- 切换到
/opt
目录。 - 使用
wget
从清华大学镜像站下载 Kafka 2.7.1 版本。
- 切换到
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
安装 Kafka
- 步骤:
- 解压 Kafka 压缩包。
- 将解压后的目录移动到
/usr/local/kafka
。 - 备份并编辑
server.properties
文件,配置 Kafka。
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafkacd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
-
关键配置项:
broker.id
:每个 Kafka 实例的唯一标识,集群中每个实例的broker.id
必须不同。listeners
:指定 Kafka 监听的 IP 和端口。num.network.threads
和num.io.threads
:分别设置处理网络请求和磁盘 IO 的线程数。log.dirs
:Kafka 数据和日志的存放路径。zookeeper.connect
:指定 Zookeeper 集群的地址。
-
环境变量配置:
- 将 Kafka 的 bin 目录添加到 PATH 环境变量中。
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 配置启动脚本:
- 创建一个 Kafka 的启动脚本,并设置开机自启。
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esacchmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
Kafka 命令行操作
- 创建 topic:
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
- 查看 topic:
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
- 发布和消费消息:
# 生产者
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test# 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
- 修改和删除 topic:
# 修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6# 删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
Kafka 架构深入
-
工作流程及文件存储机制:
- Kafka 以 topic 对消息进行分类,producer 和 consumer 都是面向 topic 的。
- Topic 是逻辑概念,partition 是物理概念,每个 partition 对应一个 log 文件。
- 为防止 log 文件过大,Kafka 采用分片和索引机制,将每个 partition 分为多个 segment,每个 segment 包含
.index
和.log
文件。
-
数据可靠性保证:
- Kafka 通过 ack 应答机制保证数据可靠性,producer 发送数据后需要等待 broker 的确认。
-
数据一致性问题:
- LEO:每个副本的最大 offset。
- HW:消费者能见到的最大 offset,所有副本中最小的 LEO。
- Leader 和 follower 故障时的数据恢复和同步机制。
-
ack 应答机制:
- Kafka 提供了三种可靠性级别(acks=0, 1, -1),用户可以根据需求选择。
- 幂等性:在 0.11 版本及以后,Kafka 引入了幂等性特性,保证 producer 发送重复数据时,server 端只持久化一条。
注释:
- Kafka 的安装和配置需要根据集群的实际环境进行调整,特别是 IP 地址和端口号。
- 在生产环境中,通常需要配置更多的参数以优化性能和可靠性。
- Kafka 的数据可靠性和一致性机制是其核心特性之一,理解这些机制对于保证数据的安全性和一致性至关重要。
Filebeat+Kafka+ELK 部署指南~
部署 Zookeeper+Kafka 集群
- 目的:搭建消息队列系统,用于日志数据的传输。
- 步骤:
- 安装并配置 Zookeeper 集群。
- 安装并配置 Kafka 集群,指定 Zookeeper 集群地址。
- 启动 Zookeeper 和 Kafka 服务,确保集群正常运行。
部署 Filebeat
- 目的:收集服务器上的日志数据。
- 步骤:
- 下载并解压 Filebeat 到指定目录(如
/usr/local/filebeat
)。 - 编辑
filebeat.yml
配置文件:filebeat.prospectors: - type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]# 添加输出到 Kafka 的配置 output.kafka:enabled: truehosts: ["192.168.80.10:9092","192.168.80.11:9092","192.168.80.12:9092"]topic: "httpd"
- 启动 Filebeat,开始收集日志并发送到 Kafka。
- 下载并解压 Filebeat 到指定目录(如
部署 ELK(Logstash 配置)
- 目的:从 Kafka 拉取日志数据,并处理、存储到 Elasticsearch 中。
- 步骤:
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
kafka.conf
:input {kafka {bootstrap_servers => "192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092"topics => "httpd"type => "httpd_kafka"codec => "json"auto_offset_reset => "latest"decorate_events => true} }output {if "access" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug } }
- 启动 Logstash,开始从 Kafka 拉取日志并存储到 Elasticsearch。
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
Kibana 配置与查看日志
- 目的:通过 Kibana 可视化界面查看日志数据。
- 步骤:
- 在浏览器中访问 Kibana(如
http://192.168.80.30:5601
)。 - 登录 Kibana(如果设置了登录认证)。
- 单击“Create Index Pattern”按钮,添加索引模式,例如
httpd_access-*
和httpd_error-*
(注意:这里应与 Logstash 配置中的 index 名称匹配,但原笔记中的filebeat_test-*
是不正确的)。 - 单击“create”按钮创建索引模式。
- 单击“Discover”按钮,可查看图表信息及日志信息。
- 在浏览器中访问 Kibana(如
注释:
- 在配置 Filebeat 和 Logstash 时,确保 Kafka 集群的地址和 topic 名称正确无误。
- Logstash 的
auto_offset_reset
参数决定了从 Kafka 拉取数据的起始位置,latest
表示从最新的数据开始拉取,earliest
表示从头开始拉取。 - Kibana 中的索引模式应与 Logstash 配置中的 index 名称一致,以便正确显示日志数据。
- 在实际部署中,还需要考虑安全性、性能优化等方面的问题。