Flume 快速入门【概述、安装、拦截器】

文章目录

    • 什么是 Flume?
    • Flume 组成
    • Flume 安装
    • Flume 配置任务文件
      • 应用示例
      • 启动 Flume 采集任务
    • Flume 拦截器
      • 编写 Flume 拦截器
      • 拦截器应用

什么是 Flume?

Flume 是一个开源的数据采集工具,最初由 Apache 软件基金会开发和维护。它的主要目的是帮助用户将大规模数据从各种数据源(如日志文件、网络数据源、消息队列等)采集、传输和加载到数据存储系统(如 Hadoop HDFS、Apache HBase、Apache Hive 等)。

Flume 旨在处理大规模数据流,以便进行数据分析和处理。

Flume 组成

Flume (配置)主要由以下 4 个部分组成:

1. 数据源(Source): Flume 可以从多种数据源收集数据,例如日志文件、网络流、消息队列等。

2. 通道(Channel): 采集的数据被存储在通道中,等待传输到目标数据存储系统。Flume 支持多种不同类型的通道,如内存通道、文件通道和 Kafka 通道。

3. 拦截器(Interceptor): 拦截器允许用户对采集的数据进行预处理和转换,以满足特定需求。

4. 接收器(Sink): 接收器将数据传输到目标数据存储系统,如 Hadoop HDFS、HBase、Kafka 等。

Flume 通过灵活的配置,允许用户根据其数据采集需求来定义数据流的整个流程,包括数据源、通道、拦截器和接收器。

这使得 Flume 成为处理大规模数据采集和传输任务的强有力工具,构建数据管道,将分散的数据整合到中心存储或处理系统中,用于实时或者离线数据分析和报告。

Flume 安装

官方安装包下载地址:http://archive.apache.org/dist/flume

本篇博客使用的版本为:Flume-1.10.1

1. 解压

tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/

2. 配置环境变量

vim /etc/profile

文件末尾添加:

#FLUME_HOME
export FLUME_HOME=/opt/flume-1.10.1
export PATH=$PATH:$FLUME_HOME/bin

刷新环境变量:source /etc/profile

其实到这里,Flume 算是安装完了,但是为了后期使用方便,这里再调整一下配置参数。

修改日志存储与输出:

cd $FLUME_HOMEvim conf/log4j2.xml

在该文件中修改日志文件的存储目录(正文第 3 行)

    <Property name="LOG_DIR">/opt/flume-1.10.1/logs</Property>

在该文件中添加日志控制台输出方式(正文末尾)

      <AppenderRef ref="Console" />

默认只有 LogFile 日志文件的输出方式。


修改堆内存大小:

cd $FLUME_HOMEvim conf/flume-env.sh

如果是本地学习或者测试环境建议调小一点:

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

我这里调整最小为 512MB,最大 2048MB,也可以将最大和最小调整为一样的,避免进行内存交换。

Flume 配置任务文件

Flume 最主要的内容就是配置任务文件了,在文章开头提到过,主要由四部分组成:

  • 数据源(Source)

  • 通道(Channel)

  • 拦截器(Interceptor)

  • 接收器(Sink)

我们可以根据需求,进入 Flume 的官方网站,查阅各项参数如何进行配置,按照要求配置即可。

配置查阅网站:Flume 1.10.1 User Guide

在这里插入图片描述

其中给出了一个模板文件,内容如下所示:

# example.conf: A single-node Flume configuration# Name the components on this agent  声明变量名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source  配置数据源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink  配置接收器(存储源)
a1.sinks.k1.type = logger# Use a channel which buffers events in memory 配置管道参数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel  组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

根据该模板文件就可以来快速构建一个数据采集的配置文件啦。

应用示例

将 Maxwell 发送到 Kafka 消息队列中的数据采集到 HDFS 上。

