Flume 之自定义Sink

1、简介

        前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。

2、自定义Source

        自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120

3、自定义Sink

        本文将Sink定义为mysql。

3.1、引入依赖
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.11.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {private final static Logger log = LoggerFactory.getLogger(MySink.class);private String url;private String username;private String password;@Overridepublic Status process() throws EventDeliveryException {Status status = null;Channel channel = getChannel();// channel 支持事务Transaction thx = channel.getTransaction();thx.begin();try {Event event = channel.take();String name = new String(event.getBody());int i = MysqlConfig.insertData(this.url, this.username, this.password, name);if (i > 0){log.info("==插入数据库成功==");}thx.commit();status = Status.READY;} catch (Exception ex){ex.printStackTrace();}return status;}@Overridepublic void configure(Context context) {String url = context.getString("url");String username = context.getString("username");String password = context.getString("password");this.url = url;this.username = username;this.password = password;}
}
 3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class MysqlConfig {private MysqlConfig(){}static {try {Class.forName("com.mysql.cj.jdbc.Driver");}catch (Exception ex){ex.printStackTrace();}}public static Connection getConnection(String url, String username, String password) throws SQLException {Connection connection = DriverManager.getConnection(url, username, password);return connection;}public static int insertData(String url, String username,String password, String name){Connection connection = null;try{connection = getConnection(url, username, password);PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");boolean res = preparedStatement.execute();if (res){return 1;}return 0;}catch (Exception ex){ex.printStackTrace();}finally {if (connection != null){try {connection.close();}catch (Exception ex){ex.printStackTrace();}}}return 0;}
}
 3.3、Flume 配置文件

        vim flume-self-source-sink.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource 
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello 
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录 
 4.1、将自定义jar包放入lib目录

 

4.2、将数据库驱动jar包放入lib目录

        驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。 

5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console

注:启动Flume 之前,自定义 web 服务也要启动。

 6、结果

成功保存进数据库。

7、总结 

         本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python - 深夜数据结构与算法之 Sort

目录 一.引言 二.排序简介 1.排序类型 2.时间复杂度 3.初级排序 4.高级排序 A.快速排序 B.归并排序 C.堆排序 5.特殊排序 三.经典算法实战 1.Quick-Sort 2.Merge-Sort 3.Heap-Sort 4.Relative-Sort-Array [1122] 5.Valid-anagram [242] 6.Merge-Intervals […

Java NIO (二)NIO Buffer类的重要方法(备份)

1 allocate()方法 在使用Buffer实例前&#xff0c;我们需要先获取Buffer子类的实例对象&#xff0c;并且分配内存空间。需要获取一个Buffer实例对象时&#xff0c;并不是使用子类的构造器来创建&#xff0c;而是调用子类的allocate()方法。 public class AllocateTest {static…

如何快速看懂一篇英文AI论文?

已经2024年了&#xff0c;该出现一个写论文解读AI Agent了。 大家肯定也在经常刷论文吧。 但真正尝试过用GPT去刷论文、写论文解读的小伙伴&#xff0c;一定深有体验——费劲。其他agents也没有能搞定的&#xff0c;今天我发现了一个超级厉害的写论文解读的agent &#xff0c…

某银行主机安全运营体系建设实践

随着商业银行业务的发展&#xff0c;主机规模持续增长&#xff0c;给安全团队运营工作带来极大挑战&#xff0c;传统的运营手段已经无法适应业务规模的快速发展&#xff0c;主要体现在主机资产数量多、类型复杂&#xff0c;安全团队难以对全量资产进行及时有效的梳理、管理&…

HCIA—— 16每日一讲:HTTP和HTTPS、无状态和cookie、持久连接和管线化、(初稿丢了,这是新稿,请宽恕我)

学习目标&#xff1a; HTTP和HTTPS、无状态和cookie、持久连接和管线化、HTTP的报文、URI和URL&#xff08;初稿丢了&#xff0c;这是新稿&#xff0c;请宽恕我&#x1f636;‍&#x1f32b;️&#xff09; 学习内容&#xff1a; HTTP无状态和cookieHTTPS持久连接和管线化 目…

vue2 pdfjs-2.8.335-dist pdf文件在线预览功能

1、首先先将 pdfjs-2.8.335-dist 文件夹从网上搜索下载&#xff0c;复制到public文件夹下. 2、在components下新建组件PdfViewer.vue文件 3、在el-upload 中调用 pdf-viewer 组件 4、在el-upload 中的 on-preview方法中加上对应的src路径 internalPreview(file) { //判断需要…

编译原理1.3习题 程序设计语言的发展历程

图源&#xff1a;文心一言 编译原理习题整理~&#x1f95d;&#x1f95d; 作为初学者的我&#xff0c;这些习题主要用于自我巩固。由于是自学&#xff0c;答案难免有误&#xff0c;非常欢迎各位小伙伴指正与讨论&#xff01;&#x1f44f;&#x1f4a1; 第1版&#xff1a;自…

IPv6隧道--GRE隧道

GRE隧道 通用路由封装协议GRE(Generic Routing Encapsulation)可以对某些网络层协议(如IPX、ATM、IPv6、AppleTalk等)的数据报文进行封装,使这些被封装的数据报文能够在另一个网络层协议(如IPv4)中传输。 GRE提供了将一种协议的报文封装在另一种协议报文中的机制,是一…

个人网站制作 Part 7 添加用户认证和数据库集成 | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 用户认证与数据库集成&#x1f528;添加用户认证&#x1f527;步骤 1: 使用Passport.js &#x1f528;集成数据库&#x1f527;步骤 2: 使用MongoDB和Mongoose &#x1f…

Grafana(二)Grafana 两种数据源图表展示(json-api与数据库)

一. 背景介绍 在先前的博客文章中&#xff0c;我们搭建了Grafana &#xff0c;它是一个开源的度量分析和可视化工具&#xff0c;可以通过将采集的数据分析、查询&#xff0c;然后进行可视化的展示&#xff0c;接下来我们重点介绍如何使用它来进行数据渲染图表展示 Docker安装G…

AIOps探索 | 基于大模型构建高效的运维知识及智能问答平台(2)

前面分享了平台对运维效率提升的重要性和挑战以及基于大模型的平台建设解决方案&#xff0c;新来的朋友点这里&#xff0c;一键回看精彩原文。 基于大模型构建高效的运维知识及智能问答平台&#xff08;1&#xff09;https://mp.csdn.net/mp_blog/creation/editor/135223109 …

【REMB 】翻译:草案remb-03

REMB REMB消息 以及 绝对时间戳选项 在带宽估计中的使用 :an absolute-value timestamp option for use in bandwidth estimatoin. 接收方带宽估计的RTCP消息 REMB 这位大神翻译的更好。 RTCP message for Receiver Estimated Maximum Bitrate draft-alvestrand-rmcat-remb-03…

iOS开发进阶(六):Xcode14 使用信号量造成线程优先级反转问题修复

文章目录 一、前言二、关于线程优先级反转三、优先级反转会造成什么后果四、怎么避免线程优先级反转五、使用信号量可能会造成线程优先级反转&#xff0c;且无法避免六、延伸阅读&#xff1a;iOS | Xcode中快速打开终端6.1 .sh绑定6.2 执行 pod install 脚本 七、延伸阅读&…

Android Activity的启动流程(Android-10)

前言 在Android开发中&#xff0c;我们经常会用到startActivity(Intent)方法&#xff0c;但是你知道startActivity(Intent)后Activity的启动流程吗&#xff1f;今天就专门讲一下最基础的startActivity(Intent)看一下Activity的启动流程&#xff0c;同时由于Launcher的启动后续…

STM32——DMA知识点及实战总结

1.DMA概念介绍 DMA&#xff0c;全称Direct Memory Access&#xff0c;即直接存储器访问。 DMA传输 将数据从一个地址空间复制到另一个地址空间。 注意&#xff1a;DMA传输无需CPU直接控制传输 2.DMA框图 3.DMA处理过程 外设的 8 个请求独立连接到每个通道&#xff0c;由 DMA_…

YOLOv5改进 | 融合改进篇 | 轻量化CCFM + SENetv2进行融合改进涨点 (全网独家首发)

一、本文介绍 本文给大家带来的改进机制是轻量化的Neck结构CCFM配合SENetv2改进的网络结构进行融合改进,其中CCFM为我本人根据RT-DETR模型一比一总结出来的,文中配其手撕结构图,其中SENetV2为网络结构重构化模块,通过其改进主干从而提取更有效的特征,这两个模块搭配在一起…

Java实现海南旅游景点推荐系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四、核心代码4.1 随机景点推荐4.2 景点评价4.3 协同推荐算法4.4 网站登录4.5 查询景点美食 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的海南旅游推荐系统&#xff…

探索单元测试和 E2E 测试:提升软件质量的关键步骤(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

探索Redis特殊数据结构:Bitmaps(位图)在实际中的应用

一、概述 Redis官方提供了多种数据类型&#xff0c;除了常见的String、Hash、List、Set、zSet之外&#xff0c;还包括Stream、Geospatial、Bitmaps、Bitfields、Probabilistic&#xff08;HyperLogLog、Bloom filter、Cuckoo filter、t-digest、Top-K、Count-min sketch、Confi…

【机组】算术逻辑运算单元实验的解密与实战

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《机组 | 模块单元实验》⏰诗赋清音&#xff1a;云生高巅梦远游&#xff0c; 星光点缀碧海愁。 山川深邃情难晤&#xff0c; 剑气凌云志自修。 ​ 目录 &#x1f33a; 一、 实验目的…