1.新建-项目-新建项目
注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找
2. 在maven项目中导入依赖
Pom.xml文件中写入
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
3.创建包(scr-main-java右键-新建-软件包)
4.创建Java类(右键包名-新建-java类)
5. 继承(implements)flume 的拦截器接口
//键入implements Interceptor{} 光标定位到Interceptor alt + enter键选择导入类导入flume的Interceptor即可 import org.apache.flume.interceptor.Interceptor;
//此时会报错,点击红色灯泡,选择 实现方法 就会在下文写出需要Override的四个抽象类
6.实现方法
public class MyInterceptor implements Interceptor {@Override//初始化方法public void initialize() {}//单个事件拦截//需求:在event的头部信息中添加标记//提供给channel selector 选择发送给不同的channel@Overridepublic Event intercept(Event event)//Map也需要alt + enter 导入Map<String, String> headers = event.getHeaders();//输入even.getHeaders().var回车即可自行填充等号前面的变量信息String log = new String(event.getBody());//envent.getBody().var自行判断变量类型为byte,为方便使用改为String类型// 键入new String(envent.getBody()).var回车,然后根据需要自行修改变量名//判断log开头的第一个字符,字母则发到channel1,数字则发到channel2char c = log.charAt(0);//log.charAt(0).var回车即可自行填充等号前面的变量信息if(c >= '0' && c <= '9'){headers.put("type","number");}else if ((c >= 'A' && c<= 'Z') || (c >= 'a' && c <= 'z')){// 注意字符串类型要使用>=需要用单引号而不能用双引号headers.put("type","letter");}//因为头部信息属性是一个引用数据类型 直接修改对象即可,也可以不调用以下的set方法 event.setHeaders(headers);//返回eventreturn event;}//批量事件拦截(处理多个event,系统调用这个方法)@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list){intercept(event);}return list;}//重写静态内部类Builder@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{//创建一个拦截器对象@Overridepublic Interceptor build() {return new MyInterceptor();}//配置方法@Overridepublic void configure(Context context) {}}}
7.打包(idea右侧菜单栏maven-生命周期-package)
打包完成在idea左侧菜单栏 target 中可以看到我们的包
8.将建好的包复制到flume家目录下的lib中即可使用
cp /export/data/flume-interceptor-demo/target/flume-interceptor-demo-1.0-SNAPSHOT.jar $FLUME_HOME/lib
9.测试
9.1 编辑 flume 配置文件
vim flume1.conf
# agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = node1 a1.sources.r1.port = 44444 # channel selector: multiplexing 多路复用 ;默认为replicating 复制 a1.sources.r1.selector.type = multiplexing # 填写相应inerceptor的header上的key a1.sources.r1.selector.header = type # 分配不同value发送到的channel,number到c2,letter到 c1 a1.sources.r1.selector.mapping.number = c2 a1.sources.r1.selector.mapping.letter = c1 #如果匹配不上默认选择的channel a1.sources.r1.selector.default = c2 #interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.ljr.flume.MyInterceptor$Builder # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node1 a1.sinks.k1.port = 4545 a1.sinks.k2.type = avro a1.sinks.k2.hostname = node1 a1.sinks.k2.port = 4546 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 # 接收c1中的数据 a1.sinks.k1.channel = c1 # 接收c2中的数据 a1.sinks.k2.channel = c2 |
vim flume2.conf
a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = avro a2.sources.r2.bind = node1 # flume1 中sink的输出端口 a2.sources.r2.port = 4545 # Describe the sink a2.sinks.k2.type = logger # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 |
vim flume3.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = node1 # flume1 中sink的输出端口 a3.sources.r3.port = 4546 # Describe the sink a3.sinks.k3.type = logger # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 |
9.2测试
打开四个窗口,前三个分别运行flume1.conf、flume2.conf、flume3.conf 配置的进程
第四个窗口启用necat,输入内容进行测试
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume1.conf -n a1
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume2.conf -n a2
flume-ng agent -c conf/ -f /export/server/flume/job/group2-multiplexing-test/flume3.conf -n a3
nc nc node1 44444 (flume1.conf中 source 填的主机名或IP地址 和端口号)
第一个窗口报错 ConnectException: 拒绝连接 可先忽略,运行二、三窗口后即可连接
在窗口4中输入数字、字母、符号
分别在窗口二看到输出字母,窗口三输出数字和符号
恭喜,Interceptor起作用!