Flume-ng 高可用搭建-与测试

前提:

1)五台虚拟机(三台也可以)
2)flume单节点测试并学会
3)hadoop集群搭建完成

Flume NG集群,架构图

在这里插入图片描述
Flume的存储可以支持多种,这里只列举了HDFS

角色分配

名称HOST角色
Agent1chun1Web Server
Agent2chun2Web Server
Agent3chun3Web Server
Collector1chun4AgentMstr1
Collector2chun5AgentMstr1

表中所示,Agent1,Agent2,Agent3数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。下 面我们开发配置Flume NG集群

配置

在单点Flume中(这里介绍了单点的配置),基本配置都完成了,我们只需要新添加两个配置文件,它们是agent.properties和collector.properties,其配置内容如下所示:

agent配置

(根据自己需求把source读的路径(r1.command )和要配置的collector的主机名修改也就是k1和k2的hostname)
[root@chun1 flume-1.9.0-bin]# vi conf/agent.properties#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2#set gruop
agent1.sinkgroups = g1#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/flume-1.9.0/job/log/test.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = chun4
agent1.sinks.k1.port = 52020# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = chun5
agent1.sinks.k2.port = 52020#set sink group
agent1.sinkgroups.g1.sinks = k1 k2#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

修改后把flume发送给chun1,chun2,chun3,chun4,chun5( 发送后chun1,chun2,chun3不需要修改)

(chun4,chun5把刚才创建的agent.properties删除,添加一个collector.properties 并加入以下内容)

collector配置

记得把主机名改掉
[root@chun4 conf]# vi collector.properties #set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = chun4  //chun5的此处要改
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = chun4   //chun5的此处要改
a1.sources.r1.channels = c1#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

运行 (先启动两个collector然后在启动三个agent)

在4,5上运行
cd /usr/local/flume-1.9.0bin/flume-ng agent -n a1 -c conf -f conf/collector.properties -Dflume.root.logger=DEBUG,console
在1,2,3上运行
cd /usr/local/flume-1.9.0bin/flume-ng agent -n agent1 -c conf -f conf/agent.properties -Dflume.root.logger=DEBUG,console

插入数据

往test.txt里插入数据
代码意思:没0.5秒循环插入chun-chun-chun
while true
> do
> echo 'chun-chun-chun' >> /usr/local/flume-1.9.0/job/log/test.log 
> sleep 0.5
> done

查看 (hdfs的web端查看)

在这里插入图片描述

这时你会发现只有flume-ng1下有数据:说明是先往chun4上传

然后把chun4的进程杀死,就会发现数据开始往chun5传

然后再次打开(再次启动报错请看)数据又到chun4了(数据会先往权重高的传输)

(配置文件里有设置权重

agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1

)可以根据自己需求设置
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【转】D365 FO第三方集成(一)---访问认证(应用注册)

从Axapta3.0的COM Business Connector,到AX4.0和AX2009的 .NET Business Connector,到AX2012的WCF Services,最后到D365FO的接口方式。 AX的接口演化,几乎见证了微软整个技术栈的变迁。 D365 FO的Web Services比起AX2012有了飞跃&…

【转】Postman系列一:Postman安装及使用过程中遇到的问题

一:Postman的简介、下载安装及界面说明 1.Postman的简单介绍 Postman是一款强大的网页调试和发送网页HTTP请求的工具,Postman让开发和测试人员做API(接口)测试变得更加简单。在我使用Postman之前还有一个版本,就是需要…

Hive报错:Exception in thread main java.lang.Incom。。。。 Class com.google.common.collect.ImmutableSotil

Exception in thread “main” java.lang.IncompatibleClassChangeError: Class com.google.common.collect.ImmutableSotil.NavigableMap Exception in thread "main" java.lang.IncompatibleClassChangeError: Class com.google.common.collect.ImmutableSotil.Na…

Python 数据分析三剑客之 Pandas(十):数据读写

CSDN 课程推荐:《迈向数据科学家:带你玩转Python数据分析》,讲师齐伟,苏州研途教育科技有限公司CTO,苏州大学应用统计专业硕士生指导委员会委员;已出版《跟老齐学Python:轻松入门》《跟老齐学Py…

Hive优化

一、 Fetch抓取 Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。 例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。 在hive-default…

COVID-19 肺炎疫情数据实时监控(python 爬虫 + pyecharts 数据可视化 + wordcloud 词云图)

