Flume 拦截器概念及自定义拦截器的运用

文章目录

    • Flume 拦截器
    • 拦截器的作用
    • 拦截器运用
      • 1.创建项目
      • 2.实现拦截器接口
      • 3.编写事件处理逻辑
      • 4.拦截器构建
      • 5.打包与上传
      • 6.编写配置文件
      • 7.测试运行

Flume 拦截器

在 Flume 中,拦截器(Interceptors)是一种可以在事件传输过程中拦截、处理和修改事件的组件。

位于 Source 与 Channel 之间,在写入Channel 之前,拦截器可以对数据进行转换、提取或删除,以满足特定的需求。每个拦截器只处理同一个 Source 接收到的事件,你也可以同时配置多个拦截器,它们会按顺序执行。

拦截器的作用

  • 数据处理和转换: 拦截器可以对事件数据进行处理和转换。例如,可以对原始日志进行解析、过滤、格式化等操作,以便后续处理或存储。

  • 数据增强: 拦截器可以为事件数据添加额外的信息或元数据。例如,可以添加时间戳、主机信息、标签等,以丰富事件数据的内容。

  • 数据过滤: 拦截器可以根据特定条件过滤掉不需要的事件数据,减少数据传输的量或过滤掉无效数据。

  • 监控和日志: 拦截器可以用于监控数据流的运行情况,记录日志信息或统计数据流中的事件数量、处理速率等指标,帮助用户进行性能分析和故障排查。

拦截器运用

1.创建项目

创建一个 Maven 工程项目,引入 Flume 依赖。

在 IDEA 中创建 Maven 项目想必大家都会,这里不再赘述。

根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency></dependencies>

无需将该依赖打包进最后的 JAR 包中,故将其作用域设置为 provided

当一个依赖项的 scope 被设置为 compile 时,它将在编译和运行时都可用,并包含在最终的项目包中。而 provided 范围的依赖项仅在编译和测试阶段需要,运行时不包括。

2.实现拦截器接口

创建测试类 TestInterceptor 实现拦截器 Interceptor,注意,导包时不要导错了。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {return null;}@Overridepublic List<Event> intercept(List<Event> list) {return null;}@Overridepublic void close() {}
}

在上面的代码中,我们实现了 Flume 拦截器接口 Interceptor,并重写了其中的四个方法:

  • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。

  • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。

  • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。

  • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。

3.编写事件处理逻辑

在这里做个简单的事件处理,如果数据中包含字符串 hello 则进行过滤操作,这样我们可以直观感受到拦截器的存在,下面来进行逻辑设计。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;public class TestInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 获取事件数据String eventData = new String(event.getBody(), StandardCharsets.UTF_8);// 检查事件数据中是否包含指定字符串if (eventData.contains("hello")) {// 如果包含指定字符串,则过滤掉该事件,返回 nullreturn null;}return event;}@Overridepublic List<Event> intercept(List<Event> events) {// 创建一个新的列表,存储处理过后的事件List<Event> interceptedEvents = new ArrayList<>();for (Event event : events) {Event interceptedEvent = intercept(event);if (interceptedEvent != null) {interceptedEvents.add(interceptedEvent);}}return interceptedEvents;}@Overridepublic void close() {}}

intercept(List<Event> events) 方法用于对事件列表进行批量处理。这个方法会遍历传入的事件列表,并对每一个事件调用 intercept(Event event) 方法来进行单独处理。

注意,如果只有 intercept(Event event) 方法被重写了,而没有实现 intercept(List<Event> events) 批处理方法,那么在处理事件时会以单个事件的方式进行处理。

在不需要进行初始化和释放资源的情况下,我们可以选择不重写 initializeclose 方法。

4.拦截器构建

在编写完事件处理逻辑后,我们还需要对拦截器进行构建。

在 Flume 中,拦截器的创建和配置通常是通过 Builder 模式来完成的。

