Flink CDC实时同步mysql数据

官方参考资料:

https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/

Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化(如插入、更新和删除操作)的技术。Flink CDC Connector 允许你使用 Flink 从 MySQL 等数据库中读取变化数据,并处理这些流式数据。以下是如何在 Flink 中配置和使用 CDC Connector 读取 MySQL 数据的步骤:

前提条件

  1. MySQL 数据库:确保你已经有一个 MySQL 数据库,并且知道数据库的连接信息(如主机名、端口、用户名、密码、数据库名)。
  2. Flink 环境:你需要在本地或集群上配置好 Flink 环境。
  3. MySQL Binlog:确保 MySQL 数据库启用了 Binlog(Binary Logging),因为 Flink CDC 依赖于 Binlog 来捕获数据变化。

支持的数据库

依赖

Maven dependency

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-connector-mysql-cdc</artifactId>

   <!--  请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->

   <version>3.3-SNAPSHOT</version>

</dependency>

SQL Client JAR

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载 flink-sql-connector-mysql-cdc-3.3-SNAPSHOT.jar 到 <FLINK_HOME>/lib/ 目录下。

由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您可能需要手动配置以下依赖:

配置 MySQL Binlog

修改MySQL 配置文件(my.cnf或my.ini):

[mysqld]

server-id = 1

log-bin = mysql-bin

binlog-format = ROW

配置 MySQL 服务器

先创建一个 MySQL 用户,并授权。

  1. 创建 MySQL 用户:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

  1. 向用户授予所需的权限:

mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

  1. 刷新用户权限:

mysql> FLUSH PRIVILEGES;

创建 MySQL CDC 表

MySQL CDC 表可以定义如下:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟                     

Flink SQL> SET 'execution.checkpointing.interval' = '3s';  

-- 在 Flink SQL中注册 MySQL 表 'orders'

Flink SQL> CREATE TABLE orders (

     order_id INT,

     order_date TIMESTAMP(0),

     customer_name STRING,

     price DECIMAL(10, 5),

     product_id INT,

     order_status BOOLEAN,

     PRIMARY KEY(order_id) NOT ENFORCED

     ) WITH (

     'connector' = 'mysql-cdc',

     'hostname' = 'localhost',

     'port' = '3306',

     'username' = 'root',

     'password' = '123456',

     'database-name' = 'mydb',

     'table-name' = 'orders');

-- 从订单表读取全量数据(快照)和增量数据(binlog)

Flink SQL> SELECT * FROM orders;

关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。
  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:
  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/64465.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Odoo:免费开源ERP的AI技术赋能出海企业电子商务应用介绍

概述 伴随电子商务的持续演进&#xff0c;客户对于便利性、速度以及个性化服务的期许急剧攀升。企业务必要探寻创新之途径&#xff0c;以强化自身运营&#xff0c;并优化购物体验。达成此目标的最为行之有效的方式之一&#xff0c;便是将 AI 呼叫助手融入您的电子商务平台。我们…

二、使用langchain搭建RAG:金融问答机器人--数据清洗和切片

选择金融领域的专业文档作为源文件 这里选择 《博金大模型挑战赛-金融千问14b数据集》&#xff0c;这个数据集包含若干公司的年报&#xff0c;我们将利用这个年报搭建金融问答机器人。 具体下载地址 这里 git clone https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_…

maven使用Dependency-Check来扫描安全漏洞

在现代软件开发中&#xff0c;使用开源库和第三方依赖项已成为常态。然而&#xff0c;这些依赖项可能包含已知的安全漏洞&#xff0c;给应用程序带来潜在的风险。为了解决这个问题&#xff0c;OWASP Dependency-Check 应运而生。本文将介绍 OWASP Dependency-Check 的功能、安装…

meta-llama/Llama-3.2-1B 微调记录

踩坑&#xff1a; 1.刚开始部署在自己的windows电脑上&#xff0c;semgrep不支持windows &#xff0c;然后就换了linux服务器 2.服务器没有梯子&#xff0c;huggingface无法访问&#xff0c;模型数据集无法下载 解决方法&#xff1a; 使用huggingface镜像网站下载模型&#xf…

双指针---有效三角形的个数

