Flume学习笔记(2)—— Flume进阶

Flume进阶

Flume 事务

事务处理流程如下:

Put

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,回滚数据

Take

  • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

Flume Agent 内部原理

ChannelSelector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel

其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)

  • ReplicatingSelector 会将同一个 Event 发往所有的 Channel
  • Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel

SinkProcessor

SinkProcessor 共有三种类型 , 分别是 DefaultSinkProcessor 、LoadBalancingSinkProcessor、FailoverSinkProcessor

  • DefaultSinkProcessor 对应的是单个的Sink
  • LoadBalancingSinkProcessor 可以实现负载均衡的功能
  • FailoverSinkProcessor 可以实现错误恢复的功能

Flume 拓扑结构

简单串联

将多个 flume 顺序连接起来,从最初的 source 开始到最终 sink 传送的目的存储系统

不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

复制和多路复用

(单 source,多 channel、sink)

Flume 支持将事件流向一个或者多个目的地

这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能、

这里的agent1有三个sink,分别连接agent2,agent3,agent4,即使其中有的sink出现了故障,数据还是能同步到hdfs

聚合

业务中常用,比如说日志采集功能:

日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器,产生的日志处理起来也非常麻烦

可以采用聚合的方式,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

Flume实战案例

复制和多路复用

需求:使用 Flume-1 监控文件变动

  1. Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS
  2. Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem

实现流程:
1.在job下创建文件夹group1,并在其中创建配置文件flume-file-flume.conf

配置文件中需要有1个source,2个channel,2个sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

该配置文件的作用是将数据发送到两个不同的sink,再由sink发送到其他的agent进行处理

2.创建配置文件flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

source绑定上一个agent的sink1,然后上传到hdfs

3.创建配置文件:flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

参数说明:

sink类型为file_roll:Flume 1.11.0 User Guide — Apache Flume

可以将events保存到本地文件系统

  • directory:本地文件系统保存数据的路径(注意,该路径必须已经存在才可以)

4.分别启动相应的flume进程:

nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &

nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
5.在hdfs和文件夹中都能看到相应的内容:

hdfs:

文件系统:

负载均衡和故障转移

需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

实现流程:

1.在/opt/module/flume/job 目录下创建 group2 文件夹,创建配置文件flume-netcat-flume.conf

配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

参数说明:Flume 1.11.0 User Guide — Apache Flume

通过sink groups在一个agent中定义多个sink,并可以配置sink processor使用:Flume 1.11.0 User Guide — Apache Flume

2.创建 flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

sink输出到本地的控制台

3.创建 flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

sink输出到本地的控制台

4.执行指令:

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

5.使用nc localhost 44444发送数据

由于console2设置的优先级高于console1,因此数据由console2接收到;

接下来将console2进程kill掉,数据就由console1接收了:

聚合

需求:

hadoop102 上的 Flume-1 监控文件/home/why/data/flumeDemo/test3/test3.log

hadoop103 上的 Flume-2 监控某一个端口的数据流

Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

实现流程:

1.首先在三台服务器的job文件夹先创建目录group3

2.在hadoop102上,创建配置文件flume1-logger-flume.conf,source用于监控log日志文件,sink用于输出数据到下一级的Flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.在hadoop103上,创建配置文件flume2-netcat-flume.conf,source用于监控端口44444的数据流,sink用于将数据传输到下一级的flume

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

注意,这两个agent的sink目的地都是hadoop104这一个服务器,因此hostname和port都相同

4.在hadoop104上创建配置文件flume3-flume-logger.conf,source用于接收flume1和flume2发送来的数据流,sink用于输出数据到控制台;

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

5.分别在三台服务器上执行指令

hadoop104: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

6.在hadoop102上向日志文件中追加内容:

echo "hello" > /home/why/data/flumeDemo/test3/test3.log

在hadoop103中通过nc hadoop103 44444向44444端口发送数据;

然后在hadoop104中即可接收到数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/147983.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数学建模 | 灰色预测原理及python实现

目录 一、灰色预测的原理 二、灰色预测的应用及python实现 一、灰色预测的原理 灰色预测是以灰色模型为基础,灰色模型GM(n,h)是微分方程模型,可用于描述对象做长期、连续、动态的反应。其中,n代表微分方程式的阶数,h代表微分方…

Spring Framework IOC依赖查找 - 按名称查找解析

IoC按名称查找共分为三类: 按名称按类型按集合 按名称查找 在Spring Framework中,实时加载和延迟加载是指在容器启动时是否立即实例化bean的不同策略。下面我们将分别介绍这两种加载方式及其应用场景。 tips: 当涉及到懒加载和延时加载时&#xff0…

windows排除故障工具pathping、MTR、sysinternals

pathping 基本上可以认为它是ping和tracert的功能合体。 pathping首先对目标执行tracert,然后使用ICMP对每一跳进行100次ping操作。 如图,是一个对8.8.8.8进行pathing操作。 MTR MTR是另一个多工具合体工具。 winmtr是mtr的windows版本。 这个工具…

vscode设置latex