# --------- 声明变量名称 ---------
a1.sources = r1
a1.sinks = k1
a1.channels = c1# --------- 配置数据源 ---------
# 指定数据源类型
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 指定连接地址
a1.sources.r1.kafka.bootstrap.servers = hadoop120:9092,hadoop121:9092,hadoop122:9092
# 指定消费者组别,防止多个消费者之间引发数据冲突
a1.sources.r1.kafka.consumer.group.id = flume1
# 指定主题名称,这里需要和 MaxWell 指定发送的主题保持一致,否则会采集不到数据
a1.sources.r1.kafka.topics = topic_db# --------- 配置接收器 ---------
# 指定存储源类型
a1.sinks.k1.type = hdfs
# 动态规划 HDFS 写入路径
a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/# 当以下值中的其中一个满足时,触发滚动操作,将数据写入到新的文件中(避免小文件过多)
# 根据运行时间判定(s),测试环境调小,开发环境30m-1h
a1.sinks.k1.hdfs.rollInterval = 10 
# 根据数据量大小判定(B),128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# 根据文件的条数判断,为 0 时表示不依据该参数
a1.sinks.k1.hdfs.rollCount = 0# 压缩文件
# 指定文件类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream
# 指定数据压缩格式
a1.sinks.k1.hdfs.codeC = gzip# --------- 配置通道 ---------
# 通道类型
a1.channels.c1.type = file
# 检查点存储路径
a1.channels.c1.checkpointDir = /opt/module/flume-1.10.1/file-channel/checkpoint1
# 用于存储日志文件的目录,多个路径用逗号分隔
a1.channels.c1.dataDirs = /opt/module/flume-1.10.1/file-channel/data1
# 指定允许等待的时间
a1.channels.c1.keep-alive = 6# --------- 组装 ---------
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume 采集任务

cd $FLUME_HOME./bin/flume-ng agent -c conf/ -f job_file -n a1

参数解析:

  • ./bin/flume-ngflume-ng 是 Flume 的执行脚本,它用于启动 Flume 的 agent 实例。
  1. agent:这告诉 Flume-ng 启动一个代理实例,也就是一个数据采集和传输任务的执行单元。

  2. -c conf/:指定 Flume 配置文件的目录。

  3. -f job_file:指定 Flume 任务配置文件的参数,其中配置文件包含了数据源、通道、接收器以及数据处理的详细信息。

  4. -n a1:指定代理实例的名称,与配置文件中的对应。

Flume 拦截器

当我们在配置文件中定义了动态参数时,例如上方示例中接收器的配置语句:

a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/

我们设想的是将表名称和年月日进行动态规划,但在未设置拦截器时,这些动态参数值都会被默认为空,如果是系统预定义的参数则为系统设定值。

如下所示:


其中 tableName 是自定义的值,Flume 系统并没有对其进行预定义,所以为空,但 %Y %m %d 这三个值系统默认为当前的日期值,所以不为空。

如果想将上述值设定为希望出现的值,此时便引出了拦截器的概念。通过对拦截器的配置,将采集的数据进行预处理和转换,以满足特定需求。

编写 Flume 拦截器

在 IDEA 中编写拦截器代码,然后打包上传,使用依赖如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.work</groupId><artifactId>intercepted</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- JSON 解析包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- flume 包,不打包该 Jar 包--><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

注意,JDK 版本与平台保持一致。

拦截器实现,用于设定表头与写入日期:

package com.work.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @author Moon_coder* @version 1.0* @date 2023/10/29 17:32*/
public class TableNameAndTimestamp implements Interceptor {/*** 初始化方法*/@Overridepublic void initialize() {}/**** 处理单条数据* @param event* @return*/@Overridepublic Event intercept(Event event) {try{// 1.获取头数据Map<String, String> headers = event.getHeaders();// 2.获取数据内容,将字节数据转换为字符串String log = new String(event.getBody(), StandardCharsets.UTF_8);// 3.将字符串转换为 JSON 对象JSONObject jsonObject = JSONObject.parseObject(log);// 4.获取表名String table = jsonObject.getString("table");// 5.获取时间,我的数据是经 Maxwell 采集的,Maxwell 中的数据是 10 位时间戳,不含毫秒,将数据存入 HDFS 时 *1000String ts = jsonObject.getString("ts") + "000";// 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);}catch (JSONException e){// 如果不是 JSON 数据,则将该数据定义为脏数据return null;}return event;}/**** 批量数据处理方法* @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {// 批量处理 event 同时实现过滤功能events.removeIf(next -> intercept(next) == null);return events;}/*** 关闭方法*/@Overridepublic void close() {}// TODO 返回拦截器类public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TableNameAndTimestamp();}@Overridepublic void configure(Context context) {}}}

类名或 Jar 包名称都没有特别要求,自定义即可。

注意: 当我们在往头信息里面放东西时,需要与键名一一对应。

