Idea中flume的Interceptor的编写教程

1.新建-项目-新建项目

注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找

2. 在maven项目中导入依赖

Pom.xml文件中写入

<dependencies>

        <dependency>

            <groupId>org.apache.flume</groupId>

            <artifactId>flume-ng-core</artifactId>

            <version>1.9.0</version>

        </dependency>

    </dependencies>

3.创建包(scr-main-java右键-新建-软件包)

4.创建Java类(右键包名-新建-java类)

5. 继承(implements)flume 的拦截器接口

//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;

    //此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类

6.实现方法

public class MyInterceptor implements Interceptor {@Override//初始化方法public void initialize() {}//单个事件拦截//需求:在event的头部信息中添加标记//提供给channel selector 选择发送给不同的channel@Overridepublic Event intercept(Event event)//Map也需要alt + enter 导入Map<String, String> headers = event.getHeaders();//输入even.getHeaders().var回车即可自行填充等号前面的变量信息String log = new String(event.getBody());//envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型// 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名//判断log开头的第一个字符,字母则发到channel1,数字则发到channel2char c = log.charAt(0);//log.charAt(0).var回车即可自行填充等号前面的变量信息if(c >= '0' && c <= '9'){headers.put("type","number");}else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){// 注意字符串类型要使用>=需要用单引号而不能用双引号headers.put("type","letter");}//因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法   event.setHeaders(headers);//返回eventreturn event;}//批量事件拦截(处理多个event,系统调用这个方法)@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list){intercept(event);}return list;}//重写静态内部类Builder@Overridepublic void close() {}public static class  Builder implements Interceptor.Builder{//创建一个拦截器对象@Overridepublic Interceptor build() {return new MyInterceptor();}//配置方法@Overridepublic void configure(Context context) {}}}

7.打包(idea右侧菜单栏maven-生命周期-package)

打包完成在idea左侧菜单栏 target 中可以看到我们的包

8.将建好的包复制到flume家目录下的lib中即可使用

cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib

9.测试

 9.1 编辑 flume 配置文件
       vim flume1.conf

# agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# channel selector: multiplexing 多路复用 ;默认为replicating 复制

a1.sources.r1.selector.type = multiplexing

# 填写相应inerceptor的header上的key

a1.sources.r1.selector.header = type

# 分配不同value发送到的channel,number到c2,letter到 c1

a1.sources.r1.selector.mapping.number = c2

a1.sources.r1.selector.mapping.letter = c1

#如果匹配不上默认选择的channel

a1.sources.r1.selector.default = c2

#interceptor

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node1

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = node1

a1.sinks.k2.port = 4546

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 c2

# 接收c1中的数据

a1.sinks.k1.channel = c1

# 接收c2中的数据

a1.sinks.k2.channel = c2

   vim flume2.conf

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = avro

a2.sources.r2.bind = node1

# flume1 中sink的输出端口

a2.sources.r2.port = 4545

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

vim flume3.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = avro

a3.sources.r3.bind = node1

# flume1 中sink的输出端口

a3.sources.r3.port = 4546

# Describe the sink

a3.sinks.k3.type = logger

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

9.2测试

