使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

使用Flink实现Kafka到MySQL的数据流转换

在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍如何使用Apache Flink来实现这一过程。

在这里插入图片描述

环境准备

在开始之前,请确保你的开发环境中已经安装并配置了以下组件:
Apache Flink 准备相关pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

Kafka消息队列

1. 启动zookeeperzkServer start
2. 启动kafka服务kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 创建topickafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生产数据kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL数据库
初始化mysql表

CREATE TABLE `t_stock_code_price` (`id` bigint NOT NULL AUTO_INCREMENT,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',`close` double DEFAULT NULL COMMENT '最新价',`change_percent` double DEFAULT NULL COMMENT '涨跌幅',`change` double DEFAULT NULL COMMENT '涨跌额',`volume` double DEFAULT NULL COMMENT '成交量(手)',`amount` double DEFAULT NULL COMMENT '成交额',`amplitude` double DEFAULT NULL COMMENT '振幅',`turnover_rate` double DEFAULT NULL COMMENT '换手率',`peration` double DEFAULT NULL COMMENT '市盈率',`volume_rate` double DEFAULT NULL COMMENT '量比',`hign` double DEFAULT NULL COMMENT '最高',`low` double DEFAULT NULL COMMENT '最低',`open` double DEFAULT NULL COMMENT '今开',`previous_close` double DEFAULT NULL COMMENT '昨收',`pb` double DEFAULT NULL COMMENT '市净率',`create_time` varchar(64) NOT NULL COMMENT '写入时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步骤解释

获取流执行环境:首先,我们通过StreamExecutionEnvironment.getExecutionEnvironment获取Flink的流执行环境,并设置其运行模式为流处理模式。

创建流表环境:接着,我们通过StreamTableEnvironment.create创建一个流表环境,这个环境允许我们使用SQL语句来操作数据流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)

定义Kafka数据源表:我们使用一个SQL语句创建了一个Kafka表re_stock_code_price_kafka,这个表代表了我们要从Kafka读取的数据结构和连接信息。

tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE NULL," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE ," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定义MySQL目标表:然后,我们定义了一个MySQL表re_stock_code_price,指定了与MySQL的连接参数和表结构。

val sink_table: String ="""|CREATE TEMPORARY TABLE re_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  rise int,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 're_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(sink_table)

数据转换和写入:最后,我们执行了一个插入操作,将从Kafka读取的数据转换并写入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")result.print()

全部代码

package org.eastimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Kafka2Mysql {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(senv)tEnv.executeSql("CREATE TABLE re_stock_code_price_kafka (" +"`id` BIGINT," +"`code` STRING," +"`name` STRING," +"`close` DOUBLE NULL," +"`change_percent` DOUBLE," +"`change` DOUBLE," +"`volume` DOUBLE," +"`amount` DOUBLE," +"`amplitude` DOUBLE," +"`turnover_rate` DOUBLE," +"`operation` DOUBLE," +"`volume_rate` DOUBLE," +"`high` DOUBLE ," +"`low` DOUBLE," +"`open` DOUBLE," +"`previous_close` DOUBLE," +"`pb` DOUBLE," +"`create_time` STRING," +"rise int"+") WITH (" +"'connector' = 'kafka'," +"'topic' = 'east_money'," +"'properties.bootstrap.servers' = '127.0.0.1:9092'," +"'properties.group.id' = 'mysql2kafka'," +"'scan.startup.mode' = 'earliest-offset'," +"'format' = 'csv'," +"'csv.field-delimiter' = ','" +")")val result = tEnv.executeSql("select * from re_stock_code_price_kafka")val sink_table: String ="""|CREATE TEMPORARY TABLE re_stock_code_price (|  id BIGINT NOT NULL,|  code STRING NOT NULL,|  name STRING NOT NULL,|  `close` DOUBLE,|  change_percent DOUBLE,|  change DOUBLE,|  volume DOUBLE,|  amount DOUBLE,|  amplitude DOUBLE,|  turnover_rate DOUBLE,|  peration DOUBLE,|  volume_rate DOUBLE,|  hign DOUBLE,|  low DOUBLE,|  `open` DOUBLE,|  previous_close DOUBLE,|  pb DOUBLE,|  create_time STRING NOT NULL,|  rise int,|  PRIMARY KEY (id) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://localhost:3306/mydb',|   'driver' = 'com.mysql.cj.jdbc.Driver',|   'table-name' = 're_stock_code_price',|   'username' = 'root',|   'password' = '12345678'|)|""".stripMargintEnv.executeSql(sink_table)tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")result.print()print("数据打印完成!!!")}
}

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/781935.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小米SU7 我劝你再等等

