使用Flink实现MySQL到Kafka的数据流转换

使用Flink实现MySQL到Kafka的数据流转换

本篇博客将介绍如何使用Flink将数据从MySQL数据库实时传输到Kafka,这是一个常见的用例,适用于需要实时数据connector的场景。
在这里插入图片描述

环境准备

在开始之前,确保你的环境中已经安装了以下软件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

MySQL数据库,初始化mysql表

CREATE TABLE `t_stock_code_price` (`id` bigint NOT NULL AUTO_INCREMENT,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',`close` double DEFAULT NULL COMMENT '最新价',`change_percent` double DEFAULT NULL COMMENT '涨跌幅',`change` double DEFAULT NULL COMMENT '涨跌额',`volume` double DEFAULT NULL COMMENT '成交量(手)',`amount` double DEFAULT NULL COMMENT '成交额',`amplitude` double DEFAULT NULL COMMENT '振幅',`turnover_rate` double DEFAULT NULL COMMENT '换手率',`peration` double DEFAULT NULL COMMENT '市盈率',`volume_rate` double DEFAULT NULL COMMENT '量比',`hign` double DEFAULT NULL COMMENT '最高',`low` double DEFAULT NULL COMMENT '最低',`open` double DEFAULT NULL COMMENT '今开',`previous_close` double DEFAULT NULL COMMENT '昨收',`pb` double DEFAULT NULL COMMENT '市净率',`create_time` varchar(64) NOT NULL COMMENT '写入时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

Kafka消息队列

1. 启动zookeeperzkServer start
2. 启动kafka服务kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topickafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. 消费数据kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic east_money --from-beginning

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)

定义MySQL数据源表:我们使用一个SQL语句创建了一个临时表t_stock_code_price,这个表代表了我们要从MySQL读取的数据结构和连接信息。

val source_table ="""|CREATE TEMPORARY TABLE t_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 't_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(source_table)

定义Kafka目标表:然后,我们定义了一个Kafka表re_stock_code_price_kafka,指定了Kafka的连接参数和表结构。

tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")

数据转换和写入:最后,我们执行了一个插入操作,将从MySQL读取的数据转换(这里通过case when语句添加了一个新字段rise)并写入到Kafka中。这个可以实现任何的sql etl 来满足我们的需求。

    tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")

全部代码

package org.eastimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Mysql2Kafka {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)val source_table ="""|CREATE TEMPORARY TABLE t_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 't_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(source_table)val result = tEnv.executeSql("select * from t_stock_code_price")result.print()tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")tEnv.executeSql("insert into re_stock_code_price_kafka select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")}
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/782270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ML-Decoder: Scalable and Versatile Classification Head

1、引言 论文链接&#xff1a;https://openaccess.thecvf.com/content/WACV2023/papers/Ridnik_ML-Decoder_Scalable_and_Versatile_Classification_Head_WACV_2023_paper.pdf 因为 transformer 解码器分类头[1] 在少类别多标签分类数据集上表现得很好&#xff0c;但由于其查询…

PHP的定时任务框架的taskPHP3.0学习记录2(环境要求、配置Redis、crontab执行时间语法、命令操作以及Screen全屏窗口管理器)

