Idea中flume的Interceptor的编写教程

1.新建-项目-新建项目

注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找

2. 在maven项目中导入依赖

Pom.xml文件中写入

<dependencies>

        <dependency>

            <groupId>org.apache.flume</groupId>

            <artifactId>flume-ng-core</artifactId>

            <version>1.9.0</version>

        </dependency>

    </dependencies>

3.创建包(scr-main-java右键-新建-软件包)

4.创建Java类(右键包名-新建-java类)

5. 继承(implements)flume 的拦截器接口

//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;

    //此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类

6.实现方法

public class MyInterceptor implements Interceptor {@Override//初始化方法public void initialize() {}//单个事件拦截//需求:在event的头部信息中添加标记//提供给channel selector 选择发送给不同的channel@Overridepublic Event intercept(Event event)//Map也需要alt + enter 导入Map<String, String> headers = event.getHeaders();//输入even.getHeaders().var回车即可自行填充等号前面的变量信息String log = new String(event.getBody());//envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型// 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名//判断log开头的第一个字符,字母则发到channel1,数字则发到channel2char c = log.charAt(0);//log.charAt(0).var回车即可自行填充等号前面的变量信息if(c >= '0' && c <= '9'){headers.put("type","number");}else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){// 注意字符串类型要使用>=需要用单引号而不能用双引号headers.put("type","letter");}//因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法   event.setHeaders(headers);//返回eventreturn event;}//批量事件拦截(处理多个event,系统调用这个方法)@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list){intercept(event);}return list;}//重写静态内部类Builder@Overridepublic void close() {}public static class  Builder implements Interceptor.Builder{//创建一个拦截器对象@Overridepublic Interceptor build() {return new MyInterceptor();}//配置方法@Overridepublic void configure(Context context) {}}}

7.打包(idea右侧菜单栏maven-生命周期-package)

打包完成在idea左侧菜单栏 target 中可以看到我们的包

8.将建好的包复制到flume家目录下的lib中即可使用

cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib

9.测试

 9.1 编辑 flume 配置文件
       vim flume1.conf

# agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# channel selector: multiplexing 多路复用 ;默认为replicating 复制

a1.sources.r1.selector.type = multiplexing

# 填写相应inerceptor的header上的key

a1.sources.r1.selector.header = type

# 分配不同value发送到的channel,number到c2,letter到 c1

a1.sources.r1.selector.mapping.number = c2

a1.sources.r1.selector.mapping.letter = c1

#如果匹配不上默认选择的channel

a1.sources.r1.selector.default = c2

#interceptor

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node1

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = node1

a1.sinks.k2.port = 4546

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 c2

# 接收c1中的数据

a1.sinks.k1.channel = c1

# 接收c2中的数据

a1.sinks.k2.channel = c2

   vim flume2.conf

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = avro

a2.sources.r2.bind = node1

# flume1 中sink的输出端口

a2.sources.r2.port = 4545

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

vim flume3.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = avro

a3.sources.r3.bind = node1

# flume1 中sink的输出端口

a3.sources.r3.port = 4546

# Describe the sink

a3.sinks.k3.type = logger

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

9.2测试

