Flume conf文件编写
vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /Users/zhangjin/model/project/realtime-flink/applog/log/app.*
# 设置断点续传的位置
a1.sources.r1.positionFile = /Users/zhangjin/model/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = topic_log
# 设置不以Flume event 写入数据,以Body数据进行写入
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1
Flume ETLInterceptor拦截器的编写
maven依赖
<dependencies><!--Flume依赖 --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><!--Json格式校验--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies>
maven package打包依赖
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
判断是否是JSON字符串
public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true 不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}
拦截器实现
- 继承Interceptor接口
- 实现单event处理
- 实现批量event处理
- 重写builder方法
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 单个event处理* 检验是否是Json格式* @param event* @return*/@Overridepublic Event intercept(Event event) {//1 获取json数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2 校验json数据if (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}/*** 多个event处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}/*** 拦截器重写Builder方法*/public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
测试
maven package打包,将生成的jar包放在了Flume的lib目录下
启动kafka
# 启动命令
./bin/kafka-server-start.sh -daemon ./config/server.properties &# 开启消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log
启动Flume
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console