使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

使用Flink实现Kafka到MySQL的数据流转换

在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍如何使用Apache Flink来实现这一过程。

在这里插入图片描述

环境准备

在开始之前,请确保你的开发环境中已经安装并配置了以下组件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

Kafka消息队列

1. 启动zookeeperzkServer start
2. 启动kafka服务kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topickafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生产数据kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL数据库
初始化mysql表

CREATE TABLE `t_stock_code_price` (`id` bigint NOT NULL AUTO_INCREMENT,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',`close` double DEFAULT NULL COMMENT '最新价',`change_percent` double DEFAULT NULL COMMENT '涨跌幅',`change` double DEFAULT NULL COMMENT '涨跌额',`volume` double DEFAULT NULL COMMENT '成交量(手)',`amount` double DEFAULT NULL COMMENT '成交额',`amplitude` double DEFAULT NULL COMMENT '振幅',`turnover_rate` double DEFAULT NULL COMMENT '换手率',`peration` double DEFAULT NULL COMMENT '市盈率',`volume_rate` double DEFAULT NULL COMMENT '量比',`hign` double DEFAULT NULL COMMENT '最高',`low` double DEFAULT NULL COMMENT '最低',`open` double DEFAULT NULL COMMENT '今开',`previous_close` double DEFAULT NULL COMMENT '昨收',`pb` double DEFAULT NULL COMMENT '市净率',`create_time` varchar(64) NOT NULL COMMENT '写入时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)

定义Kafka数据源表:我们使用一个SQL语句创建了一个Kafka表re_stock_code_price_kafka,这个表代表了我们要从Kafka读取的数据结构和连接信息。

tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE NULL," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE ," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定义MySQL目标表:然后,我们定义了一个MySQL表re_stock_code_price,指定了与MySQL的连接参数和表结构。

val sink_table: String ="""|CREATE TEMPORARY TABLE re_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  rise int,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 're_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(sink_table)

数据转换和写入:最后,我们执行了一个插入操作,将从Kafka读取的数据转换并写入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")result.print()

全部代码

package org.eastimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Kafka2Mysql {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE NULL," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE ," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")val result = tEnv.executeSql("select * from re_stock_code_price_kafka")val sink_table: String ="""|CREATE TEMPORARY TABLE re_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  rise int,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 're_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(sink_table)tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")result.print()print("数据打印完成!!!")}
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/781935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小米SU7 我劝你再等等

文 | AUTO芯球 作者 | 李逵 我必须承认我一时没忍住 犯错了 我不会被我老婆打吧 感觉有点慌呀 这不前两天 我刚提了台问界M9嘛 但是昨晚看小米汽车发布会 是真的被雷总感染到了 真的没忍住 我又冲了台小米SU7 Pro版 本来我是准备抢创始版的 结果1秒钟时间 点进去就…

yolov5 v7.0打包exe文件,使用C++调用

cd到yolo5文件夹下 pyinstaller -p 当前路径 -i logo图标 detect.py问题汇总 运行detect.exe找不到default.yaml 这个是yolov8里的文件 1 复制权重文件到exe所在目录。 2 根据报错提示的配置文件路径&#xff0c;把default.yaml复制放到相应的路径下。&#xff08;缺少相应…

杨辉三角形(c++实现)

题目 下面的图形是著名的杨辉三角形&#xff1a; 如果我们按从上到下、从左到右的顺序把所有数排成一列&#xff0c;可以得到如下数列&#xff1a; 1, 1, 1, 1, 2, 1, 1, 3, 3, 1, 1, 4, 6, 4, 1, … 给定一个正整数 N&#xff0c;请你输出数列中第一次出现 N 是在第几个数&a…

实现 Element UI el-table 树形数据的懒加载

当面对大量数据时&#xff0c;一次性加载所有数据可能会导致性能问题。为了解决这一问题&#xff0c;我们可以实现树形数据的懒加载。本文将介绍如何在使用 Element UI 的 Vue 应用中为 el-table 组件的树形数据添加懒加载功能。 懒加载的基本概念 懒加载是一种优化网页或应用…

中国31个省农村用电量(2000-2022年)

数据介绍&#xff1a; 农村用电量是一个动态变化的数据&#xff0c;受到多种因素的影响&#xff0c;包括农村经济发展、人口增长、农业生产活动增加以及电力设备的升级改造等。随着农村经济的发展和农民生活水平的提高&#xff0c;农村用电量呈现出逐年增长的趋势。同时&#…

消息中间件区别

ActiveMQ 我们先看ActiveMQ。其实一般早些的项目需要引入消息中间件&#xff0c;都是使用的这个MQ&#xff0c;但是现在用的确实不多了&#xff0c;说白了就是有些过时了。我们去它的官网看一看&#xff0c;你会发现官网已经不活跃了&#xff0c;好久才会更新一次。 它的单机吞…

查找某数据在单链表中出现的次数

#define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> typedef int ElemType; typedef struct LinkNode {ElemType data;LinkNode* next; }LinkNode, * LinkList; //尾插法建立单链表 void creatLinkList(LinkList& L) {L (LinkNode*)mallo…