文 | AUTO芯球 作者 | 李逵 我必须承认我一时没忍住 犯错了 我不会被我老婆打吧 感觉有点慌呀 这不前两天 我刚提了台问界M9嘛 但是昨晚看小米汽车发布会 是真的被雷总感染到了 真的没忍住 我又冲了台小米SU7 Pro版 本来我是准备抢创始版的 结果1秒钟时间 点进去就…

yolov5 v7.0打包exe文件,使用C++调用

cd到yolo5文件夹下 pyinstaller -p 当前路径 -i logo图标 detect.py问题汇总 运行detect.exe找不到default.yaml 这个是yolov8里的文件 1 复制权重文件到exe所在目录。 2 根据报错提示的配置文件路径&#xff0c;把default.yaml复制放到相应的路径下。&#xff08;缺少相应…

杨辉三角形(c++实现)

题目 下面的图形是著名的杨辉三角形&#xff1a; 如果我们按从上到下、从左到右的顺序把所有数排成一列&#xff0c;可以得到如下数列&#xff1a; 1, 1, 1, 1, 2, 1, 1, 3, 3, 1, 1, 4, 6, 4, 1, … 给定一个正整数 N&#xff0c;请你输出数列中第一次出现 N 是在第几个数&a…

实现 Element UI el-table 树形数据的懒加载

当面对大量数据时&#xff0c;一次性加载所有数据可能会导致性能问题。为了解决这一问题&#xff0c;我们可以实现树形数据的懒加载。本文将介绍如何在使用 Element UI 的 Vue 应用中为 el-table 组件的树形数据添加懒加载功能。 懒加载的基本概念 懒加载是一种优化网页或应用…

中国31个省农村用电量(2000-2022年)

数据介绍&#xff1a; 农村用电量是一个动态变化的数据&#xff0c;受到多种因素的影响&#xff0c;包括农村经济发展、人口增长、农业生产活动增加以及电力设备的升级改造等。随着农村经济的发展和农民生活水平的提高&#xff0c;农村用电量呈现出逐年增长的趋势。同时&#…

收获的果实将远远超出你的想象

在追求目标的过程中&#xff0c;刚开始的时候努力所带来的收获可能显得微不足道。然而&#xff0c;随着时间的推移&#xff0c;你会逐渐发现收获的增长是远远超过最初付出的努力的。这就像种下一颗种子&#xff0c;一开始你可能需要付出很多精力去浇灌它&#xff0c;但随着时间…

消息中间件区别

ActiveMQ 我们先看ActiveMQ。其实一般早些的项目需要引入消息中间件&#xff0c;都是使用的这个MQ&#xff0c;但是现在用的确实不多了&#xff0c;说白了就是有些过时了。我们去它的官网看一看&#xff0c;你会发现官网已经不活跃了&#xff0c;好久才会更新一次。 它的单机吞…

LeetCode 2908.元素和最小的山形三元组 I:贪心(两次遍历)——双O(n)复杂度

【LetMeFly】2908.元素和最小的山形三元组 I&#xff1a;贪心&#xff08;两次遍历&#xff09;——双O(n)复杂度 力扣题目链接&#xff1a;https://leetcode.cn/problems/minimum-sum-of-mountain-triplets-i/ 给你一个下标从 0 开始的整数数组 nums 。 如果下标三元组 (i,…

查找某数据在单链表中出现的次数