            // 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);

如果是自定义的值,名称与 Flume 配置文件设定的必须对应:


如果是系统预定义的值,则需要在官方网站中查询其对应的键名。例如这里出现的 %Y %m %d 这三个值,在接收器的参数定义那里即可查询到(HDFS Sink¶),如下所示:

在这里插入图片描述

这里官方给出了提示,说 对于所有与时间相关的转义序列,事件的标头中必须存在一个关键字为 “timestamp” 的标头,所以在拦截器中对头信息的时间进行操作时,对应的键名为 timestamp

拦截器应用

将打包好的拦截器 Jar 包上传到 Flume 中的 lib 目录下,然后在 Flume 任务配置文件中添加拦截器配置,如下所示:

# --------- 拦截器 ---------
# 拦截器名称
a1.sources.r1.interceptors = i1
# 编写的拦截器全类名 + $Builder 标识符
a1.sources.r1.interceptors.i1.type = com.work.flume.interceptor.TableNameAndTimestamp$Builder

再次执行上方的示例任务,可以看到配置完拦截器后,头信息已经达到了我们预期的结果。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/124093.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 谈谈你对OOM的认识

文章目录 前言一、基础架构二、常见OOM1、栈内存溢出java.lang.StackOverflowError2、堆内存溢出java.lang.OutOfMemoryError&#xff1a;Java heap space3、GC回收时间过长java.lang.OutOfMemoryError: GC overhead limit exceeded4、NIO程序堆外内存溢出java.lang.OutOfMemor…

STM32———USART串口控制LED灯亮灭

1.硬件设计流程 2.程序设计流程 1.串口初始化时钟使能&#xff1a;RCC_APBxPeriphClockCmd(); GPIO初始化时钟使能&#xff1a;RCC_AHBxPeriphClockCmd();2.GPIO端口模式配置&#xff1a;GPIO_Init();3.串口参数初始化&#xff1a;USART_Init();4.串口使能&#xff1a;USART_C…

SpringBoot相比于Spring的优点(自动配置和依赖管理)

自动配置 例子见真章 我们先看一下我们Spring整合Druid的过程&#xff0c;以及我们使用SpringBoot整合Druid的过程我们就知道我们SpringBoot的好处了。 Spring方式 Spring方式分为两种&#xff0c;第一种就是我们使用xml进行整合&#xff0c;第二种就是使用我们注解进行简化…

RedissonCach的源码流程

上&#xff1a; https://blog.csdn.net/Michelle_Zhong/article/details/126384566 中&#xff1a; https://blog.csdn.net/michelle_zhong/category_11874153.html 下&#xff1a; https://blog.csdn.net/Michelle_Zhong/article/details/126391915?ops_request_misc%257B%…

GE IS420UCSBH1A 控制器模块

控制器模块是工业自动化和控制系统中的关键组件&#xff0c;用于监测、控制和管理各种工程过程。这些模块通常具有以下特点&#xff1a; 多通道控制&#xff1a; 控制器模块通常可以控制多个通道&#xff0c;允许同时管理多个设备或过程。 实时控制&#xff1a; 模块支持实时控…

使用NVIDIA GPU FFmpeg转码 YUV to H264(成功)

0. 官方教程 NVIDIA官方教程&#xff1a;链接&#xff0c;本篇内容主要参考2.2 Software Setup。 1. 安装显卡驱动 确保nvidia-smi能够正常使用&#xff1a; 2. 安装CUDA toolkit 注意要与显卡驱动版本对应&#xff0c;验证toolkit是否正确安装&#xff1a; 3. 安装ffnvco…

代码版本控制工具GitLab :从安装到使用一步到位

一、GitLab 是什么&#xff1f; 如果听说过 Git 或者 GitHub&#xff0c;那么 GitLab 你一定也听说过。GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用 Git 作为代码管理工具&#xff0c;并在此基础上搭建起来的 Web 服务。简单理解&#xff1a;GitLab 类似私人版 …

计算机毕业设计选题推荐-社区志愿者服务微信小程序/安卓APP-项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

Transformers实战(二)快速入门文本相似度、检索式对话机器人

Transformers实战&#xff08;二&#xff09;快速入门文本相似度、检索式对话机器人 1、文本相似度 1.1 文本相似度简介 文本匹配是一个较为宽泛的概念&#xff0c;基本上只要涉及到两段文本之间关系的&#xff0c;都可以被看作是一种文本匹配的任务&#xff0c; 只是在具体…

【表面缺陷检测】铝型材表面缺陷检测数据集介绍(含xml标签文件)

一、铝型材介绍 铝型材是一种由铝合金材料制成的&#xff0c;具有固定截面形状和尺寸的条形建材。由于其优良的物理性能和广泛的应用领域&#xff0c;铝型材在现代工业和生活中发挥着重要的作用。 1、铝型材的分类 根据截面形状的不同&#xff0c;铝型材可分为角铝、槽铝、工…

frp内网穿透教程搭建0.52.3版本

网上很多关于frp的教程都是04 03版本的了&#xff0c;都是配置的ini文件&#xff0c;现在都改成toml文件了&#xff0c;下面基本上都是官方文档的简单copy&#xff0c;细节推荐打开去看中文版的文档介绍&#xff08;地址放在最后了&#xff09;。下面简单介绍几个 为什么使用 …

CAN接口的PCB Layout规则要求汇总

随着时代高速发展&#xff0c;控制器局域网&#xff08;CAN&#xff09;接口的应用越来越广泛&#xff0c;尤其是在汽车电子、航空航天等领域中发挥着重要作用&#xff0c;为了确保CAN接口的可靠性和稳定性&#xff0c;工程师必须在其PCB Layout方面下功夫&#xff0c;下面来看…

1496. 判断路径是否相交

1496. 判断路径是否相交 java代码&#xff1a; class Solution {public boolean isPathCrossing(String path) {int x 0;int y 0;HashSet<String> hashSet new HashSet<>();hashSet.add("0-0");for (int i 0; i < path.length(); i) {switch (pa…

气膜场馆里面噪声很大怎么解决?

随着气膜结构在各个领域的广泛应用&#xff0c;人们开始意识到在这些场馆内部&#xff0c;特别是在大型活动和展览中&#xff0c;噪声问题可能会变得相当严重。传统的气膜结构通常难以提供良好的声学环境&#xff0c;这对于参与者的舒适度和活动的质量构成了挑战。为了解决气膜…

内网穿透实现在外远程访问NAS威联通(QNAP)

文章目录 前言1. 威联通安装cpolar内网穿透2. 内网穿透2.1 创建隧道2.2 测试公网远程访问 3. 配置固定二级子域名3.1 保留二级子域名3.2 配置二级子域名 4. 使用固定二级子域名远程访问 前言 购入威联通NAS后&#xff0c;很多用户对于如何在外在公网环境下的远程访问威联通NAS…

人工智能基础_机器学习011_梯度下降概念_梯度下降步骤_函数与导函数求解最优解---人工智能工作笔记0051

然后我们来看一下梯度下降,这里先看一个叫 无约束最优化问题,,值得是从一个问题的所有可能的备选方案中选最优的方案, 我们的知道,我们的正态分布这里,正规的一个正态分布,还有我们的正规方程,他的这个x,是正规的,比如上面画的这个曲线,他的这个x,就是大于0的对吧,而现实生活…

现代挖掘机vr在线互动展示厅是实现业务增长的加速度

VR数字博物馆全景展示充分应用5G、VR全景、web3d开发和三维动画等技术&#xff0c;将实体博物馆整体还原到3D数字空间&#xff0c;让游客360全景漫游式参观&#xff0c;无论大小、贵重、破损的典藏展品都能通过3D建模技术&#xff0c;逼真重现到三维虚拟场景中&#xff0c;让参…

从单模态到多模态,自主AI离我们还有多远?

一、从AI的诞生和发展说起 人工智能的发展&#xff0c;从思想诞生上&#xff0c;可以追逐到十七世纪的帕斯卡和莱布尼茨&#xff0c;1666年&#xff0c;德国博学家戈特弗里德威廉莱布尼茨发表了一篇题为《论组合的艺术》的神秘论文。当时的莱布尼茨只有20岁&#xff0c;他概述了…

python爬虫之正则表达式解析实战

文章目录 1. 图片爬取流程分析2. 实现代码—爬取家常菜图片 1. 图片爬取流程分析 先获取网址&#xff0c;URL&#xff1a;https://www.xiachufang.com/category/40076/ 定位想要爬取的内容使用正则表达式爬取导入模块指定URLUA伪装&#xff08;模拟浏览器&#xff09;发起请求…

SAP SPAD新建打印纸张

SAP SPAD新建打印纸张 1.事务代码SPAD 2.完全管理&#xff0d;设备类型&#xff0d;页格式-显示(创建格式页) 3.按标准A4纸张为模板参考创建。同一个纸张纵向/横向各创建1次(创建格式页) 4.完全管理&#xff0d;设备类型&#xff0d;格式类型-显示(创建格式类型&#xff0…