Flume1.9.0自定义Sink组件将数据发送至Mysql

需求

1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。

2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到MySQL中到user中。

3、user.log的数据格式如下:

2020-01-01 01:10:23,tom,18,beijing
2020-01-01 01:12:09,jack,20,shanghai
2020-01-01 01:13:17,jessic,15,guangzhou

4、mysql中的user表结构如下:

CREATE TABLE user  (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id)
);

实现 

鉴于此,可以使用 Exec Source + File Channel + Custom Mysql Sink 来实现。官方文档如下:

Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink
https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

创建工程

引入依赖

主要是 flume-ng-core 和 mysql-connector-java 依赖,其他可不引入。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba</groupId>-->
<!--            <artifactId>fastjson</artifactId>-->
<!--            <version>2.0.25</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>cn.hutool</groupId>-->
<!--            <artifactId>hutool-core</artifactId>-->
<!--            <version>5.8.27</version>-->
<!--        </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>

编写 Custom Sink

package com.example.flumedemo.sink;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** 自定义Sink,实现将数据写入到mysql。* <p>* 注意:* 1、编写完成打包后,需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql,最好不要连win上的mysql了,避坑。** @author liaorj* @date 2024/11/14*/
public class MySink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段,逗号分割。需要和Event body中的数据对应。private String tableFields;@Overridepublic void configure(Context context) {mysqlUrl = context.getString("mysqlUrl");Preconditions.checkNotNull(mysqlUrl, "mysqlUrl required");username = context.getString("username");Preconditions.checkNotNull(username, "username required");password = context.getString("password");Preconditions.checkNotNull(password, "password required");tableName = context.getString("tableName");Preconditions.checkNotNull(tableName, "tableName required");tableFields = context.getString("tableFields");Preconditions.checkNotNull(tableFields, "tableFields required");}@Overridepublic Status process() throws EventDeliveryException {Status status = null;//开启事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();Event event = null;while (true) {event = ch.take();if (event != null) {break;}}Connection conn = null;PreparedStatement stmt = null;try {//获取body中的数据String body = new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样,则抛异常String[] bodySplit = body.split(",");String[] fieldsSplit = tableFields.split(",");if (bodySplit.length != fieldsSplit.length) {//字段数对不上throw new Exception("the number of tableFields is incorrect");}//根据字段数生成对应的问号List<String> questionMarkList = new ArrayList<>();for (int i = 0; i < fieldsSplit.length; i++) {questionMarkList.add("?");}String questionMarks = String.join(",", questionMarkList);//生成sql并插入数据String formatSql = String.format("insert into %s(%s) values(%s)", tableName, tableFields, questionMarks);logger.info("-----formatSql={}", formatSql);logger.info("-----mysqlUrl={}, username={}, password={}", mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn = DriverManager.getConnection(mysqlUrl, username, password);stmt = conn.prepareStatement(formatSql);for (int i = 0; i < bodySplit.length; i++) {stmt.setString(i + 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status = Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status = Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt != null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;}
}

打包

mvn clean
mvn package
打包好后,需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中,否则会报错驱动找不到。

配置文件

创建配置文件

然后在flume目录下的conf目录下创建配置文件:file-to-mysql.conf,内容如下,注意mysqlUrl/username/password 要修改成自己的。

# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user.log# Describe the sink,custom sink to mysql
a1.sinks.k1.type = com.example.flumedemo.sink.MySink
a1.sinks.k1.mysqlUrl = jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
a1.sinks.k1.username = root
a1.sinks.k1.password = toor
a1.sinks.k1.tableName = user
a1.sinks.k1.tableFields = create_time,name,age,city# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/user/checkpointDir
a1.channels.c1.dataDirs = /data/user/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

切换到flume目录,执行:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.logger=INFO,console

测试结果

查看flume控制台日志:

查看mysql user表,已插入数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

T265相机双目鱼眼+imu联合标定(全记录)