       打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程

第四个窗口启用necat,输入内容进行测试

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3

nc nc node1 44444  (flume1.conf中 source 填的主机名或IP地址 和端口号)

第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接

在窗口4中输入数字、字母、符号

分别在窗口二看到输出字母,窗口三输出数字和符号

恭喜,Interceptor起作用!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/15550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C#实战】Newtonsoft.Json基类子类解析

情景再现 假设你有如下类&#xff1a; public class Item {public int Id;public string Name; }public class Weapon: Item {public int CurrentAmmo; }public class Inventory {public List<Item> Items; } 其中你序列化的是Inventory类&#xff0c;Items列表里混杂着…

❤Element的使用element

❤Element的使用 1、input输入框 禁止自动补全和下拉提示 input 输入框在输入后浏览器会记录用户输入的值&#xff0c;在下次输入的时候会将记录的值提示在输入框的下面&#xff0c;当输入框是密码的时候&#xff0c;这样可以看见上次输入的密码&#xff0c;这样用户体验不好…

python使用jsonpath来查找key并赋值

目录 一、引言 二、JsonPath简介 三、Python中的JsonPath库 四、使用JsonPath查找JSON Key 五、使用JsonPath赋值JSON Key 六、高级用法 七、结论 一、引言 在数据驱动的现代应用中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;已成为一种广泛使…

Java 微信小程序登录(openId方式)

1 需求 在开发微信小程序项目时&#xff0c;登录采用的是openId方式&#xff0c;这是一种用户无感的登录方式&#xff0c;用户点开微信小程序时&#xff0c;去调用后端的登录接口。 核心代码 Slf4j Component public class WeChatUtil {private static final String …

基于大数据的支付风险智能防控技术规范

随着大数据、移动互联、人工智能、生物特征识别等技术的快速发展&#xff0c;支付方式正在发生着巨大而深刻的变革&#xff0c;新技术在丰富支付手段、提高支付效率的同时&#xff0c;带来了新的隐患&#xff0c;也对从业机构的风险防控能力提出了更高的要求。 传统的风控技术…

springboot利用Spring的自动装配,策略模式提高系统接口扩展能力,

前段时间有个简单的短信登录功能&#xff0c;需要集成短信服务实现短信发送&#xff0c;一开始用的阿里云&#xff0c;后面领导又说用天翼云&#xff0c;还说阿里云的保留&#xff0c;以后说不准还会集成其他的服务商&#xff0c;于是用nacos的热更新来解决动态切换&#xff0c…

unity InputField的问题

unity 的inputField当focus时&#xff0c;弹起来的键盘样式会携带有一个输入显示。如果想自定义输入框的样式&#xff0c;就需要关闭inputfield的activateInput就是激发输入框的显示然后当inputfield被点击的时候&#xff0c;调用android原生键盘&#xff0c;并设置内容回调。我…

01-02.Vue的常用指令(二)

01-02.Vue的常用指令&#xff08;二&#xff09; 前言v-model&#xff1a;双向数据绑定v-model举例&#xff1a;实现简易计算器Vue中通过属性绑定为元素设置class 类样式引入方式一&#xff1a;数组写法二&#xff1a;在数组中使用三元表达式写法三&#xff1a;在数组中使用 对…

【全部更新完毕】2024电工杯B题详细思路代码成品文章教学:大学生平衡膳食食谱的优化设计及评价

大学生平衡膳食食谱的优化设计及评价 摘要 大学阶段是学生获取知识和身体发育的关键时期&#xff0c;也是形成良好饮食习惯的重要阶段。然而&#xff0c;当前大学生中存在饮食结构不合理和不良饮食习惯的问题&#xff0c;主要表现为不吃早餐或早餐吃得马虎&#xff0c;经常食用…

CentOS 常见命令详解

CentOS 是一种基于 Red Hat Enterprise Linux (RHEL) 的免费开源操作系统&#xff0c;以其稳定性和高效性广泛应用于服务器和企业环境中。对于系统管理员和开发人员来说&#xff0c;掌握 CentOS 的常见命令是日常工作中的必备技能。本文将详细介绍一些在 CentOS 上常用的命令&a…

算法提高之谜一样的牛

算法提高之谜一样的牛 核心思想&#xff1a;树状数组 初始化树状数组为1 表示所有高度都没有用过从后往前遍历h数组 分析身高 取当前h&#xff0c;即前面有h个比它高的 所以它是第h1个数求当前身高中 第h1个数 用二分mid求sum找到第k个数当前身高用过之后减1 #include <i…

高通Android 12/13冻结应用

最近开发SDK遇到冻结应用需求&#xff0c;于是简单记录下。总体而言比较简单&#xff0c;调用系统接口实现此功能。 涉及类与方法 IPackageManager .aidl # * As per {link android.content.pm.PackageManager#setApplicationEnabledSetting}.*/UnsupportedAppUsagevoid setA…

宝塔面板修改端口后无法登入

今天通过宝塔面板登录腾讯云主机&#xff0c;看到下面的提醒&#xff0c;顺便点进去随便改了个端口 本以为改端口是很简单事情&#xff0c;结果我改完之后面板立马登不上了&#xff0c;接下来我改了登录地址和端口也不行&#xff0c;我以为是防火墙的问题&#xff0c;增加了防火…

SpringBoot基于函数替换的热重载

背景 SpringBoot项目每次启动都很慢&#xff0c;有时候调试仅仅是改一点点东西&#xff0c;就要重启工作效率太低&#xff0c;希望能修改完代码&#xff0c;执行快捷键后就把该类的修改生效。&#xff08;仅限于Bean的修改生效&#xff09; 原理 SpringBoot的逻辑基本都是集…

ViT:1 从DETR说起

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提供了大模型领域最新技…

Pycharm在下载安装第三方库时速度慢或超时问题 / 切换国内镜像地址

pycharm下载第三方库速度极慢&#xff0c;搜索了一下&#xff0c;发现方法非常乱&#xff0c;稍作整理。这个问题一般都会出现&#xff0c;在我们开发中遇到的常见问题&#xff0c;根据以下解决方法&#xff0c;基本可以解决&#xff0c;但是不能100%保证 Installing packages …

打造一个增强版Kimi:可以生成图片、PPT、PDF文档、数据分析等

Kimi虽然在国内AI大模型中表现不错&#xff0c;但是和ChatGPT还是差不少功能。现在有一个很简单的方法&#xff0c;把kimi功能增强&#xff0c;使用效果大大改善&#xff0c;比如生成图片&#xff1a; 具体方法如下&#xff1a; 打开coze网站&#xff1a;https://www.coze.cn/…

Elementui里使用el-date-picker来选取指定时间段(时间段不超过31天)

需求描述&#xff1a; 1.禁止选择当前日期之后的所有日期2.选择的时间范围小于等于31天&#xff0c;其他日期禁用<el-date-picker v-model"historySubmitModel.historyDateTime" type"daterange" range-separator"- " start-placeholder"…

鸿蒙应用开发系列 篇四:鸿蒙系统应用开发基础

文章目录 系列文章概述ArkTS应用(Stage模型)示例应用示例代码使用模拟器运行应用使用真机运行应用应用程序包共享包应用配置文件(Stage模型)资源目录示例系列文章 鸿蒙应用开发系列 篇一:鸿蒙系统概述 鸿蒙应用开发系列 篇二:鸿蒙系统开发工具与环境

【c++基础】昆虫繁殖

说明 科学家在热带森林中发现了一种特殊的昆虫&#xff0c;这种昆虫的繁殖能力很强。每对成虫每过x个月产y对卵&#xff0c;每对卵要过两个月长成成虫。假设每个成虫不死&#xff0c;第一个月只有一对成虫&#xff0c;且卵长成成虫后的第一个月不产卵(过X个月产卵)&#xff0c…