       打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程

第四个窗口启用necat,输入内容进行测试

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2

flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3

nc nc node1 44444  (flume1.conf中 source 填的主机名或IP地址 和端口号)

第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接

在窗口4中输入数字、字母、符号

分别在窗口二看到输出字母,窗口三输出数字和符号

恭喜,Interceptor起作用!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/15550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

❤Element的使用element

❤Element的使用 1、input输入框 禁止自动补全和下拉提示 input 输入框在输入后浏览器会记录用户输入的值&#xff0c;在下次输入的时候会将记录的值提示在输入框的下面&#xff0c;当输入框是密码的时候&#xff0c;这样可以看见上次输入的密码&#xff0c;这样用户体验不好…

python使用jsonpath来查找key并赋值

目录 一、引言 二、JsonPath简介 三、Python中的JsonPath库 四、使用JsonPath查找JSON Key 五、使用JsonPath赋值JSON Key 六、高级用法 七、结论 一、引言 在数据驱动的现代应用中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;已成为一种广泛使…

基于大数据的支付风险智能防控技术规范

随着大数据、移动互联、人工智能、生物特征识别等技术的快速发展&#xff0c;支付方式正在发生着巨大而深刻的变革&#xff0c;新技术在丰富支付手段、提高支付效率的同时&#xff0c;带来了新的隐患&#xff0c;也对从业机构的风险防控能力提出了更高的要求。 传统的风控技术…

01-02.Vue的常用指令(二)

01-02.Vue的常用指令&#xff08;二&#xff09; 前言v-model&#xff1a;双向数据绑定v-model举例&#xff1a;实现简易计算器Vue中通过属性绑定为元素设置class 类样式引入方式一&#xff1a;数组写法二&#xff1a;在数组中使用三元表达式写法三&#xff1a;在数组中使用 对…

【全部更新完毕】2024电工杯B题详细思路代码成品文章教学:大学生平衡膳食食谱的优化设计及评价

大学生平衡膳食食谱的优化设计及评价 摘要 大学阶段是学生获取知识和身体发育的关键时期&#xff0c;也是形成良好饮食习惯的重要阶段。然而&#xff0c;当前大学生中存在饮食结构不合理和不良饮食习惯的问题&#xff0c;主要表现为不吃早餐或早餐吃得马虎&#xff0c;经常食用…

宝塔面板修改端口后无法登入

今天通过宝塔面板登录腾讯云主机&#xff0c;看到下面的提醒&#xff0c;顺便点进去随便改了个端口 本以为改端口是很简单事情&#xff0c;结果我改完之后面板立马登不上了&#xff0c;接下来我改了登录地址和端口也不行&#xff0c;我以为是防火墙的问题&#xff0c;增加了防火…

ViT:1 从DETR说起

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提供了大模型领域最新技…

Pycharm在下载安装第三方库时速度慢或超时问题 / 切换国内镜像地址

pycharm下载第三方库速度极慢&#xff0c;搜索了一下&#xff0c;发现方法非常乱&#xff0c;稍作整理。这个问题一般都会出现&#xff0c;在我们开发中遇到的常见问题&#xff0c;根据以下解决方法&#xff0c;基本可以解决&#xff0c;但是不能100%保证 Installing packages …

打造一个增强版Kimi:可以生成图片、PPT、PDF文档、数据分析等

Kimi虽然在国内AI大模型中表现不错&#xff0c;但是和ChatGPT还是差不少功能。现在有一个很简单的方法&#xff0c;把kimi功能增强&#xff0c;使用效果大大改善&#xff0c;比如生成图片&#xff1a; 具体方法如下&#xff1a; 打开coze网站&#xff1a;https://www.coze.cn/…

C++容器之位集(std::bitset)

目录 1 概述2 使用实例3 接口使用3.1 constructor3.2 count_and_size3.3 test3.4 any3.5 none3.6 all3.7 set3.8 reset3.9 filp3.10 to_string3.11 to_ulong3.12 to_ullong3.13 operators1 概述 位集存储位(只有两个可能值的元素:0或1,true或false,…)。   该类模拟bool…

推荐一款自助分析的财务分析软件:奥威BI软件

奥威BI软件是一款支持多维度动态自助分析的软件&#xff0c;预设了智能财务分析方案&#xff0c;提供内存行列计算模型解决财务指标计算难题&#xff0c;界面简洁&#xff0c;以点击、拖曳操作为主&#xff0c;十分适合没有IT背景的财务人做财务分析。因此也经常有人说奥威BI软…

Spark搭建 Standalone模式详细步骤

Standalone模式概述&#xff1a; Standalone模式是Spark自带的一种集群模式&#xff08;本地集群&#xff0c;不依赖与外部集群&#xff0c;比如Yarn&#xff09;&#xff0c;可以真实地在多个机器之间搭建Spark集群的环境。 Standalone是完整的Spark运行环境,其中: Master角…

OpenFeign微服务调用组件使用

前言&#xff1a;OpenFeign是可以跨服务、跨进程的调用方式。 什么是Feign Feign是Netflix开发的声明式、模版化的HTTP客户端。 优势: Feign可以做到使用 HTTP 请求远程服务时就像调用本地方法一样的体验&#xff0c;开发者完全感知不到这是远程方法&#xff0c;更感知不到这…

【TB作品】stm32单片机读取DS2401程序

DS2401是由Analog Devices公司生产的一种硅序列号芯片&#xff0c;它提供了一个绝对唯一的64位ROM识别码&#xff0c;用于确保可追溯性。以下是对DS2401器件的分析&#xff1a; 特点和优势&#xff1a; 唯一性&#xff1a;每个DS2401芯片都有一个独一无二的64位注册码&#x…

[less配置]vue2引入less

1、终端输入&#xff1a;npm install less less-loader --save-dev 2、在package.json查看是否安装less依赖 3、调用

vue2快速安装环境,从0-1创建vue2项目教程

vue2快速安装环境&#xff0c;从0-1创建vue2项目教程(windows) 一、node下载 1.如何查看node版本和npm版本 二、npm安装脚手架 1.注意事项 三、vue2选项解读 四、运行脚手架 一、node下载 1、(node.js中文网) 下载长期稳定版本就行 解释下node.js和npm的关系? 想象你在…

原始字面常量(C++11)

原始字面常量&#xff08;C11&#xff09; 文章目录 原始字面常量&#xff08;C11&#xff09;前言一、原始字面量二、代码示例总结 前言 字面量一般是指数值&#xff08;12、454等&#xff09;和字符串&#xff08;“Hw”、“h\t”&#xff09;&#xff0c;但是有时候我们想表…

PyTorch安装与配置

前言 参考文档&#xff1a;https://github.com/TingsongYu/PyTorch-Tutorial-2nd 环境配置之Anaconda 解释器——python.exe&#xff0c;是人类与CPU之间的桥梁&#xff0c;需要配置系统环境变量 Anaconda&#xff1a;集成环境&#xff0c;包管理器 Conda 安装 Anaconda&am…

WXSS模板样式-全局样式和局部样式

一、WXSS 1.WXSS WXSS(WeiXin Style Sheets)是一套样式语言&#xff0c;用于美化WXML的组件样式&#xff0c;类似于网页开发中的CSS 2.WXSS和CSS的关系 WXSS具有CSS大部分特性&#xff0c;同时&#xff0c;WXSS还对CSS进行了扩充以及修改&#xff0c;以适应微信小程序的开发…

CSDN 自动评论互动脚本

声明 该脚本的目的只是为了提升博客创作效率和博主互动效率&#xff0c;希望大家还是要尊重各位博主的劳动成果。 数据库设计 尽量我们要新建一个数据库csdn_article&#xff0c;再在其中建一个数据表article -- csdn_article-- article-- 需要进行自动评论的表格信息...CR…