【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

  • 1)环境准备
  • 2)准备相关 jar 包
  • 3)实现场景
  • 4)准备工作
    • 4.1.Mysql
    • 4.2.Kafka
  • 5)Flink-Sql
  • 6)验证

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (`id` int(11) NOT NULL AUTO_INCREMENT,`eventId` varchar(255) DEFAULT NULL,`eventStDt` varchar(255) DEFAULT NULL,`bak6` varchar(255) DEFAULT NULL,`bak7` varchar(255) DEFAULT NULL,`businessId` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`bak1` varchar(255) DEFAULT NULL,`bak2` varchar(255) DEFAULT NULL,`bak13` varchar(255) DEFAULT NULL,`bak14` varchar(255) DEFAULT NULL,`bak11` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;CREATE TABLE source_mysql_test(id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH('connector' = 'mysql-cdc','hostname' = '${ip}','port' = '${port}','database-name' = 'test','table-name' = 'mysql2kafka_cdc_test','username' = '${username}','password' = '${password}','scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'upsert-kafka','topic' = 'test','sink.parallelism' = '3','key.format' = 'json','value.format' = 'json','properties.bootstrap.servers' = '${kafka-bootstrap-server}','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka','metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/233288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VuePress安装及使用——使用 Markdown 创建你自己的博客网站和电子书

目录 前言一、依赖环境二、vuepress 安装和使用1.初始化2.将 VuePress 安装为本地依赖3. package.json 中添加脚本4. 新建 docs 文件夹5.启动6. 效果 三、进阶使用1.新增配置文件2.安装搜索插件3.config.js 中增加配置4.效果展示5.注意 四、使用主题1.安装2. 目录结构说明&…

Hadoop3.x完全分布式模式下slaveDataNode节点未启动调整

目录 前言 一、问题重现 1、查询Hadoop版本 2、集群启动Hadoop 二、问题分析 三、Hadoop3.x的集群配置 1、停止Hadoop服务 2、配置workers 3、从节点检测 4、WebUI监控 总结 前言 在大数据的世界里,Hadoop绝对是一个值得学习的框架。关于Hadoop的知识&…

git修改远程commit信息

git 修改远程commit信息 如果你已经把本地commit的信息push到远程了,此时需要修改远程中的commit信息 第一步:git log 查看提交的信息,看下提交的commit日志 如下入所示 第二步:然后确定你需要修改的那一次commit,比如&#xf…

简单的绑定发布事件

在绑定事情之前,我们需要对我们的需求进行分析;判断我们是否需要同时存在条件。 发布动态的时候,分为以下三种情况: ① 输入了标题,没有图片,可以发布动态 ②输入了图片,没有标题,…

清空缓存区的方法

fflush(文件指针) fflush()用于刷新相应文件的缓存区。 使用getchar()函数来清空标准输入缓存区 上面的fflush是一个函数,有些编译器不一定支持,这时候我们可以自己实现清空标准输入缓存区的操作。 代码示例: 使用scanf()的高级特性来清空标准输入缓存区 上面代码的意思是: …

GO 的 socks5代理 编写

这里学习一下 socks5 代理的编写 网上有很多 学习一下 go 语言实战入门案例之实现Socks5 - 知乎 滑动验证页面 socks5协议原理学习-腾讯云开发者社区-腾讯云 (tencent.com) 首先我们要了解一下socks5的代理方式 socks5 是基于 认证建立连接转发数据 所形成的代理 我们只…

LLMs推理框架总结

总结一下这些框架的特点,如下表所示: LLM推理有很多框架,各有其特点,下面分别介绍一下表中七个框架的关键点: vLLM:适用于大批量Prompt输入,并对推理速度要求高的场景;Text generat…

C语言-> 文件操作(函数满屏)

系列文章目录 前言 ✅作者简介:大家好,我是橘橙黄又青,一个想要与大家共同进步的男人😉😉 🍎个人主页:橘橙黄又青_C语言,数据结构,函数-CSDN博客 目的:学习文件操作,即…

林杰:程序员依然是草根跨越阶级的最佳途径之一 | 程客有话说

《程客有话说》是我们最新推出的一个访谈栏目,邀请一些国内外有趣的程序员来分享他们的经验、观点与成长故事,尝试建立一个程序员交流与学习的平台,也欢迎大家推荐朋友或自己来参加我们的节目,一起加油。 本期我们邀请的程序员林…

2023-12-19 二叉搜索树的最小绝对差和二叉搜索树的众数和二叉树的最近公共祖先

二叉搜索树的最小绝对差 关键信息:二叉搜索树表明了树有序的!遇到在二叉搜索树上求什么最值啊,差值之类的,就把它想成在一个有序数组上求最值,求差值 # Definition for a binary tree node. # class TreeNode: # …

微信私域管理工具如何帮助企业提升销售业绩?

现如今,微信已经从社交通讯软件,慢慢被默认为常规办公软件,工作沟通、业务洽谈、网络会议等都在微信上进行,完全变成职场首选的社交工具。 但受限于微信平台,许多公司在微信私域营销方面面临诸多挑战。 微信私域管理工…

调用第三方http接口 hutool工具类

1、引入依赖 <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.0.M2</version> </dependency>2、请求组装 String params"<BSXml>" " <MsgHeader>&…

Leetcode—16.最接近的三数之和【中等】

2023每日刷题&#xff08;六十四&#xff09; Leetcode—16.最接近的三数之和 实现代码 class Solution { public:int threeSumClosest(vector<int>& nums, int target) {sort(nums.begin(), nums.end());int s 0;int diff INT_MAX / 2;int n nums.size();int a…

光伏收益计算工具:助力可再生能源发展的关键

随着全球对可再生能源需求的不断增加&#xff0c;光伏发电作为清洁、可再生的能源形式&#xff0c;越来越受到人们的关注。然而&#xff0c;要评估光伏系统的经济效益和投资回报&#xff0c;需要一个准确的光伏收益计算工具。 光伏收益计算工具是一种专门用于计算光伏系统发电量…

Ridge Lasso Regression解决线性回归的过拟合(Overfitting)(基于波士顿房价预测)

目录 介绍&#xff1a; 1、过拟合 2、Lasso regression 3、 Ridge regression 4、 Lasso regression 和 Ridge regression一定优于LinearRegression吗 一、 Linear Regression 二、Ridge Regression 三、Lasso Regression 四、Ridge Regression和Lasso Regression 五、…

Linux常用命令详解

文章目录 Linux常用命令详解一、Shell&#xff08;执行的任务—翻译&#xff09;二、Linux命令1、Linux命令的分类内部命令与外部命令的区别命令执行过程 2、Linux命令行的格式通用的命令行使用格式 3、编辑Linux命令行的辅助操作4、获得命令帮助的办法内部命令help命令的“--h…

100GPTS计划-AI写作VersatileWriter

地址 https://chat.openai.com/g/g-zHErU9z9m-versatile-writer https://poe.com/VersatileWriterGPT 测试 翻译:要求将给定的英语语句翻译成中文。 总结:给出一段文本,要求进行概括和总结。 问答:根据给定段落,提出相关问题并给出答案。 推理:给出前提,进行多步推理并得…

文件包含的提升刷题

上一篇文章&#xff1a;一篇文章带你入门文件包含-CSDN博客 已经开始入门了文件包含&#xff0c;那现在开始拔高提升刷题&#xff01; 1. 拿到题目后啥也没有&#xff0c;所以也不知道要读取啥文件&#xff0c;那就查看源代码。 直接看if的条件就可以知道一定要设置cookie&a…

【linux】(ubuntu)下 QT 出现的问题

错误一&#xff1a;Make 运行QT程序以后出现这样的错误。 【解决方法】 我的ubuntu版本是18.04.4&#xff0c; 原因1&#xff1a;没有更换软件源 原因2&#xff1a;没安装相关 软件包 注意&#xff1a;这一步很有可能卡死这一步&#xff0c;所以如果一直卡在这并且进度…

第三讲GNSS相关时间系统和转换 第四讲观测值的产生和分类 | GNSS(RTK)课程学习笔记day2

说明&#xff1a;以下笔记来自计算机视觉life吴桐老师课程&#xff1a;从零掌握GNSS、RTK定位[链接]&#xff0c;从零掌握RTKLIB[链接]。非原创&#xff01;且笔记仅供自身与大家学习使用&#xff0c;无利益目的。 第三讲 GNSS相关时间系统和转换 GPS卫星的位置在时间过程中是…