设计模式之解释器模式的魅力:让代码读懂你的语言

目录 一、什么是解释器模式 二、解释器模式的应用场景 三、解释器模式的优缺点 3.1. 优点 3.2. 缺点 四、解释器模式示例 4.1. 问题描述 4.2. 问题分析 4.3. 代码实现 4.4. 优化方向 五、总结 一、什么是解释器模式 解释器模式&#xff08;Interpreter pattern&…

kubernetes(K8S)学习(七):K8S之系统核心组件

K8S之系统核心组件 K8s系统核心组件1.1 Master和Node1.2 kubeadm1.3 先把核心组件总体过一遍1.4 Kubernetes源码查看方式1.5 kubectl1.6 API Server1.7 集群安全机制之API Server1.8 Scheduler1.9 kubelet1.10 kube-proxy K8s系统核心组件 1.1 Master和Node 官网 &#xff1a;…

360奇酷刷机 360刷机助手 QGDP360手机QGDP刷机

360奇酷刷机 360刷机助手 QGDP破解版360手机QGDP刷机 360手机刷机资源下载链接&#xff1a;360rom.github.io 参考&#xff1a;360手机-360刷机360刷机包twrp、root 360奇酷刷机&#xff1a;360高通驱动安装 360手机刷机驱动&#xff1b;手机内置&#xff0c;可通过USB文件传输…

搜索与图论——染色法判定二分图

一个图是二分图当且仅当这个图中不含奇数环 由于图中没有奇数环&#xff0c;所以染色过程中一定没有矛盾 所以一个二分图一定可以成功被二染色&#xff0c;反之在二染色的过程中出现矛盾的图中一定有奇数环&#xff0c;也就一定不是二分图 #include<iostream> #includ…

【c++初阶】类与对象(中)

✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅ ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨ &#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1…

Transformer论文阅读

Transformer论文阅读 摘要结论1 Introduction &#xff08;导言&#xff09;2 Background3 Model Architecture3.1 Encoder and Decoder StacksEncoderLayer NormDecoder 3.2 Attention3.2.1 Scaled Dot-Product Attention3.2.2 Scaled Dot-Product Attention3.2.3 Application…

4月1日起,未备案App小程序将下架

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 最后2天了、最后2天了。 从2024年4月1日起&#xff0c;工信部要求所有的APP、小程序都要备案&#xff0c;否则下架、关停、限制更新。这是去年8月份出的新规&#xff0c;没想到十个月这么快就过去了。 现在广东省…

深入解析大语言模型显存占用:训练与推理

深入解析大语言模型显存占用&#xff1a;训练与推理 文章脉络 估算模型保存大小 估算模型在训练时占用显存的大小 全量参数训练 PEFT训练 估算模型在推理时占用显存的大小 总结 对于NLP领域的从业者和研究人员来说&#xff0c;有没有遇到过这样一个场景&#xff0c;你的…

某东推荐的十大3C热榜第一名!2024随身wifi靠谱品牌推荐!2024随身wifi怎么选?

一、鼠标金榜&#xff1a;戴尔 商务办公有线鼠标 售价:19.9&#xffe5; 50万人好评 二、平板电脑金榜&#xff1a;Apple iPod 10.2英寸 售价:2939&#xffe5; 200万人好评 三、随身WiFi金榜&#xff1a;格行随身WiFi 售价:69&#xffe5; 15万人好评 四、游戏本金榜&#xff…

Gromacs模拟一:配体-双链蛋白质复合物体系准备

1、蛋白质的准备&#xff1a; 在RCSB网站下载想要的蛋白晶体&#xff08;教程里是3htb&#xff09;&#xff0c;用notepad等编辑器或是分子可视化软件除去里面的非蛋白分子或离子。 这里采用的是一个经过分子对接后的蛋白质pdb和配体小分子的pdb。 教程里提到的配体是2-丙基…

【Java多线程】5——Lock底层原理

5 Lock底层原理 ⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记仓库&#x1f449;https://github.com/A-BigTree/tree-learning-notes 个人主页&#x1f449;https://www.abigtree.top ⭐⭐⭐⭐⭐⭐ 如果可以&#xff0c;麻烦各位看官顺手点个star~&…

新能源汽车充电桩常见类型及充电桩站场的智能监管方案

随着新能源汽车市场的迅猛发展&#xff0c;充电桩作为支持其运行的基础设施&#xff0c;也呈现出多样化的类型。这些充电桩不仅在外形和功能上存在差异&#xff0c;更在充电速度、充电方式以及使用场景等方面展现出独特的优势。 一、充电桩类型及区别 1、慢充桩&#xff08;交…

Go 之 Gin 框架

Gin 是一个 Go (Golang) 编写的轻量级 web 框架&#xff0c;运行速度非常快&#xff0c;擅长 Api 接口的高并发&#xff0c;如果项目的规模不大&#xff0c;业务相对简单&#xff0c;这个时候我们也推荐您使用 Gin&#xff0c;特别适合微服务框架。 简单路由配置 package mai…