#作者:任少近
文章目录
- 需求描述
- 系统目标
- 系统组件
- Fluent Bit
- Fluentd
- Kafka
- 数据流与处理流程
- 日志采集
- 日志转发到 Fluentd
- 日志处理与转发到 Kafka
- Kafka 作为消息队列
- 具体配置
- Fluent-Bit的CM配置
- Fluent-Bit的DS配置
- Fluentd的CM配置
- Fluentd的DS配置
- Kafka查询结果
需求描述
Fluent Bit 将日志传输到 Fluentd,Fluentd 再将日志写入 Kafka
背景: 随着系统日志量的增加,尤其是在微服务架构下,日志的收集、处理和传输变得愈加复杂。为了实现高效的日志收集和处理,需要一个可靠且可扩展的日志管道。Fluent Bit、Fluentd 和 Kafka 的结合可以为这一需求提供强大的支持。
系统目标
Fluent Bit 作为日志收集器,从不同来源(如容器、应用程序、系统日志文件等)收集日志。
将收集到的日志实时转发到 Fluentd,Fluentd 对日志进行进一步的处理(如解析、过滤、增强等)。
Fluentd 将处理后的日志通过 Kafka 传输,Kafka 作为消息队列,提供日志的高吞吐量传输和存储。
系统组件
Fluent Bit
功能:负责日志的采集和转发,能够高效地从各种日志源收集日志数据,并将其发送到 Fluentd。
特点:轻量级、高效、低资源消耗,适用于边缘设备和容器环境。
Fluentd
功能:接收来自 Fluent Bit 的日志数据,对日志进行进一步的处理,如过滤、格式转换、增强等。
特点:支持丰富的插件生态系统,能够灵活地扩展和配置,适用于复杂的日志处理和存储需求。
Kafka
功能:作为日志数据的消息队列,提供高吞吐量、可靠的日志传输机制。Fluentd 将日志数据发送到 Kafka,Kafka 作为缓冲区存储和传递日志数据,确保日志的可靠性和可扩展性。
特点:高吞吐量、可扩展、容错能力强。
数据流与处理流程
日志采集
Fluent Bit 部署在日志源所在的节点或容器中,实时监控指定的日志文件(如 /var/log/test.log 等)。
Fluent Bit 使用 tail 插件采集日志,并将其转换为指定的格式(如 JSON)。
日志转发到 Fluentd
Fluent Bit 使用 forward 输出插件将日志数据转发到 Fluentd,通过指定 Fluentd 的 IP 地址和端口进行连接。
日志处理与转发到 Kafka
Fluentd 接收到日志后,可以进行各种处理(如过滤、解析、增强、格式转换等)。
处理后的日志通过 Kafka 输出插件将日志发送到指定的 Kafka 集群。
Kafka 将日志存储在其主题中,以便进行后续的分析、查询和处理。
Kafka 作为消息队列
Kafka 将日志数据持久化到其分区中,提供可靠的消息存储和高吞吐量的数据传输。
消费者 可以从 Kafka 中读取数据进行进一步处理或存储到其他系统(如 Elasticsearch、OpenSearch、数据库等)。
具体配置
Fluent-Bit的CM配置
fluent-bit.conf: |[SERVICE]Flush 1Parsers_File parsers.confHTTP_Server OnHTTP_Listen 0.0.0.0HTTP_PORT 3302[INPUT]Name tailTag regex-fluentDB /var/log/regex-fluent.dbRead_from_Head truePath /var/log/test.logPath_Key pod_log_path[OUTPUT]Name forwardMatch *Host fluentd-service.logging.svcPort 24224Retry_Limit 5
Fluent-Bit的DS配置
注意:日志目录的挂载
apiVersion: apps/v1
kind: DaemonSet
metadata:name: fluent-bitnamespace: logginglabels:k8s-app: fluent-bit-loggingversion: v1kubernetes.io/cluster-service: "true"
spec:selector:matchLabels:k8s-app: fluent-bit-loggingtemplate:metadata:labels:k8s-app: fluent-bit-loggingversion: v1kubernetes.io/cluster-service: "true"annotations:prometheus.io/scrape: "true"prometheus.io/port: "2020"prometheus.io/path: /api/v1/metrics/prometheusspec:nodeSelector:zk-app: appcontainers:- name: fluent-bitimage: registry.cn-hangzhou.aliyuncs.com/ali_cloud_images/fluent-bit:1.9imagePullPolicy: Neverports:- containerPort: 2020volumeMounts:- name: varlogmountPath: /var/log- name: varlibdockercontainersmountPath: /var/lib/docker/containersreadOnly: true- name: fluent-bit-configmountPath: /fluent-bit/etc/- name: dbmountPath: /tail-db/terminationGracePeriodSeconds: 10volumes:- name: varloghostPath:path: /var/log- name: varlibdockercontainershostPath:path: /var/lib/docker/containers- name: fluent-bit-configconfigMap:name: fluent-bit-config- name: dbhostPath:path: /home/chb/hundun/fluent-bittype: DirectoryserviceAccountName: fluent-bittolerations:- key: node-role.kubernetes.io/masteroperator: Existseffect: NoSchedule- operator: "Exists"effect: "NoExecute"- operator: "Exists"effect: "NoSchedule"
Fluentd的CM配置
fluent.conf: |-<source>@type forwardport 24224bind 0.0.0.0tag test</source><match test>@type kafka2brokers 192.168.123.100:9092 # Kafka broker 地址topic fluentd_topic<format>@type json</format></match>
Fluentd的DS配置
注意:端口的暴露
apiVersion: apps/v1
kind: DaemonSet
metadata:name: fluentd-kafkanamespace: logginglabels:k8s-app: fluentd-kafkakubernetes.io/cluster-service: "true"addonmanager.kubernetes.io/mode: Reconcile
spec:selector:matchLabels:k8s-app: fluentd-kafkatemplate:metadata:labels:k8s-app: fluentd-kafkakubernetes.io/cluster-service: "true"# 此注释确保如果节点被驱逐,fluentd不会被驱逐,支持关键的基于 pod 注释的优先级方案。annotations:scheduler.alpha.kubernetes.io/critical-pod: ''spec:initContainers:- name: init-permissionimage: busyboximagePullPolicy: Nevercommand: ["sh", "-c", "mkdir -p /var/log/td-agent && chown -R 1000:1000 /var/log/td-agent"]nodeSelector:zk-app: appserviceAccountName: fluentd-kafkacontainers:- name: fluentd-kafkaimage: fluentd-kafka:latestimagePullPolicy: NeversecurityContext:runAsUser: 0ports: - containerPort: 24224name: forward-portvolumeMounts:- name: fluentd-config-volumemountPath: /opt/bitnami/fluentd/conf/fluentd.confsubPath: fluent.conf- name: varlogmountPath: /var/log- name: posmountPath: /var/log/td-agentvolumes:- name: fluentd-config-volumeconfigMap:name: fluentd-config- name: varloghostPath:path: /home/chb/test- name: poshostPath:path: /var/log/td-agentimagePullSecrets:- name: default-secrettolerations:- operator: ExiststerminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:name: fluentd-servicenamespace: logging
spec:selector:k8s-app: fluentd-kafkaports:- protocol: TCPport: 24224targetPort: 24224clusterIP: None