vscode配置latex 1.安装vscode,并添加环境变量路径 2.安装latex,bin文件夹添加到环境变量路径 3.vscode安装插件 4.vscode->文件->首选项->显示配置内容->setting.json文件,查看其位置目录,通过我的电脑找到此文件(不要使用v…

OpenCV快速入门:图像形态学操作

文章目录 前言一、图像形态学基础1.1 背景介绍1.2 像素距离1.2.1 什么是像素距离?1.2.2 常见的像素距离度量方法1.2.3 计算像素距离的代码实现 1.3 图像连通性1.3.1 什么是图像连通性?1.3.2 连通类型1.3.3 连通组件标记1.3.4 连通性在图像处理中的应用 1…

在 el-table 中嵌入 el-checkbox el-input el-upload 多组件,实现复杂业务场景

由于业务场景的复杂性,需实现:在 el-table 表格中 嵌入 el-checkbox 多选框 及 el-input 输入框 及 el-upload 上传组件 ,先附上实现效果图。 从图片可以看出其实就是一个规格可以带有多个属性的规格表,实现此效果需涉及到的知识点…

【机器学习基础】对数几率回归(logistic回归)

🚀个人主页:为梦而生~ 关注我一起学习吧! 💡专栏:机器学习 欢迎订阅!后面的内容会越来越有意思~ 💡往期推荐: 【机器学习基础】机器学习入门(1) 【机器学习基…

ADS村田电感.mod(spice netlist文件)和.s2p模型导入与区别

ADS村田电感.mod(spice netlist文件)和.s2p模型导入与区别 简介环境过程s2pspice netlist(.mod文件)导入和结果对比 简介 记录了ADS村田电感.mod(spice netlist文件)和.s2p模型导入与区别 环境 ADS2020 …

什么是UV贴图?

UV 是与几何图形的顶点信息相对应的二维纹理坐标。UV 至关重要,因为它们提供了表面网格与图像纹理如何应用于该表面之间的联系。它们基本上是控制纹理上哪些像素对应于 3D 网格上的哪个顶点的标记点。它们在雕刻中也很重要。 为什么UV映射很重要? 默认情…

opencv(2): 视频采集和录制

视频采集 相关API VideoCapture()cap.read(): 返回两个值,第一个参数,如果读到frame,返回 True. 第二个参数为相应的图像帧。cap.release() VideoCapture cv2.VideoCapture(0) 0 表示自动检测,如果在笔记本上运行&…

助力水泥基建裂痕自动化巡检,基于yolov5融合ASPP开发构建多尺度融合目标检测识别系统

道路场景下的自动化智能巡检、洞体场景下的壁体类建筑缺陷自动检测识别等等已经在现实生活中不断地落地应用了,在我们之前的很多博文中也已经有过很多相关的实践项目经历了,本文的核心目的是想要融合多尺度感受野技术到yolov5模型中以期在较低参数量的情…

计算属性与watch的区别,fetch与axios在vue中的异步请求,单文本组件使用,使用vite创建vue项目,组件的使用方法

7.计算属性 7-1计算属性-有缓存 模板中的表达式虽然很方便,但是只能做简单的逻辑操作,如果在模版中写太多的js逻辑,会使得模板过于臃肿,不利于维护,因此我们推荐使用计算属性来解决复杂的逻辑 <!DOCTYPE html> <html lang"en"> <head><meta …

Go vs Rust:文件上传性能比较

在本文中&#xff0c;主要测试并比较了Go—Gin和Rust—Actix之间的多部分文件上传性能。 设置 所有测试都在配备16G内存的 MacBook Pro M1 上执行。 软件版本为&#xff1a; Go v1.20.5Rust v1.70.0 测试工具是一个基于 libcurl 并使用标准线程的自定义工具&#xff0c;能…

【性能】如何计算 Web 页面的 FP 指标

什么是 FP 指标 FP (First Paint) 为首次渲染的时间点&#xff0c;在性能统计指标中&#xff0c;从用户开始访问 Web 页面的时间点到 FP 的时间点这段时间可以被视为 白屏时间&#xff0c;也就是说在用户访问 Web 网页的过程中&#xff0c;FP 时间点之前&#xff0c;用户看到的…

KeyarchOS的CentOS迁移实践:使用操作系统迁移工具X2Keyarch V2.0

KeyarchOS的CentOS迁移实践&#xff1a;使用操作系统迁移工具X2Keyarch V2.0 作者&#xff1a; 猫头虎博主 文章目录 KeyarchOS的CentOS迁移实践&#xff1a;使用操作系统迁移工具X2Keyarch V2.0&#x1f405;摘要引言1. 迁移前的精心准备1.1 系统环境介绍1.2 深度数据验证1.2.…

Maven编译报错:javacTask: 源发行版 1.8 需要目标发行版 1.8

报错截图&#xff1a; IDEA中的jdk检查都正常设置的1.8一点毛病没有。参考其他帖子链接如下&#xff1a; https://blog.csdn.net/zhishidi/article/details/131480199https://blog.51cto.com/u_16213460/7197764https://blog.csdn.net/lck_csdn/article/details/125387878 逐…

Vue指令修饰符、v-bind、v-model、computed计算属性、watch侦听器

前言 持续学习总结输出中&#xff0c;Vue指令修饰符、v-bind、v-model、computed计算属性、watch侦听器 一、指令修饰符 1.什么是指令修饰符&#xff1f; 所谓指令修饰符就是通过“.”指明一些指令后缀 &#xff0c;不同的后缀封装了不同的处理操作 —> 简化代码 2.按键…

LeetCode(29)三数之和【双指针】【中等】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 三数之和 1.题目 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复…

Android 13.0 Launcher3仿ios长按app图标实现抖动动画开始拖拽停止动画

1.概述 在13.0的系统rom定制化开发中,在对系统原生Launcher3的定制需求中,也有好多功能定制的,在ios等电子产品中 的一些好用的功能,也是可以被拿来借用的,所以在最近的产品开发需求中,需求要求模仿ios的 功能实现长按app图标实现抖动动画,接下来看如何分析该功能的实现…

电子学会2023年6月青少年软件编程(图形化)等级考试试卷(二级)真题,含答案解析

青少年软件编程(图形化)等级考试试卷(二级) 一、单选题(共25题,共50分) 1. 运行下列哪段程序,可以让狗狗走到木屋门口?( ) A.