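Moving data from MySQL to Kafka with Flink
This post shows how to use Flink to move data from a MySQL database into Kafka, a common use case when downstream consumers need database data delivered through connectors. Note that the JDBC connector used here reads the table as a bounded snapshot, so rows changed afterwards are not picked up until the job is run again.
Environment setup
Before you start, make sure the following software is installed in your environment:
Apache Flink: add the required dependencies to your pom.xml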
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>
</project>
MySQL database: create the source table
CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock code',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock name',
  `close` double DEFAULT NULL COMMENT 'latest price',
  `change_percent` double DEFAULT NULL COMMENT 'price change (percent)',
  `change` double DEFAULT NULL COMMENT 'price change (amount)',
  `volume` double DEFAULT NULL COMMENT 'trading volume (lots)',
  `amount` double DEFAULT NULL COMMENT 'turnover amount',
  `amplitude` double DEFAULT NULL COMMENT 'amplitude',
  `turnover_rate` double DEFAULT NULL COMMENT 'turnover rate',
  `peration` double DEFAULT NULL COMMENT 'P/E ratio',
  `volume_rate` double DEFAULT NULL COMMENT 'volume ratio',
  `hign` double DEFAULT NULL COMMENT 'high',
  `low` double DEFAULT NULL COMMENT 'low',
  `open` double DEFAULT NULL COMMENT 'opening price',
  `previous_close` double DEFAULT NULL COMMENT 'previous close',
  `pb` double DEFAULT NULL COMMENT 'P/B ratio',
  `create_time` varchar(64) NOT NULL COMMENT 'write time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Kafka message queue
1. Start ZooKeeper:
   zkServer start
2. Start the Kafka broker:
   kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. Create the topic:
   kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. Consume the data:
   kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning
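Step-by-step explanation
Get the stream execution environment: first, we obtain Flink's stream execution environment via StreamExecutionEnvironment.getExecutionEnvironment and set its runtime mode to streaming.
Create the stream table environment: next, we create a stream table environment via StreamTableEnvironment.create, which lets us operate on data streams with SQL statements.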
val senv = StreamExecutionEnvironment.getExecutionEnvironment
  .setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
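Define the MySQL source table: we use a SQL statement to create a temporary table, t_stock_code_price, which describes the structure of the data to read from MySQL together with the connection details.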
val source_table =
  """
    |CREATE TEMPORARY TABLE t_stock_code_price (
    |  id BIGINT NOT NULL,
    |  code STRING NOT NULL,
    |  name STRING NOT NULL,
    |  `close` DOUBLE,
    |  change_percent DOUBLE,
    |  change DOUBLE,
    |  volume DOUBLE,
    |  amount DOUBLE,
    |  amplitude DOUBLE,
    |  turnover_rate DOUBLE,
    |  peration DOUBLE,
    |  volume_rate DOUBLE,
    |  hign DOUBLE,
    |  low DOUBLE,
    |  `open` DOUBLE,
    |  previous_close DOUBLE,
    |  pb DOUBLE,
    |  create_time STRING NOT NULL,
    |  PRIMARY KEY (id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://localhost:3306/mydb',
    |  'driver' = 'com.mysql.cj.jdbc.Driver',
    |  'table-name' = 't_stock_code_price',
    |  'username' = 'root',
    |  'password' = '12345678'
    |)
    |""".stripMargin
tEnv.executeSql(source_table)
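The connection settings in the source table are hard-coded in the DDL string. As a minimal sketch, assuming you would rather assemble the WITH clause from values supplied elsewhere, the plain Scala helper below renders the options as text. The names `JdbcOptions`, `withClause` and the environment variable `MYSQL_PASSWORD` are hypothetical and are not part of Flink.

```scala
object JdbcOptions {
  // Hypothetical helper (not a Flink API): renders key/value pairs as a
  // Flink SQL WITH (...) clause, doubling single quotes inside values.
  def withClause(options: Seq[(String, String)]): String =
    options
      .map { case (key, value) =>
        val escaped = value.replace("'", "''")
        s"  '$key' = '$escaped'"
      }
      .mkString("WITH (\n", ",\n", "\n)")

  def main(args: Array[String]): Unit = {
    // Assumption: the password comes from an environment variable named
    // MYSQL_PASSWORD; otherwise fall back to the value used in this post.
    val password = sys.env.getOrElse("MYSQL_PASSWORD", "12345678")
    val clause = withClause(Seq(
      "connector"  -> "jdbc",
      "url"        -> "jdbc:mysql://localhost:3306/mydb",
      "driver"     -> "com.mysql.cj.jdbc.Driver",
      "table-name" -> "t_stock_code_price",
      "username"   -> "root",
      "password"   -> password
    ))
    println(clause)
  }
}
```

The rendered text can be appended to the column list of the CREATE TEMPORARY TABLE statement before it is passed to tEnv.executeSql.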
Define the Kafka sink table: next, we define a Kafka table, re_stock_code_price_kafka, specifying the Kafka connection parameters and the table schema.
tEnv.executeSql(
  "CREATE TABLE re_stock_code_price_kafka (" +
    "`id` BIGINT," +
    "`code` STRING," +
    "`name` STRING," +
    "`close` DOUBLE," +
    "`change_percent` DOUBLE," +
    "`change` DOUBLE," +
    "`volume` DOUBLE," +
    "`amount` DOUBLE," +
    "`amplitude` DOUBLE," +
    "`turnover_rate` DOUBLE," +
    "`operation` DOUBLE," +
    "`volume_rate` DOUBLE," +
    "`high` DOUBLE," +
    "`low` DOUBLE," +
    "`open` DOUBLE," +
    "`previous_close` DOUBLE," +
    "`pb` DOUBLE," +
    "`create_time` STRING," +
    "rise int" +
    ") WITH (" +
    "'connector' = 'kafka'," +
    "'topic' = 'east_money'," +
    "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
    "'properties.group.id' = 'mysql2kafka'," +
    "'scan.startup.mode' = 'earliest-offset'," +
    "'format' = 'csv'," +
    "'csv.field-delimiter' = ','" +
    ")")
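Because the sink table uses the csv format, each Kafka message is one comma-separated record. To check what a consumer would do with such a record, here is a rough sketch in plain Scala. It assumes that no field contains a comma, quote or line break (so a naive split is enough) and that rise is the 19th and last column, as in the table definition. `CsvRecord` and `parseRise` are hypothetical names, and the sample line is made-up illustrative data, not real output from the job.

```scala
import scala.util.Try

object CsvRecord {
  // Number of columns declared in re_stock_code_price_kafka.
  val ExpectedFields = 19

  // Returns Some(rise) when the line has 19 fields and the last one is an
  // integer, otherwise None.
  def parseRise(line: String): Option[Int] = {
    // A limit of -1 keeps trailing empty fields instead of dropping them.
    val fields = line.split(",", -1)
    if (fields.length != ExpectedFields) None
    else Try(fields.last.trim.toInt).toOption
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample record, for illustration only.
    val sample =
      "1,000001,demo,10.5,1.2,0.12,100.0,1050.0,2.0,0.5,8.0,1.1,10.8,10.1,10.2,10.38,0.9,2024-01-01 15:00:00,1"
    println(parseRise(sample))
  }
}
```

For data that may contain commas or quotes inside fields, a real CSV parser would be a safer choice than a plain split.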
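Transform and write the data: finally, we run an INSERT statement that transforms the data read from MySQL (here a CASE WHEN expression adds a new field, rise) and writes it to Kafka. Columns are matched to the sink table by position, so the source column names do not have to equal the sink column names. Any other SQL-based ETL logic can be expressed the same way.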
tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")
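The CASE WHEN expression is worth a closer look because change_percent is nullable. The sketch below mirrors its logic in plain Scala, with None standing in for SQL NULL. In SQL, NULL > 0 does not evaluate to true, so the ELSE branch applies and rise is 0. `RiseFlag` and `rise` are hypothetical names used only for this illustration.

```scala
object RiseFlag {
  // Mirrors: CASE WHEN change_percent > 0 THEN 1 ELSE 0 END
  // None models SQL NULL: the comparison is not true, so the result is 0.
  def rise(changePercent: Option[Double]): Int =
    if (changePercent.exists(_ > 0)) 1 else 0

  def main(args: Array[String]): Unit = {
    println(rise(Some(1.25))) // positive change
    println(rise(Some(-0.4))) // negative change
    println(rise(Some(0.0)))  // unchanged
    println(rise(None))       // NULL change_percent
  }
}
```

If rows with a missing change_percent should be distinguishable from falling prices, the SQL would need an explicit IS NULL branch rather than relying on ELSE.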
Full code
package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Mysql2Kafka {
  def main(args: Array[String]): Unit = {
    // Stream execution environment in streaming mode, plus a table environment on top of it
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // MySQL source table, read through the JDBC connector
    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydb',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 't_stock_code_price',
        |  'username' = 'root',
        |  'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(source_table)

    // Print the source rows to the console as a sanity check
    val result = tEnv.executeSql("select * from t_stock_code_price")
    result.print()

    // Kafka sink table, written in CSV format
    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int" +
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")")

    // Add the rise flag and write every source row to Kafka
    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")
  }
}