在程序中,我们可以定义一个静态内部类 Builder,实现 Interceptor.Builder 接口来对拦截器进行构建,如下所示:

    public static class Builder implements Interceptor.Builder {@Overridepublic void configure(Context context) {// 配置操作,可留空}@Overridepublic Interceptor build() {// 返回构建的拦截器类}}

在我们这个案例中,完整的代码如下所示:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;public class TestInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 获取事件数据String eventData = new String(event.getBody(), StandardCharsets.UTF_8);// 检查事件数据中是否包含指定字符串if (eventData.contains("hello")) {// 如果包含指定字符串,则过滤掉该事件,返回 nullreturn null;}return event;}@Overridepublic List<Event> intercept(List<Event> events) {List<Event> interceptedEvents = new ArrayList<>();for (Event event : events) {Event interceptedEvent = intercept(event);if (interceptedEvent != null) {interceptedEvents.add(interceptedEvent);}}return interceptedEvents;}@Overridepublic void close() {}// 拦截器构建public static class Builder implements Interceptor.Builder {@Overridepublic void configure(Context context) {}@Overridepublic Interceptor build() {return new TestInterceptor();}}}

5.打包与上传

将写好的项目进行打包,并上传到集群中,进行测试。

注意,需要将打包好的拦截器包放在 Flume 安装目录下的 lib 文件夹中。

在这里插入图片描述

6.编写配置文件

这里为了验证拦截器的作用,通过一个 Flume 采集案例来进行体现。

如果你不知道如何编写配置文件,可以看我写的这篇文章 —— Flume 配置文件编写技巧(包会的,抄就完了)

这个配置案例是将发送到 HTTP 源中的数据采集到 HDFS 上,将本地文件作为缓冲通道,该配置文件命名为 httpToHDFS.conf

# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost# 拦截器配置
# 拦截器定义
a1.sources.r1.interceptors = i1
# 拦截器全类名
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

拦截器全类名配置那里需要注意,格式为 拦截器的全类名 + $Builder

在 IDEA 中获取全类名的方式:右击需要引用的类,依次选择【File——>Copy Path/Reference…——>Copy Reference】即可复制。

你可以根据你的需要对该配置文件进行修改。

7.测试运行

因为我们是将数据采集到 HDFS 上,所以需要先启动 Hadoop,然后再进行操作。

# 运行 Flume
cd $FLUME_HOME# 注意引用路径,需要修改成你自己的
./bin/flume-ng agent -n a1 -c conf/ -f job/httpToHDFS.conf -Dflume.root.logger=INFO,console

Flume 启动完成后,如下所示:

在这里插入图片描述

我们通过其它窗口,使用 curl 命令向 HTTP 源发送两条模拟数据:

curl -X POST -d '[{"body":"hello body"}]'  http://localhost:5140curl -X POST -d '[{"body":"HELLO FLUME"}]'  http://localhost:5140

在这里插入图片描述

数据发送完成后,Flume 会采集到该数据,并存储到 HDFS 上。

在这里插入图片描述

通过命令,查看 HDFS 中存储的内容,验证拦截器是否生效:

hdfs dfs -text /flume/events/2024-04-04/1630/00/ev* 

结果如下所示:

在这里插入图片描述

可以看到,我们在上面分别发送了两条数据 hello bodyHELLO FLUME,但最终 HDFS 中只存储了一条数据。

这是因为我们设置的拦截器生效了,它对数据中包含 hello 字符串的事件进行了过滤,故只存储了一条数据。

Flume 拦截器就是起到这样的效果,对数据进行处理、转换、删除等操作,是不是很简单呀。(同学,包会的呀)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/792241.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FreeRtos入门-4 事件组与同步点

事件组 事件组 同步点 创建 xEventGroupCalc xEventGroupCreate();//1&#xff0c;创建事件组 xEventGroupSyc xEventGroupCreate() 设置 xEventGroupSetBits(xEventGroupCalc,(1<<0));//设置事件组bit0 位 xEventGroupSync(xEventGroupSyc,BUSYING,ALL,portMAX…

VB 通过COM接口解析PSD文件

最近有PS测评的需求&#xff0c;故而想到了解析psd文件&#xff0c;目的就是为了获取文档信息和图层信息&#xff1b;获取PS的图像信息有很多方式&#xff0c;有过程性的&#xff0c;比如监听PS的各种操作事件&#xff1b;有结果性的&#xff0c;比如本文写的解析PSD文件。 0.…

使用pip安装geopandas(24.4更新)

geopandas是我们用Python进行地理分析常用的库&#xff0c;在数据处理、分析、制图等场景中有着极为广泛的应用&#xff0c;但是在安装过程中会出现各种问题。​geopandas的安装方式有很多&#xff0c;今天我们选取较为简单的pip来进行geopandas的安装。 ​首先&#xff0c;我…

内部类(InnerClass)

概述 什么是内部类 将一个类A定义在另一个类B里面&#xff0c;里面的那个类A就称为内部类&#xff08;InnerClass&#xff09;&#xff0c;类B则称为外部类&#xff08;OuterClass&#xff09;。 为什么要声明内部类呢 具体来说&#xff0c;当一个事物A的内部&#xff0c;还有…

Java web第一次作业

1.学会用记事本编写jsp文件&#xff0c;并放进tomcat的相关目录下&#xff0c;运行。 源代码&#xff1a; <% page contentType"text/html;charsetUTF-8" language"java" %> <html> <head> <title>我的第一个JSP页面</ti…

JavaSE——运算符

1. 概念 运算符是一种用于执行特定操作的符号或关键字。在编程中&#xff0c;运算符用于对变量、常量和表达式进行操作&#xff0c;以产生一个结果。 作为一门计算机语言&#xff0c; Java 也提供了一套丰富的运算符来操纵变量。 Java 中运算符可分为以下&#xff1a;算术运算…

电商系列之促销

> 插&#xff1a;AI时代&#xff0c;程序员或多或少要了解些人工智能&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家…

算法沉淀——动态规划篇(子数组系列问题(下))

算法沉淀——动态规划篇&#xff08;子数组系列问题&#xff08;下&#xff09;&#xff09; 前言一、等差数列划分二、最长湍流子数组三、单词拆分四、环绕字符串中唯一的子字符串 前言 几乎所有的动态规划问题大致可分为以下5个步骤&#xff0c;后续所有问题分析都将基于此 …

CSS之第一个CSS样式和CSS选择符

前端这些博客&#xff0c;我觉得都是固定的语法&#xff0c;故而不会以过多的文字进行描述&#xff0c;本系列博文均以实例和代码介绍的方式进行&#xff0c;主要按照代码进行。不会以过多的文字描述。 第一个CSS样式 <!DOCTYPE html> <html lang"en">…

【JavaEE初阶系列】——文件操作 IO 之 文件系统操作

目录 &#x1f4dd;认识文件 &#x1f6a9;树型结构组织 和 目录 &#x1f388;绝对路径和相对路径 &#x1f6a9;文件类型 &#x1f4dd;文件系统操作 &#x1f388;File 概述 &#x1f388;File类的使用 1. 绝对路径 vs 相对路径 2. 路径分隔符 3. 静态成员变量 4…

【C语言】翻译环境与运行环境

一、前言 在我们学习C语言的时候&#xff0c;第一个接触的程序就是&#xff1a;在屏幕上打印” hello word! “&#xff0c;可当时的我们却未去深入的理解与感悟&#xff0c;一个程序代码是如何运行的&#xff1b;而这一期的博客&#xff0c;则是带着我们&#xff0c;通过C代码…

mac电脑安装redis教程

1、下载地址 Download | RedisRedisYou can download the last Redis source files here. For additional options, see the Redis downloads section below.Stable (7.2)Redis 7.2 …https://redis.io/download/#redis-downloads 2、安装 2.1 解压下载后的压缩文件 2.2 进入…

Vulnhub:WESTWILD: 1.1

目录 信息收集 arp nmap nikto whatweb WEB web信息收集 dirmap enm4ulinux sumbclient get flag1 ssh登录 提权 横向移动 get root 信息收集 arp ┌──(root㉿ru)-[~/kali/vulnhub] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 0…

LeetCode-236. 二叉树的最近公共祖先【树 深度优先搜索 二叉树】

LeetCode-236. 二叉树的最近公共祖先【树 深度优先搜索 二叉树】 题目描述&#xff1a;解题思路一&#xff1a;递归判断解题思路二&#xff1a;0解题思路三&#xff1a;0 题目描述&#xff1a; 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖…

linux操作系统的进程状态

这个博客只是为了自己复习用的&#xff01;&#xff01;&#xff01; 冯诺依曼体系结构 计算机是由一个一个硬件组成的 输入设备&#xff1a;键盘&#xff0c;鼠标&#xff0c;扫描仪&#xff0c;写板等等 中央处理器&#xff08;CPU&#xff09;:含有运算器和控制器等 输出单…

【算法练习】27:冒泡排序学习笔记

一、冒泡排序的算法思想 原理&#xff1a;以升序为例&#xff0c;冒泡排序通过从左往右连续比较相邻元素&#xff0c;当发现左边比右边大就交换元素。从左往右依次比较完称为“一轮”&#xff0c;每轮结束之后就会固定一个元素。 时间复杂度&#xff1a;2层循环&#xff0c;所以…

不讲概念,讲实操,mysql 分表模糊查询、分页查询 及 merge 表的使用

1.Mysql merge合并表的要求 1.合并的分表必须是 MyISAM 引擎&#xff0c;MyISAN引擎是不支持事务的。2.Merge表只保证合表后数据唯一性&#xff0c;合表前的数据可能会存在重复。3.表的结构必须一致&#xff0c;包括索引、字段类型、引擎和字符集。4.删除 tb_member1 分表正确…

Python实现BOA蝴蝶优化算法优化卷积神经网络分类模型(CNN分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 蝴蝶优化算法(butterfly optimization algorithm, BOA)是Arora 等人于2019年提出的一种元启发式智能算…

[技术闲聊]我对电路设计的理解(三)

终于可以独立做项目了&#xff0c;是不是很激动&#xff0c;是不是为自己骄傲和自豪&#xff0c;应该的&#xff0c;奋斗那么久不就是为了站在山巅看看四周的风景嘛&#xff01; 虽说山外还有山&#xff0c;但是此刻就在脚下的山巅上&#xff0c;怡然自得都是不过分的&#xff…

LLM端侧部署系列 | 如何将阿里千问大模型Qwen部署到手机上?实战演示(下篇)

引言 简介 编译Android可用的模型 转换权重 生成配置文件 模型编译 编译apk 修改配置文件 绑定android library 配置gradle 编译apk 手机上运行 安装 APK 植入模型 效果实测 0. 引言 清明时节雨纷纷&#xff0c;路上行人欲断魂。 小伙伴们好&#xff0c;我是《小…