这里写自定义目录标题 题目链接 [有效三角形的个数](https://leetcode.cn/problems/valid-triangle-number/description/)问题分析代码解决执行用时 题目链接 有效三角形的个数 给定一个包含非负整数的数组 nums &#xff0c;返回其中可以组成三角形三条边的三元组个数。 示例…

《通信电子电路》入门手册

因为大学这门课好多同学理解不了这门课 于是考完试后花了两天时间整理了这份笔记&#xff0c;在这分享给完全没有学懂这门课的同学&#xff0c;也帮助“理解概念才能学得进去”的同学入门 笔记&#xff1a;通信电子电路 入门手册 —— flowus笔记 对应&#xff1a;《通信电子…

基于单片机的智能水表的设计

1总体设计 本次设计智能IC卡水表&#xff0c;在系统架构上设计如图2.1所示&#xff0c;由STM32F103单片机&#xff0c;YF-S401霍尔型传感器&#xff0c;RFID模块&#xff0c;OLED12864液晶,按键&#xff0c;继电器等构成&#xff0c;在功能上可以实现水流量的检测&#xff0c;…

LA2016逻辑分析仪使用笔记1:测量引脚、解析串口数据

今日尝试学习使用LA2016逻辑分析仪&#xff1a;测量引脚 解析串口数据&#xff1a; 目录 逻辑分析仪&#xff1a; 实验接线&#xff1a; 基础操作&#xff1a; 选择使用到的通道&#xff1a; 设置采样时间、采样频率&#xff1a; 设置电平标准&#xff1a; 解析串口数据、测量串…

[论文阅读]Universal and transferable adversarial attacks on aligned language models

Universal and transferable adversarial attacks on aligned language models http://arxiv.org/abs/2307.15043 图 1&#xff1a;Aligned LLMs 不是对抗性 Aligned。我们的攻击构建了一个单一的对抗性提示&#xff0c;该提示始终绕过最先进的商业模式&#xff08;包括 ChatG…

【C++】小乐乐求和题目分析n变量类型讨论

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述&#x1f4af;解题分析&#x1f4af;为什么 n 需要是 long long问题重点&#xff1a;中间计算水平上的数据类型不足的例子&#xff1a;正确解决&#xff1a;将 n 设…

计算机基础知识——数据结构与算法(一)(山东省大数据职称考试)

大数据分析应用-初级 第一部分 基础知识 一、大数据法律法规、政策文件、相关标准 二、计算机基础知识 三、信息化基础知识 四、密码学 五、大数据安全 六、数据库系统 七、数据仓库. 第二部分 专业知识 一、大数据技术与应用 二、大数据分析模型 三、数据科学 数据结构与算法…

WebView通过@JavascriptInterface 调用原生方法

1. 创建 WebView 和设置 WebView 设置 在 XML 布局中添加 WebView 在activity_main.xml里创建一个WebView控件 <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schem…

基于AI对话生成剧情AVG游戏

游戏开发这个领域&#xff0c;一直有较高的学习门槛。作为一个非专业的游戏爱好者&#xff0c;如果想要开发游戏&#xff0c;往往受制于游戏引擎的专业程度&#xff0c;难以完成复杂的游戏项目。 AI IDE的诞生&#xff0c;提供了另外的一种思路&#xff0c;即通过AI 生成项目及…

ElasticSearch 数据聚合与运算

1、数据聚合 聚合&#xff08;aggregations&#xff09;可以让我们极其方便的实现数据的统计、分析和运算。实现这些统计功能的比数据库的 SQL 要方便的多&#xff0c;而且查询速度非常快&#xff0c;可以实现近实时搜索效果。 注意&#xff1a; 参加聚合的字段必须是 keywor…

F5中获取客户端ip地址(client ip)

当F5设备对其原始设置上的所有IP地址使用NAT时&#xff0c;连接到poo成员&#xff08;nodes、backend servers&#xff09;的出站连接将是NAT IP地址。 pool 成员&#xff08;nodes、backend servers&#xff09;将无法看到真实的客户端 ip地址&#xff0c;因为看到的是F5上的…

MATLAB引用矩阵元素的几种方法

引用矩阵元素可以通过索引&#xff0c;也可以通过逻辑值 索引 通过引用元素在矩阵中的位置来提取元素&#xff0c;例如&#xff1a; - 逻辑值 通过某种逻辑运算来使得要提取的值变为逻辑 1 1 1&#xff0c;用 A ( ) A() A()提取即可&#xff0c; A A A为原矩阵的名称。 例如&…

机器学习预处理-表格数据的空值处理

机器学习预处理-表格数据的空值处理 机器学习预处理-表格数据的分析与可视化中详细介绍了表格数据的python可视化&#xff0c;可视化能够帮助我们了解数据的构成和分布&#xff0c;是我们进行机器学习的必备步骤。上文中也提及&#xff0c;原始的数据存在部分的缺失&#xff0…

了解 SpringMVC 请求流程

文章目录 1. Spring 基础 - SpringMVC 请求流程1.1 引入1.2 什么是 MVC1.3 什么是 Spring MVC1.4 请求流程核心架构的具体流程步骤补充 1.5 案例**Maven 包引入****业务代码的编写**DaoServiceControllerwebapp 下的 web.xmlspringmvc.xmlJSP 视图 2. Spring 进阶 - Dispatcher…

Springboot3.x配置类(Configuration)和单元测试

配置类在Spring Boot框架中扮演着关键角色&#xff0c;它使开发者能够利用Java代码定义Bean、设定属性及调整其他Spring相关设置&#xff0c;取代了早期版本中依赖的XML配置文件。 集中化管理&#xff1a;借助Configuration注解&#xff0c;Spring Boot让用户能在一个或几个配…

鸿道Intewell-C纯实时构型,适合有功能安全认证需求的工业操作系统

鸿道Intewell-C纯实时构型&#xff0c;适合有功能安全认证需求的工业操作系统 鸿道Intewell-C是一款工业实时微内核操作系统&#xff0c;由科东软件自主研发&#xff0c;具有超低延迟和最小抖动&#xff0c;保障工业设备可以高效处理时间敏感的现场业务&#xff0c;支持多种工…