1 Flume简介
-
Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统
-
主要特性:
- 它有一个简单、灵活的基于流的数据流结构(使用Event封装)
- 具有负载均衡机制和故障转移机制
- 一个简单可扩展的数据模型(Source、Channel、Sink)
-
Source组件:
- 从外界采集各种类型数据,将数据传递给Channel
- 支持类型有文件、目录、端口、Kafka等
- Exec Source:实现文件监控;注意 tail -F(跟踪文件) 和 tail -f 的区别
- NetCat TCP/UDP Source: 采集指定端口(tcp、udp)的数据
- Spooling Directory Source:采集文件夹里新增的文件
- Kafka Source:从Kafka消息队列中采集数据
-
Channel组件:
- 接受Source发出的数据,临时存储
- 支持类型有内存、文件,内存+文件、JDBC等
- Memory Channel:使用内存作为数据的存储,内存有限,临时存储
- File Channel:使用文件来作为数据的存储
- Spillable Memory Channel:使用内存和文件作为数据存储(即先存到内存中,如果内存中数据达到阈值再flush到文件中)
-
Sink组件:
- 从Channel中读取数据并存储到指定目的地
- 表现形式:控制台、HDFS、Kafka等
- 注意:Channel中的数据直到进入目的地才会被删除,当Sink写入失败后,可以自动重写,不会造成数据丢失
- Logger Sink:将数据作为日志处理
- HDFS Sink:将数据传输到HDFS中
- Kafka Sink:将数据发送到kafka消息队列中
2 Flume部署
访问flume下载地址,下载apache-flume-1.9.0-bin.tar.gz安装包,在/data/soft目录下解压
2.1 修改flume-env.sh文件
cd /data/soft/apache-flume-1.9.0-bin/conf
mv flume-env.sh.template flume-env.sh
将flume-env.sh.template剪切放到flume-env.sh中
2.2 示例
# example.conf: A single-node Flume configuration# Name the components on this agent 定义组件名称
# 这里定义了一个agent(代理)名为a1,包含了一个source(数据源)r1,一个sink(数据汇)k1,以及一个channel(通道)c1。
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source 配置数据源
# 这里设置数据源r1的类型为netcat(通过网络接收数据),绑定的IP地址为0.0.0.0,监听端口为44444。
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444# Describe the sink 配置数据汇
# 这里设置数据汇k1的类型为logger(打印日志)。
a1.sinks.k1.type = logger# Use a channel which buffers events in memory 配置通道
# 这里设置通道c1的类型为memory(内存缓冲区),容量为1000,事务容量为100。
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 绑定数据源和数据汇到通道:
# 这里将数据源r1绑定到通道c1,将数据汇k1绑定到通道c1。
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这段代码是一个单节点的Flume配置文件,用于将网络上的数据通过netcat source接收,并通过logger sink输出到日志中。
通过这个配置文件,Flume Agent将会监听44444端口上的网络流量,并将收到的数据通过logger sink输出到日志中。通道c1
将以内存缓冲的方式在数据源和数据汇之间传递数据。
执行命令:
cd /data/soft/apache-flume-1.9.0-bin
bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
该命令是用于启动 Apache Flume 的代理(agent)。以下是对各个部分的解释:
bin/flume-ng
: 这是 Flume 的启动脚本,用于执行 Flume 的命令。agent
: 表示要启动的 Flume 组件是一个代理(agent)。--conf conf
: 指定 Flume 配置文件的目录,即conf
目录。--conf-file conf/example.conf
: 指定 Flume 使用的配置文件路径,这里的路径是conf/example.conf
。--name a1
: 给代理(agent)指定一个名称,此处为a1
。-Dflume.root.logger=INFO,console
: 设置 Flume 的日志级别为 INFO,并将日志输出到控制台。综上所述,该命令将使用指定的配置文件(
conf/example.conf
)启动一个名为a1
的 Flume 代理(agent),并将日志输出到控制台。
开启另一个Terminal,执行远程连接命令,并输入传输内容:
telnet localhost 44444
telnet localhost 44444
是一条命令,用于在本地主机上使用 Telnet 协议连接到端口号为 44444 的服务。Telnet 是一种远程登录协议,允许你通过网络(通常是互联网)与其他计算机建立终端会话。在这种情况下,你正在尝试连接到本地主机(localhost)上的 44444 端口。
如果 44444 端口上有运行并配置为接受 Telnet 连接的服务,该命令将建立与该服务的连接。然而,如果该端口上没有运行任何服务,或者被防火墙阻止,连接尝试将失败。
一旦 Telnet 会话建立,你可以使用基于文本的命令与远程服务进行交互。
3 高级组件
相关配置需要参考官方文档
-
Source Interceptors:Source可以指定一个或者多个拦截器按先后顺序依次对采集到的数据进行处理
-
Channel Selectors:Source发往多个Channel的策略设置
-
Sink Processors:Sink发送数据的策略设置
-
Event:
- Event是Flume传输数据的基本单位,也是事务的基本单位;在文本文件中,通常一行记录就是一个Event
- Event里有header和body;header类型为Map<String, String>
- 可以在Source中增加header的<key, value>,在Channel和Sink中使用header中的值
3.1 Source Interceptors
- 常见Interceptors类型:Timestamp Interceptor、Host Interceptor、Search and Replace Interceptor 、Static Interceptor、Regex Extractor Interceptor等
3.2 Channel Selectors
- Channel Selectors类型包括:Replicating Channel Selector(default)和Multiplexing Channel Selector
- Replicating:会将Source采集过来的Event发往所有Channel
- Multiplexing:会根据Event中header里面的值,将Event发往不同的Channel
3.3 Sink Processors
- Sink Processors类型包括:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor
- Default:是默认的不用配置sinkgroup;Load balancing是负载均衡;Failover是故障转移,后面需要定义sinkgroup
4 性能优化
-
Flume优化
- 调整Flume进程的内存大小,建议设置1G~2G,内存设置太小可能导致频繁GC
- 启动多个Flume进程时,建议修改配置区分日志文件,复制多个conf目录,修改
log4j.properties
文件
-
Flume进程监控
- Flume是一个单进程程序,会存在单点故障,所以需要有一个监控机制,发现Flume进程Down掉之后,需要重启
- 通过Shell脚本使用
jps
命令对Flume进程进行监控、告警之后并自动重启