环境要求 php版本> 5.5开启socket扩展开启pdo扩展开启shmop扩展 echo <pre>; echo --; $requiredVersion 5.6.0; $currentVersion phpversion(); if (version_compare($currentVersion, $requiredVersion, >)) {echo "1.PHP版本满足要求&#xff0c;当前版…

c语言:vs2022写一个一元二次方程(包含虚根)

求一元二次方程 的根&#xff0c;通过键盘输入a、b、c&#xff0c;根据△的值输出对应x1和x2的值(保留一位小数)(用if语句完成)。 //一元二次方程的实现 #include <stdio.h> #include <math.h> #include <stdlib.h> int main() {double a, b, c, delta, x1…

数据结构 - 算法效率|时间复杂度|空间复杂度

目录 1.算法效率 2.时间复杂度 2.1定义 2.2大O渐近表示法 2.3常见时间复杂度计算举例 3.空间复杂度 3.1定义 3.2常见空间复杂度计算举例 1.算法效率 算法的效率常用算法复杂度来衡量&#xff0c;算法复杂度描述了算法在输入数据规模变化时&#xff0c;其运行时间和空间…

opejdk11 java 启动流程 java main方法怎么被jvm执行

java启动过程 java main方法怎么被jvm执行 java main方法是怎么被jvm调用的 1、jvm main入口 2、执行JLI_Launch方法 3、执行JVMInit方法 4、执行ContinueInNewThread方法 5、执行CallJavaMainInNewThread方法 6、创建线程执行ThreadJavaMain方法 7、执行ThreadJavaMain方法…

Last-Modified:HTTP缓存控制机制解析

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

docker配置github仓库ghcr国内镜像加速

文章目录 说明ghcr.io简介配置镜像命令地址命令行方式1panel面板方式方式一&#xff1a;配置镜像加速&#xff0c;命令行拉取方式二&#xff1a;配置镜像仓库&#xff0c;可视化拉取 说明 由于使用的容器需要从github下载镜像&#xff0c;服务器在国外下载速度很慢&#xff0c…

26. UE5 RPG同步面板属性(二)

在上一篇&#xff0c;我们解析了UI属性面板的实现步骤&#xff1a; 首先我们需要通过c去实现创建GameplayTag&#xff0c;这样可以在c和UE里同时获取到Tag创建一个DataAsset类&#xff0c;用于设置tag对应的属性和显示内容创建AttributeMenuWidgetController实现对应逻辑 并且…

理解游戏服务器架构-部署架构

目录 前言 我所理解的服务器架构 什么是否部署架构 部署架构的职责 进程业务职责 网络链接及通讯方式 与客户端的连接方式 服务器之间连接关系 数据落地以及一致性 数据库的选择 数据访问三级缓存 数据分片 读写分离 分布式数据处理 负载均衡 热更新 配置更新 …

html第二次作业

骨架 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, initi…

vscode初始化node项目

首先需要安装node环境&#xff0c;推荐直接使用nvm 安装node&#xff0c;方便切换node版本 1.npm init 初始化node项目 在命令行输入npm init指令 根据指令创建完成后会在当前目录下生成一个package.json文件&#xff0c;记住运行npm init执行的目录必须是一个空目录 2.创建…

金三银四面试题(八):JVM常见面试题(2)

今天我们继续探讨常见的JVM面试题。这些问题不比之前的问题庞大&#xff0c;多用于面试中​JVM部分的热身运动&#xff0c;开胃菜&#xff0c;但是大家已经要认真准备。 JRE、JDK、JVM 及JIT 之间有什么不同&#xff1f; JRE 代表Java 运行时&#xff08;Java run-time&#…

Kafka入门到实战-第四弹

Kafka入门到实战 Kafka集群搭建官网地址Kafka概述使用Kraft搭建Kafka集群更新计划 Kafka集群搭建 官网地址 声明: 由于操作系统, 版本更新等原因, 文章所列内容不一定100%复现, 还要以官方信息为准 https://kafka.apache.org/Kafka概述 Apache Kafka 是一个开源的分布式事件…

计算机视觉之三维重建(5)---双目立体视觉

文章目录 一、平行视图1.1 示意图1.2 平行视图的基础矩阵1.3 平行视图的极几何1.4 平行视图的三角测量 二、图像校正三、对应点问题3.1 相关匹配法3.2 归一化相关匹配法3.3 窗口问题3.4 相关法存在的问题3.5 约束问题 一、平行视图 1.1 示意图 如下图即是一个平行视图。特点&a…

2核2G服务器优惠价格轻量61元一年,CVM价格313元15个月

腾讯云2核2G服务器多少钱一年&#xff1f;轻量服务器61元一年&#xff0c;CVM 2核2G S5服务器313.2元15个月&#xff0c;轻量2核2G3M带宽、40系统盘&#xff0c;云服务器CVM S5实例是2核2G、50G系统盘。腾讯云2核2G服务器优惠活动 txybk.com/go/txy 链接打开如下图&#xff1a;…

最小覆盖子串-java

最小覆盖子串-java 题目描述 : 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 "" 。 注意&#xff1a; 对于 t 中重复字符&#xff0c;我们寻找的子字符串中该字符数量必…

Mac上Matlab_R2023b ARM 版 启动闪退(意外退出)解决方法

安装好后&#xff0c;使用 "libmwlmgrimpl.dylib" 文件替换掉"/Applications/Matlab_R2023b.app/bin/maca64/matlab_startup_plugins/lmgrimpl"文件夹下的同名文件 在终端下执行如下命令&#xff1a; codesign --verbose --force --deep -s - /Applicat…

本地项目上传到GitHub

本文档因使用实际项目提交做为案例&#xff0c;故使用xxx等字符进行脱敏&#xff0c;同时隐藏了部分输出&#xff0c;已实际项目和命令行输出为准 0、 Git 安装与GitHub注册 1&#xff09; 在下述地址下载Git&#xff0c;安装一路默认下一步即可。安装完成后&#xff0c;随便…

乡村数字化转型:科技赋能打造智慧农村新生态

随着信息技术的迅猛发展&#xff0c;数字化转型已成为推动社会进步的重要引擎。在乡村振兴的大背景下&#xff0c;乡村数字化转型不仅是提升乡村治理能力和治理水平现代化的关键&#xff0c;更是推动农业现代化、农村繁荣和农民增收的重要途径。本文旨在探讨乡村数字化转型的内…

解决MySQL幻读?可重复读隔离级别背后的工作原理

什么是当前读和快照读 当前读&#xff1a;又称为 "锁定读"&#xff0c;它会读取记录的最新版本&#xff08;也就是最新的提交结果&#xff09;&#xff0c;并对读取到的数据加锁&#xff0c;其它事务不能修改这些数据&#xff0c;直到当前事务提交或回滚。"sele…