最近工作用到t265&#xff0c;记录一遍标定过程 1.安装驱动 首先安装realsense驱动&#xff0c;因为笔者之前使用过d435i&#xff0c;装的librealsense版本为2.55.1&#xff0c;直接使用t265会出现找不到设备的问题&#xff0c;经查阅发现是因为realsense在2.53.1后就不再支持…

RT-DETR融合[CVPR2023]FasterNet种的PConv及相关改进思路

RT-DETR使用教程&#xff1a; RT-DETR使用教程 RT-DETR改进汇总贴&#xff1a;RT-DETR更新汇总贴 《Run, Don’t Walk: Chasing Higher FLOPS for Faster Neural Networks》 一、 模块介绍 论文链接&#xff1a;Run, Dont Walk: Chasing Higher FLOPS for Faster Neural Netwo…

【测试框架篇】单元测试框架pytest(2):用例编写

一、 前言 前面一章我们介绍了pytest环境安装和配置&#xff0c;并在pycharm里面实现了我们第一个pytest脚本。但是有些童鞋可能在编写脚本的时候遇到了问题&#xff0c;本文会讲一下我们编写pytest用例时需要遵守哪些既定的规则&#xff0c;同时这个规则也是可以修改的。 二…

嵌入式硬件电子电路设计(五)MOS管详解(NMOS、PMOS、三极管跟mos管的区别)

引言&#xff1a;在我们的日常使用中&#xff0c;MOS就是个纯粹的电子开关&#xff0c;虽然MOS管也有放大作用&#xff0c;但是几乎用不到&#xff0c;只用它的开关作用&#xff0c;一般的电机驱动&#xff0c;开关电源&#xff0c;逆变器等大功率设备&#xff0c;全部使用MOS管…

Conda安装软件错误(Pycharm)

conda的环境变量路径错误&#xff0c;比如移动了conda的文件位置conda的python版本不适合&#xff0c;python3.10现在更适合很多库conda对cmd没有初始化&#xff0c;conda init cmd.exe

《TCP/IP网络编程》学习笔记 | Chapter 11:进程间通信

《TCP/IP网络编程》学习笔记 | Chapter 11&#xff1a;进程间通信 《TCP/IP网络编程》学习笔记 | Chapter 11&#xff1a;进程间通信进程间通信的基本概念通过管道实现进程间通信通过管道进行进程间双向通信 运用进程间通信习题&#xff08;1&#xff09;什么是进程间通信&…

推荐一款高效的网站数据抓取工具:SysNucleus WebHarvy

SysNucleus WebHarvy是一款高效的网站数据抓取工具&#xff0c;支持从网页中提取文本、图像、URL 和电子邮件等内容&#xff0c;无需编写任何代码或脚本即可轻松实现数据抓取。用户可以通过 WebHarvy 内置的浏览器直观地浏览网页&#xff0c;指引软件提取所需的数据。它通过自动…

道陟科技EMB产品开发进展与标准设计的建议|2024电动汽车智能底盘大会

11月12日&#xff0c;2024电动汽车智能底盘大会在重庆开幕。会议由中国汽车工程学会主办&#xff0c;电动汽车产业技术创新战略联盟、中国汽车工程学会智能底盘分会、智能绿色车辆与交通全国重点实验室承办。本届大会围绕电动汽车智能底盘相关技术发展与融合&#xff0c;满足高…

Spring Authorization Server OAuth2.1

Spring Authorization Server介绍 Spring Authorization Server 是一个框架&#xff0c;它提供了 OAuth 2.1 和 OpenID Connect 1.0 规范以及其他相关规范的实现。 它建立在 Spring Security 之上&#xff0c;为构建 OpenID Connect 1.0 身份提供者和 OAuth2 授权服务器产品提供…

C++ 优先算法 —— 三数之和(双指针)

目录 题目&#xff1a;三数之和 1. 题目解析 2. 算法原理 ①. 暴力枚举 ②. 双指针算法 不漏的处理&#xff1a; 去重处理&#xff1a; 固定一个数 a 的优化&#xff1a; 3. 代码实现 Ⅰ. 暴力枚举&#xff08;会超时 O&#xff08;N&#xff09;&#xff09; Ⅱ.…

