需求
1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。
2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到MySQL中到user中。
3、user.log的数据格式如下:
2020-01-01 01:10:23,tom,18,beijing
2020-01-01 01:12:09,jack,20,shanghai
2020-01-01 01:13:17,jessic,15,guangzhou
4、mysql中的user表结构如下:
CREATE TABLE user (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id)
);
实现
鉴于此,可以使用 Exec Source + File Channel + Custom Mysql Sink 来实现。官方文档如下:
Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink
https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
创建工程
引入依赖
主要是 flume-ng-core 和 mysql-connector-java 依赖,其他可不引入。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>-->
<!-- <artifactId>fastjson</artifactId>-->
<!-- <version>2.0.25</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.hutool</groupId>-->
<!-- <artifactId>hutool-core</artifactId>-->
<!-- <version>5.8.27</version>-->
<!-- </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>
编写 Custom Sink
package com.example.flumedemo.sink;import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** 自定义Sink,实现将数据写入到mysql。* <p>* 注意:* 1、编写完成打包后,需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql,最好不要连win上的mysql了,避坑。** @author liaorj* @date 2024/11/14*/
public class MySink extends AbstractSink implements Configurable {private static final Logger logger = LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段,逗号分割。需要和Event body中的数据对应。private String tableFields;@Overridepublic void configure(Context context) {mysqlUrl = context.getString("mysqlUrl");Preconditions.checkNotNull(mysqlUrl, "mysqlUrl required");username = context.getString("username");Preconditions.checkNotNull(username, "username required");password = context.getString("password");Preconditions.checkNotNull(password, "password required");tableName = context.getString("tableName");Preconditions.checkNotNull(tableName, "tableName required");tableFields = context.getString("tableFields");Preconditions.checkNotNull(tableFields, "tableFields required");}@Overridepublic Status process() throws EventDeliveryException {Status status = null;//开启事务Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();Event event = null;while (true) {event = ch.take();if (event != null) {break;}}Connection conn = null;PreparedStatement stmt = null;try {//获取body中的数据String body = new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样,则抛异常String[] bodySplit = body.split(",");String[] fieldsSplit = tableFields.split(",");if (bodySplit.length != fieldsSplit.length) {//字段数对不上throw new Exception("the number of tableFields is incorrect");}//根据字段数生成对应的问号List<String> questionMarkList = new ArrayList<>();for (int i = 0; i < fieldsSplit.length; i++) {questionMarkList.add("?");}String questionMarks = String.join(",", questionMarkList);//生成sql并插入数据String formatSql = String.format("insert into %s(%s) values(%s)", tableName, tableFields, questionMarks);logger.info("-----formatSql={}", formatSql);logger.info("-----mysqlUrl={}, username={}, password={}", mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn = DriverManager.getConnection(mysqlUrl, username, password);stmt = conn.prepareStatement(formatSql);for (int i = 0; i < bodySplit.length; i++) {stmt.setString(i + 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status = Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status = Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt != null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;}
}
打包
mvn clean
mvn package
打包好后,需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中,否则会报错驱动找不到。
配置文件
创建配置文件
然后在flume目录下的conf目录下创建配置文件:file-to-mysql.conf,内容如下,注意mysqlUrl/username/password 要修改成自己的。
# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user.log# Describe the sink,custom sink to mysql
a1.sinks.k1.type = com.example.flumedemo.sink.MySink
a1.sinks.k1.mysqlUrl = jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
a1.sinks.k1.username = root
a1.sinks.k1.password = toor
a1.sinks.k1.tableName = user
a1.sinks.k1.tableFields = create_time,name,age,city# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/user/checkpointDir
a1.channels.c1.dataDirs = /data/user/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
切换到flume目录,执行:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.logger=INFO,console
测试结果
查看flume控制台日志:
查看mysql user表,已插入数据: