Flume拦截器的实现

Flume conf文件编写

vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /Users/zhangjin/model/project/realtime-flink/applog/log/app.*
# 设置断点续传的位置
a1.sources.r1.positionFile = /Users/zhangjin/model/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = topic_log
# 设置不以Flume event 写入数据,以Body数据进行写入
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1

Flume ETLInterceptor拦截器的编写

maven依赖

	<dependencies><!--Flume依赖 --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><!--Json格式校验--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies>

maven package打包依赖

    <build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

判断是否是JSON字符串

public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true  不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}

拦截器实现

  1. 继承Interceptor接口
  2. 实现单event处理
  3. 实现批量event处理
  4. 重写builder方法
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 单个event处理* 检验是否是Json格式* @param event* @return*/@Overridepublic Event intercept(Event event) {//1 获取json数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2 校验json数据if (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}/*** 多个event处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}/*** 拦截器重写Builder方法*/public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

测试

maven package打包,将生成的jar包放在了Flume的lib目录下
启动kafka

# 启动命令
./bin/kafka-server-start.sh -daemon ./config/server.properties &# 开启消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log

启动Flume

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/66133.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025-01-04 Unity插件 YodaSheet2 —— 基础用法

文章目录 环境配置1 创建 YadeSheetData2 读取方式2.1 表格读取2.2 列表读取 3 自定义设置3.1 修改代码生成位置3.2 添加列表支持3.2.1 修改 DataTypeMapper.cs3.2.2 修改 SheetDataExtensions.cs3.2.3 修改 CodeGeneratorEditor.cs3.2.4 测试 ​ 官方文档&#xff1a; Unity …

matlab时频分析库

time frequency gallery

『 Linux 』高级IO (三) - Epoll模型的封装与EpollEchoServer服务器

文章目录 前情提要Epoll 的封装Epoll封装完整代码(供参考) Epoll Echo ServerEpoll Echo Server 测试及完整代码 前情提要 在上一篇博客『 Linux 』高级IO (二) - 多路转接介绍并完成了两种多路转接方案的介绍以及对应多路转接方案代码的编写,分别为SelectServer服务器与PollSe…

PDF预览插件

PDF预览插件 可用于当前页面弹窗形式查看,可增加一些自定义功能 pdf预览插件 代码块: pdfobject.js <div class="pdfwrap"><div class="item"><h3>笑场</h3><div class="tags"><p>李诞</p><i&…

【Java项目】基于SpringBoot的【新生宿舍管理系统】

【Java项目】基于SpringBoot的【新生宿舍管理系统】 技术简介&#xff1a;本系统使用采用B/S架构、Spring Boot框架、MYSQL数据库进行开发设计。 系统简介&#xff1a;管理员登录进入新生宿舍管理系统可以查看首页、个人中心、公告信息管理、院系管理、班级管理、学生管理、宿舍…

Huginn - 构建代理、执行自动化任务

文章目录 一、关于 Huginn什么是Huginn&#xff1f;Huginn 功能加入Huginn展示 二、安装1、Docker2、本地安装3、开发 三、使用Huginn代理gems四、部署1、Heroku2、OpenShiftOpenShift 在线 3、在任何服务器上手动安装4、可选设置4.1 私人开发设置4.2 启用WeatherAgent4.3 禁用…

电子应用设计方案86:智能 AI背景墙系统设计

智能 AI 背景墙系统设计 一、引言 智能 AI 背景墙系统旨在为用户创造一个动态、个性化且具有交互性的空间装饰体验&#xff0c;通过融合先进的技术和创意设计&#xff0c;提升室内环境的美观度和功能性。 二、系统概述 1. 系统目标 - 提供多种主题和风格的背景墙显示效果&…

基于Spring Boot的IT技术交流和分享平台的设计与实现源码

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的IT技术交流和分享平台的设计与实现。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 基于S…

单元测试3.0+ @RunWith(JMockit.class)+mock+injectable+Expectations

Jmockit使用笔记_基本功能使用Tested_Injectable_Mocked_Expectations_jmockit.class-CSDN博客 静态变量直接赋值就好&#xff0c;没必要mock了 测试框架Jmockit集合junit使用 RunWith(JMockit.class) 写在测试案例类上的注解 Tested 在测试案例中,写在我们要测试的类上…

PADS Logic原理图中有很多页原理图,如何(怎样)删除其中一页或者多页

我们在进行PADS Logic进行原理图设计的时候&#xff0c;有时候可能遇到一次性设计了很多页的原理图&#xff0c;比如说十几页的原理图。那么我们在进行PADS Layout的时候&#xff0c;可能将这些原理图绘制两块板或者多块PCB板&#xff0c;那么这时候我们需要将其中的一张原理图…

Elasticsearch 创建索引 Mapping映射属性 索引库操作 增删改查

Mapping Type映射属性 mapping是对索引库中文档的约束&#xff0c;有以下类型。 text&#xff1a;用于分析和全文搜索&#xff0c;通常适用于长文本字段。keyword&#xff1a;用于精确匹配&#xff0c;不会进行分析&#xff0c;适用于标签、ID 等精确匹配场景。integer、long…

《GICv3_Software_Overview_Official_Release_B》学习笔记

1.不同版本的 GIC 架构及其主要功能如下图所示&#xff1a; 2.GICv2m&#xff08;Generic Interrupt Controller Virtualization Model&#xff09;是针对ARM架构的GIC&#xff08;通用中断控制器&#xff09;的一种扩展&#xff0c; GICv2m扩展为虚拟化环境中的中断管理提供了…

【QT】找不到qwt_plot.h

系统环境&#xff1a; linux 20.04 qt 6.7.2 cmake 3.22 原因&#xff1a; Qwt没有正式的FindQwt.cmake&#xff0c;Qwt也没有提供QwtConfig.cmake。而且cmake不支持qmake的配置特性&#xff0c;也不支持读取mkspecs (.prf)文件。也就是说cmake构建的qt项目不可用qwt。 解决步…

杰发科技——使用ATCLinkTool解除读保护

0. 原因 在jlink供电电压不稳定的情况下&#xff0c;概率性出现读保护问题&#xff0c;量产时候可以通过离线烧录工具避免。代码中开了读保护&#xff0c;但是没有通过can/uart/lin/gpio控制等方式进行关闭&#xff0c;导致无法关闭读保护。杰发所有芯片都可以用本方式解除读保…

Sublime Text4 4189 安装激活【 2025年1月3日 亲测可用】

-----------------测试时间2025年1月3日------------------- 下载地址 官方网址&#xff1a;https://www.sublimetext.com 更新日志&#xff1a;https://www.sublimetext.com/download V4189 64位&#xff1a;https://www.sublimetext.com/download_thanks?targetwin-x64 ....…

前后端规约

文章目录 引言I 【强制】前后端交互的 API请求内容响应体响应码II 【推荐】MVC响应体III【参考】IV 其他引言 服务器内部重定向必须使用 forward;外部重定向地址必须使用 URL 统一代理模块生成,否则会因线上采用 HTTPS 协议而导致浏览器提示“不安全”,并且还会带来 URL 维护…

linux安装redis及Python操作redis

目录 一、Redis安装 1、下载安装包 2、解压文件 3、迁移文件夹 4、编译 5、管理redis文件 6、修改配置文件 7、启动Redis 8、将redis服务交给systemd管理 二、Redis介绍 1、数据结构 ①字符串String ②列表List ③哈希Hash ④集合Set ⑤有序集合Sorted Set 2、…

在线RSA pem 密钥pkcs1转pkcs8格式--支持公钥和私钥

具体前往&#xff1a;在线RSA密钥pkcs1转pkcs8--在线将RSA私钥/公钥的pkcs1格式转换到pkcs8格式,支持pem格式

修复OpenLinkSaas客户端在使用AtomGit账号时页面崩溃

问题描述&#xff1a;当一个新的AtomGit用户登录OpenLinkSaas客户端后出现了页面崩溃。 从浏览器控制台来看&#xff0c;是gitNoticeList出现了null。 查看代码后发现是请求atomGit api是返回的一个null的列表 接下来我们加下保护性的代码&#xff0c;来兼容null或undefine的情…

rocketmq-pull模式-消费重平衡和拉取PullTaskImpl线程

1、观察consumer的线程模型 使用arthas分析 MQClientFactoryScheduledThread 定时任务线程 &#xff08;和push模式一致&#xff09; 定时任务线程&#xff0c;包含如下任务&#xff1a; 每2分钟更新nameServer列表 每30秒更新topic的路由信息 每30秒检查broker的存活&#x…