文章目录【1x00】前言【2x00】思维导图【3x00】数据结构分析【4x00】主函数 main()【5x00】数据获取模块 data_get【5x01】初始化函数 init()【5x02】中国总数据 china_total_data()【5x03】全球总数据 global_total_data()【5x04】中国每日数据 china_daily_data()【5x05】境外…

【转】Postman系列二:Postman中get接口实战讲解(接口测试介绍,接口测试流程,头域操作)

一:接口测试介绍 接口测试:就是针对软件对外提供服务的接口输入输出进行测试,以及接口间相互逻辑的测试,验证接口功能和接口描述文档的一致性。 接口测试好处:接口测试通常能对系统测试的更为彻底,更高的保…

hive或mysql报错Too many connections

in acquiring locks: com.zaxxer.hikari.pool.HikariPool$PoolInitializationException: Failed to initializ e pool: Data source rejected establishment of connection, message from server: “Too many connections” 显示:Too many connections &#xff0c…

Python3 爬虫实战 — 前程无忧招聘信息爬取 + 数据可视化

爬取时间:2020-07-11(2020年10月测试,增加了反爬,此代码已失效!!!)实现目标:根据用户输入的关键字爬取相关职位信息存入 MongoDB,读取数据进行可视化展示。涉…

【转】Postman系列三:Postman中post接口实战(上传文件、json请求)

一:接口测试过程中GET请求与POST请求的主要区别 从开发角度我们看get与post的主要区别是: 1.Get是用来从服务器上获得数据,而Post是用来向服务器上传递数据; 2.Get安全性比Post低:Get将表单中数据的按照keyvalue的形式…

Hadoop datanode正常启动,但是jps差不多datanode进程,而且Live nodes中却缺少节点

启动时可以看到启动成功,但是在chun2,jps的时候却没有了datanode进程,而且web端Live nodes也缺少了 百度搜索之后查到是因为hdfs.site.xml配置文件里dfs.data.dir配置的路径重复,就是多个节点存放data数据的目录路径相同了&#x…

【Python 必会技巧】使用 Python 追加写入 json 文件或更改 json 文件中的值

追加写入 json 文件 有一个 test.json 文件,包含内容如下: {"key_1": "value_1" }现需要追加写入 json 文件,向其中增加值,使其包含内容如下: {"key_1": "value_1","…

【转】Postman系列四:Postman接口请求设置环境变量和全局变量、测试沙箱和测试断言、测试集运行与导入数据文件

一:Postman中接口请求设置环境变量和全局变量 全局变量和环境变量可以通过Pre-request Script和Tests设置,会在下面测试沙箱和测试断言中讲到。 全局变量的设置:官网参考https://learning.getpostman.com/docs/postman/environments_and_glob…

Python 算法之递归与尾递归,斐波那契数列以及汉诺塔的实现

文章目录递归概念递归要素递归与迭代的区别示例一:阶乘示例二:斐波那契数列示例三:汉诺塔问题尾递归Python 中尾递归的解决方案递归概念 递归:程序调用自身的编程技巧称为递归( recursion)。用一种通俗的话…

【转】Postman系列五:Postman中电商网站cookie、token检验与参数传递实战

一:Postman中电商网站cookie实战 Postman接口请求使用cookie两种方式: 1.直接在header(头域)中添加cookie,适用于已知请求cookie头域的情况 2.使用Postman的cookie管理机制,即可以手动添加,同时…

Python 数据结构之栈的实现

文章目录栈的概念栈的特点栈的操作Python 实现栈栈的简单应用:括号匹配问题栈的简单应用:倒序输出一组元素栈的概念 栈(stack)又名堆栈,栈是一种线性数据结构,用先进后出或者是后进先出的方式存储数据&…

Scala学习

Scala学习(1.在菜鸟驿站简单学习) 由于学过java等语言,Scala简单的把语法多敲多练习就可以 新语言开始学习主要是语法的熟悉阶段,菜鸟教程里的内容全部完成一遍 object HelloWorld {def main(args: Array[String]): Unit {pri…

CSDN 2020 博客之星实时数据排名(Python 爬虫 + PyEcharts)

CSDN 2020 博客之星实时数据排名:csdn.itrhx.com CSDN 一年一度的博客之星评选开始了,官网地址:https://bss.csdn.net/m/topic/blog_star2020 ,由于官网是按照随机编号排序的,没有按照票数多少排序,为了方便…

Scala进阶-函数练习

1) map()函数 可以对整个集合进行操作,比如 创建一个Seq列表,然后用map对集合*2 val salaries Seq(2,3,4,5)val newsalaries salaries.map(_*2)2) flatMap函数 faltMap函数是map一种扩展,faltMap中传入一个函数&a…