【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

  • 1)环境准备
  • 2)准备相关 jar 包
  • 3)实现场景
  • 4)准备工作
    • 4.1.Mysql
    • 4.2.Kafka
  • 5)Flink-Sql
  • 6)验证

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (`id` int(11) NOT NULL AUTO_INCREMENT,`eventId` varchar(255) DEFAULT NULL,`eventStDt` varchar(255) DEFAULT NULL,`bak6` varchar(255) DEFAULT NULL,`bak7` varchar(255) DEFAULT NULL,`businessId` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`bak1` varchar(255) DEFAULT NULL,`bak2` varchar(255) DEFAULT NULL,`bak13` varchar(255) DEFAULT NULL,`bak14` varchar(255) DEFAULT NULL,`bak11` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;CREATE TABLE source_mysql_test(id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH('connector' = 'mysql-cdc','hostname' = '${ip}','port' = '${port}','database-name' = 'test','table-name' = 'mysql2kafka_cdc_test','username' = '${username}','password' = '${password}','scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'upsert-kafka','topic' = 'test','sink.parallelism' = '3','key.format' = 'json','value.format' = 'json','properties.bootstrap.servers' = '${kafka-bootstrap-server}','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka','metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/233288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java:Lambda函数式编程

作为一个前端同学,今天惊叹的发现,原来 java 也可以使用函数式编程,赶紧来看看怎么玩 简介 当Java 8引入Lambda表达式和函数式编程特性时,它开启了一个全新的编程范式。这些功能使得Java语言更加现代、灵活,并支持更…

VuePress安装及使用——使用 Markdown 创建你自己的博客网站和电子书

目录 前言一、依赖环境二、vuepress 安装和使用1.初始化2.将 VuePress 安装为本地依赖3. package.json 中添加脚本4. 新建 docs 文件夹5.启动6. 效果 三、进阶使用1.新增配置文件2.安装搜索插件3.config.js 中增加配置4.效果展示5.注意 四、使用主题1.安装2. 目录结构说明&…

MeterSphere 和 Yapi 接口测试功能对比

一、产品介绍 MeterSphere 是一站式开源持续测试平台,涵盖测试跟踪、接口测试、UI 测试和性能测试等,全面兼容 JMeter、Selenium 等主流开源标准,有效助力开发和测试团队在线共享协作,实现端到端的测试管理跟踪可视化、自动化测试…

Hadoop3.x完全分布式模式下slaveDataNode节点未启动调整

目录 前言 一、问题重现 1、查询Hadoop版本 2、集群启动Hadoop 二、问题分析 三、Hadoop3.x的集群配置 1、停止Hadoop服务 2、配置workers 3、从节点检测 4、WebUI监控 总结 前言 在大数据的世界里,Hadoop绝对是一个值得学习的框架。关于Hadoop的知识&…

git修改远程commit信息

git 修改远程commit信息 如果你已经把本地commit的信息push到远程了,此时需要修改远程中的commit信息 第一步:git log 查看提交的信息,看下提交的commit日志 如下入所示 第二步:然后确定你需要修改的那一次commit,比如&#xf…

运营Instagram的实用必备工具分享!

Instagram作为目前全球最受欢迎的社交媒体平台之一,已经成为品牌推广和营销的重要渠道。然而,要在Instagram上成功运营并吸引更多的关注者和用户参与,需要借助一些实用的工具来提升效率和效果。在本篇博客文章中,我们将分享10个运…

简单的绑定发布事件

在绑定事情之前,我们需要对我们的需求进行分析;判断我们是否需要同时存在条件。 发布动态的时候,分为以下三种情况: ① 输入了标题,没有图片,可以发布动态 ②输入了图片,没有标题,…

清空缓存区的方法

fflush(文件指针) fflush()用于刷新相应文件的缓存区。 使用getchar()函数来清空标准输入缓存区 上面的fflush是一个函数,有些编译器不一定支持,这时候我们可以自己实现清空标准输入缓存区的操作。 代码示例: 使用scanf()的高级特性来清空标准输入缓存区 上面代码的意思是: …

GO 的 socks5代理 编写

这里学习一下 socks5 代理的编写 网上有很多 学习一下 go 语言实战入门案例之实现Socks5 - 知乎 滑动验证页面 socks5协议原理学习-腾讯云开发者社区-腾讯云 (tencent.com) 首先我们要了解一下socks5的代理方式 socks5 是基于 认证建立连接转发数据 所形成的代理 我们只…

Wireshark统计和可视化

第一章:Wireshark基础及捕获技巧 1.1 Wireshark基础知识回顾 1.2 高级捕获技巧:过滤器和捕获选项 1.3 Wireshark与其他抓包工具的比较 第二章:网络协议分析 2.1 网络协议分析:TCP、UDP、ICMP等 2.2 高级协议分析:HTTP…

LLMs推理框架总结

总结一下这些框架的特点,如下表所示: LLM推理有很多框架,各有其特点,下面分别介绍一下表中七个框架的关键点: vLLM:适用于大批量Prompt输入,并对推理速度要求高的场景;Text generat…

P1011 [NOIP1998 提高组] 车站 ---Java

主打一个枚举找规律... 站台123456...上车ababa2b2a3b3a5b下车bbaba2b2a3b人数aa2a2ab3a2b4a4b 上车这一行: 观察a的系数(假设是dx[i])可知: dx[i] dx[i-1] dx[i-2] b的系数(假设是dy[i])可知: dy[i] dy[i-1] dy[i-2] 人数…

黑豹程序员-vue3获取拖动div后的坐标

<script lang"ts" setup> import {ref, reactive, onMounted} from vuelet startclientX ref(0) // 元素拖拽前距离浏览器的X轴位置 let startclientY ref(0) //元素拖拽前距离浏览器的Y轴位置 let elLeft ref(770) // 元素的左偏移量 let elTop ref(220) …

android的bundle的常用函数(ChatGPT)

在 Android 中&#xff0c;Bundle 是一个用于保存和传递数据的容器类&#xff0c;它被广泛用于在不同组件&#xff08;如 Activity、Fragment&#xff09;之间传递数据。以下是一些 Bundle 中常用的函数&#xff1a; putXXX 系列方法&#xff1a; 用于将各种类型的数据放入 Bun…

C语言-> 文件操作(函数满屏)

系列文章目录 前言 ✅作者简介&#xff1a;大家好&#xff0c;我是橘橙黄又青&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;橘橙黄又青_C语言,数据结构,函数-CSDN博客 目的&#xff1a;学习文件操作&#xff0c;即…

林杰:程序员依然是草根跨越阶级的最佳途径之一 | 程客有话说

《程客有话说》是我们最新推出的一个访谈栏目&#xff0c;邀请一些国内外有趣的程序员来分享他们的经验、观点与成长故事&#xff0c;尝试建立一个程序员交流与学习的平台&#xff0c;也欢迎大家推荐朋友或自己来参加我们的节目&#xff0c;一起加油。 本期我们邀请的程序员林…

2023-12-19 二叉搜索树的最小绝对差和二叉搜索树的众数和二叉树的最近公共祖先

二叉搜索树的最小绝对差 关键信息&#xff1a;二叉搜索树表明了树有序的&#xff01;遇到在二叉搜索树上求什么最值啊&#xff0c;差值之类的&#xff0c;就把它想成在一个有序数组上求最值&#xff0c;求差值 # Definition for a binary tree node. # class TreeNode: # …

微信私域管理工具如何帮助企业提升销售业绩?

现如今&#xff0c;微信已经从社交通讯软件&#xff0c;慢慢被默认为常规办公软件&#xff0c;工作沟通、业务洽谈、网络会议等都在微信上进行&#xff0c;完全变成职场首选的社交工具。 但受限于微信平台&#xff0c;许多公司在微信私域营销方面面临诸多挑战。 微信私域管理工…

调用第三方http接口 hutool工具类

1、引入依赖 <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.0.M2</version> </dependency>2、请求组装 String params"<BSXml>" " <MsgHeader>&…

Leetcode—16.最接近的三数之和【中等】

2023每日刷题&#xff08;六十四&#xff09; Leetcode—16.最接近的三数之和 实现代码 class Solution { public:int threeSumClosest(vector<int>& nums, int target) {sort(nums.begin(), nums.end());int s 0;int diff INT_MAX / 2;int n nums.size();int a…