(三十三)队列(queue)

文章目录 1. 队列&#xff08;queue&#xff09;1.1 定义1.2 函数1.3 习题1.3.1 例题&#xff08;周末舞会&#xff09; 2. 双向队列&#xff08;deque&#xff09;2.1 定义2.2 函数2.3 题目2.3.1 例题&#xff08;打BOSS&#xff09; 1. 队列&#xff08;queue&#xff09; 队…

《FreeRTOS任务基础知识以及任务创建相关函数》

目录 1.FreeRTOS多任务系统与传统单片机单任务系统的区别 2.FreeRTOS中的任务&#xff08;Task&#xff09;介绍 2.1 任务特性 2.2 FreeRTOS中的任务状态 2.3 FreeRTOS中的任务优先级 2.4 在任务函数中退出 2.5 任务控制块和任务堆栈 2.5.1 任务控制块 2.5.2 任务堆栈…

springboot的社区团购系统设计录像

springboot的社区团购系统设计录像 springboot的社区团购系统设计

力扣每日一题

行变成回文&#xff1a; 对于每一行&#xff0c;遍历前半部分的元素&#xff0c;与后半部分的元素比较。如果不相等&#xff0c;计数器加 1&#xff0c;表示需要翻转一次。 列变成回文&#xff1a; 将矩阵转置&#xff0c;使用与行类似的方式对每一列进行统计。可以使用 Python…

linux c 语言回调函数学习

动机 最近在看 IO多路复用&#xff0c;包括 select() poll () epoll() 的原理以及libevent&#xff0c; 对里面提及的回调机制 比较头大&#xff0c;特写此文用例记录学习笔记。 什么是回调函数 网上看到的最多的一句话便是&#xff1a;回调函数 就是 函数指针的一种用法&am…

游戏引擎学习第九天

视频参考:https://www.bilibili.com/video/BV1ouUPYAErK/ 修改之前的方波数据&#xff0c;改播放正弦波 下面主要讲关于浮点数 1. char&#xff08;字符类型&#xff09; 大小&#xff1a;1 字节&#xff08;8 位&#xff09;表示方式&#xff1a;char 存储的是一个字符的 A…

# Python IDE的介绍和选择 --- 《跟着小王学Python》

Python IDE的介绍和选择 — 《跟着小王学Python》 《跟着小王学Python》 是一套精心设计的Python学习教程&#xff0c;适合各个层次的学习者。本教程从基础语法入手&#xff0c;逐步深入到高级应用&#xff0c;以实例驱动的方式&#xff0c;帮助学习者逐步掌握Python的核心概念…

柯桥生活英语口语学习“面坨了”英语怎么表达?

“面坨了”英语怎么表达&#xff1f; 要想搞清楚这个表达&#xff0c;首先&#xff0c;我们要搞明白“坨”是啥意思&#xff1f; 所谓“坨”就是指&#xff0c;面条在汤里泡太久&#xff0c;从而变涨&#xff0c;黏糊凝固在一起的状态。 有一个词汇&#xff0c;很适合用来表达这…

ZeroSSL HTTPS SSL证书ACMESSL申请3个月证书

目录 一、引言 二、准备工作 三、申请 SSL 证书 四、证书选型 五、ssl重要性 一、引言 目前免费 Lets Encrypt、ZeroSSL、BuyPass、Google Public CA SSL 证书&#xff0c;一般免费3-6个月。从申请难易程度分析&#xff0c;zerossl申请相对快速和简单&#xff0c;亲测速度非…

Java连接MySQL(测试build path功能)

Java连接MySQL&#xff08;测试build path功能&#xff09; 实验说明下载MySQL的驱动jar包连接测试的Java代码 实验说明 要测试该情况&#xff0c;需要先安装好MySQL的环境&#xff0c;其实也可以通过测试最后提示的输出来判断build path是否成功&#xff0c;因为如果不成功会直…