Prerequisites:
1) Five virtual machines (three will also work)
2) Single-node Flume tested and understood
3) A working Hadoop cluster
Flume NG cluster, architecture diagram
Flume can write to many kinds of storage; only HDFS is covered here.
Role assignment
Name | Host | Role |
---|---|---|
Agent1 | chun1 | Web Server |
Agent2 | chun2 | Web Server |
Agent3 | chun3 | Web Server |
Collector1 | chun4 | AgentMstr1 |
Collector2 | chun5 | AgentMstr2 |
As the table shows, data from Agent1, Agent2, and Agent3 flows into both Collector1 and Collector2. Flume NG itself provides a failover mechanism that switches over and recovers automatically. In the diagram above, three log-producing servers sit in different server rooms, and all of their logs must be collected into one cluster for storage. Below we configure the Flume NG cluster.
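Sketched as text: each agent tails a local log and sends it over Avro to both collectors, with chun4 preferred and chun5 as the failover standby:

chun1 / chun2 / chun3 (agents: exec source, tail -F the local log)
        |  avro, port 52020
        v
chun4 (Collector1, priority 10) --failover--> chun5 (Collector2, priority 1)
        |                                            |
        v                                            v
       HDFS                                         HDFS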
Configuration
With the single-node Flume setup in place (covered in the earlier single-node walkthrough), the base configuration is already done. We only need to add two new configuration files, agent.properties and collector.properties, with the contents below:
Agent configuration
(Adjust the path the source reads from (r1.command) and the collector hostnames for k1 and k2 to match your environment.)
[root@chun1 flume-1.9.0-bin]# vi conf/agent.properties

# agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# set group
agent1.sinkgroups = g1

# set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# set source
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/flume-1.9.0/job/log/test.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = chun4
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = chun5
agent1.sinks.k2.port = 52020

# set sink group
agent1.sinkgroups.g1.sinks = k1 k2

# set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
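With the failover processor, chun5 sits idle until chun4 fails. If you would rather have both collectors share the traffic, Flume also ships a load_balance sink processor; a minimal sketch, changing only the sinkgroup lines above (round_robin is one of the built-in selectors):

agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin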
After making these changes, copy the Flume installation to chun1, chun2, chun3, chun4, and chun5 (chun1, chun2, and chun3 need no further changes after the copy).
(On chun4 and chun5, delete the agent.properties you just created and add a collector.properties with the content below; a distribution sketch follows.)
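A sketch of the distribution step, assuming you run it from chun1, the install lives at /usr/local/flume-1.9.0 on every node, and passwordless SSH is set up (all of these are assumptions; the original does not show the commands):

for h in chun2 chun3 chun4 chun5; do
  scp -r /usr/local/flume-1.9.0 root@$h:/usr/local/
done
# the collectors use collector.properties instead of agent.properties
ssh root@chun4 'rm /usr/local/flume-1.9.0/conf/agent.properties'
ssh root@chun5 'rm /usr/local/flume-1.9.0/conf/agent.properties'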
Collector configuration
(Remember to change the hostname on chun5.)
[root@chun4 conf]# vi collector.properties

# set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# set source: avro, receives from the three agents
# (on chun5, change the bind host below to chun5)
a1.sources.r1.type = avro
a1.sources.r1.bind = chun4
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
# (on chun5, change this value to chun5)
a1.sources.r1.interceptors.i1.value = chun4
a1.sources.r1.channels = c1

# set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /home/hdfs/flume/logdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
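The %Y-%m-%d escapes in hdfs.filePrefix are filled in from the timestamp header, which the agents' i2 timestamp interceptor adds to every event. It can also help to create the target directory ahead of time; a one-line sketch (run as a user with write access to HDFS):

hdfs dfs -mkdir -p /home/hdfs/flume/logdfs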
Run (start the two collectors first, then the three agents)
Run on chun4 and chun5:
cd /usr/local/flume-1.9.0
bin/flume-ng agent -n a1 -c conf -f conf/collector.properties -Dflume.root.logger=DEBUG,console
Run on chun1, chun2, and chun3:
cd /usr/local/flume-1.9.0
bin/flume-ng agent -n agent1 -c conf -f conf/agent.properties -Dflume.root.logger=DEBUG,console
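With -Dflume.root.logger=DEBUG,console each process ties up a terminal; to run a node in the background instead, a common variant (my assumption, not from the original) is:

cd /usr/local/flume-1.9.0
nohup bin/flume-ng agent -n a1 -c conf -f conf/collector.properties > collector.log 2>&1 &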
Insert data
Append data to test.log (the file that r1.command tails):
What the loop does: append the line chun-chun-chun every 0.5 seconds.
while true
> do
> echo 'chun-chun-chun' >> /usr/local/flume-1.9.0/job/log/test.log
> sleep 0.5
> done
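Besides the web UI, you can confirm the sink is writing from the command line; a hedged check (files still being written carry Flume's default .tmp suffix):

hdfs dfs -ls /home/hdfs/flume/logdfs
hdfs dfs -cat '/home/hdfs/flume/logdfs/*' | tail -n 5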
View (in the HDFS web UI)
At this point you will find that only flume-ng1 contains data, which shows the data is uploaded to chun4 first.
Then kill the Flume process on chun4, and you will see the data start flowing to chun5.
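One way to do the kill (a sketch; the Flume JVM shows up in jps as Application):

# on chun4
jps | grep Application     # find the Flume process id
kill -9 <pid>              # <pid> is a placeholder for the id jps printed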
Then bring chun4 back up (if the restart reports an error, see the separate note on that), and the data goes back to chun4: events are always sent to the sink with the higher priority first.
(The priorities are set in the agent configuration:
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
Set them to suit your needs: the sink with the higher priority value is used first, and processor.maxpenalty caps how long, in milliseconds, a failed sink is backed off before being retried.)