#define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdlib.h> typedef int ElemType; typedef struct LinkNode {ElemType data;LinkNode* next; }LinkNode, * LinkList; //尾插法建立单链表 void creatLinkList(LinkList& L) {L (LinkNode*)mallo…

JAVA反射基础篇

简介 Java反射是指在运行时动态地获取类的信息&#xff0c;并可以通过该信息来操作类或对象。通过反射&#xff0c;我们可以在运行时获取类的字段、方法、构造函数等信息&#xff0c;并能够动态地创建对象、调用方法、访问和修改字段的值。 一丶概念 反射是Java语言的一种机…

2024.2.5力扣每日一题——跳跃游戏6

2024.2.5 题目来源我的题解方法一 深度搜索实现方法二 动态规划双端队列 题目来源 力扣每日一题&#xff1b;题序&#xff1a;1696 我的题解 方法一 深度搜索实现 使用深度搜索&#xff0c;每次选择一个在范围内的跳跃点&#xff0c;但是时间通不过。 时间复杂度&#xff1a…

Kafka配置与部署CentOS7[]

静态IP设置 # 修改网卡配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33# 修改文件内容 TYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic IPADDR192.168.18.128 NETMASK255.255.255.0 GATEWAY192.168.18.2 DEFROUTEyes IPV4_FAILURE_FATALno IPV6INIT…

设计模式之解释器模式的魅力:让代码读懂你的语言

目录 一、什么是解释器模式 二、解释器模式的应用场景 三、解释器模式的优缺点 3.1. 优点 3.2. 缺点 四、解释器模式示例 4.1. 问题描述 4.2. 问题分析 4.3. 代码实现 4.4. 优化方向 五、总结 一、什么是解释器模式 解释器模式&#xff08;Interpreter pattern&…

kubernetes(K8S)学习(七):K8S之系统核心组件

K8S之系统核心组件 K8s系统核心组件1.1 Master和Node1.2 kubeadm1.3 先把核心组件总体过一遍1.4 Kubernetes源码查看方式1.5 kubectl1.6 API Server1.7 集群安全机制之API Server1.8 Scheduler1.9 kubelet1.10 kube-proxy K8s系统核心组件 1.1 Master和Node 官网 &#xff1a;…

360奇酷刷机 360刷机助手 QGDP360手机QGDP刷机

360奇酷刷机 360刷机助手 QGDP破解版360手机QGDP刷机 360手机刷机资源下载链接&#xff1a;360rom.github.io 参考&#xff1a;360手机-360刷机360刷机包twrp、root 360奇酷刷机&#xff1a;360高通驱动安装 360手机刷机驱动&#xff1b;手机内置&#xff0c;可通过USB文件传输…

搜索与图论——染色法判定二分图

一个图是二分图当且仅当这个图中不含奇数环 由于图中没有奇数环&#xff0c;所以染色过程中一定没有矛盾 所以一个二分图一定可以成功被二染色&#xff0c;反之在二染色的过程中出现矛盾的图中一定有奇数环&#xff0c;也就一定不是二分图 #include<iostream> #includ…

【c++初阶】类与对象(中)

✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅ ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨ &#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1…

2024.3.30力扣刷题记录-二叉树学习记录2(未完)

一、学习视频 如何灵活运用递归&#xff1f;【基础算法精讲 10】_哔哩哔哩_bilibili 二、跟练代码 1. 100. 相同的树 递归 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # …

Transformer论文阅读

Transformer论文阅读 摘要结论1 Introduction &#xff08;导言&#xff09;2 Background3 Model Architecture3.1 Encoder and Decoder StacksEncoderLayer NormDecoder 3.2 Attention3.2.1 Scaled Dot-Product Attention3.2.2 Scaled Dot-Product Attention3.2.3 Application…

4月1日起,未备案App小程序将下架

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 最后2天了、最后2天了。 从2024年4月1日起&#xff0c;工信部要求所有的APP、小程序都要备案&#xff0c;否则下架、关停、限制更新。这是去年8月份出的新规&#xff0c;没想到十个月这么快就过去了。 现在广东省…