TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

  • 一、技术流程
  • 二、搭建环境
  • 三、创建Kafka changefeed
  • 四、写入数据以产生变更日志
  • 五、配置 Flink 消费 Kafka 数据

一、技术流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 创建 changefeed,将 TiDB 增量数据输出至 Kafka
  • 使用 go-tpc 写入数据到上游 TiDB
  • 使用 Kafka console consumer 观察数据被写入到指定的 Topic
  • (可选)配置 Flink 集群消费 Kafka 内数据

二、搭建环境

部署包含 TiCDC 的 TiDB 集群

在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status

三、创建Kafka changefeed

1.创建 changefeed 配置文件

根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.创建一个 changefeed,将增量数据输出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、写入数据以产生变更日志

完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

1.模拟业务负载

在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消费 Kafka Topic 中的数据

changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步,你可以使用 Flink 消费 Kafka 数据。当然,你也可以自行开发适用于业务场景的 Kafka 消费端。

五、配置 Flink 消费 Kafka 数据

1.安装 Flink Kafka Connector

在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.创建一个表

可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

随后,执行如下语句创建一个名为 tpcc_orders 的表:

CREATE TABLE tpcc_orders (o_id INTEGER,o_d_id INTEGER,o_w_id INTEGER,o_c_id INTEGER,o_entry_d STRING,o_carrier_id INTEGER,o_ol_cnt INTEGER,o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。

3.查询表内容

执行如下命令,查询 tpcc_orders 表中的数据:

SELECT * FROM tpcc_orders;

执行成功后,可以观察到有数据输出,如下图

在这里插入图片描述
至此,就完成了 TiDB 与 Flink 的数据集成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/40019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络编程系列】网络编程实战

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

jvm内存溢出排查(使用idea自带的内存泄漏分析工具)

文章目录 1.确保生成内存溢出文件2.使用idea自带的内存泄漏分析工具3.具体实验一下 1.确保生成内存溢出文件 想分析堆内存溢出&#xff0c;一定在运行jar包时就写上参数-XX:HeapDumpOnOutOfMemoryError&#xff0c;可以看我之前关于如何运行jar包的文章。若你没有写。可以写上…

Python学习笔记_基础篇(九)_面向对象编程

本篇内容: 1、反射2、面向对象编程3、面向对象三大特性4、类成员5、类成员修饰符6、类的特殊成员7、单例模式 反射 python中的反射功能是由以下四个内置函数提供&#xff1a;hasattr、getattr、setattr、delattr&#xff0c;改四个函数分别用于对对象内部执行&#xff1a;检…

解决 adb install 错误INSTALL_FAILED_UPDATE_INCOMPATIBLE

最近给游戏出包&#xff0c;平台要求 v1 签名吧&#xff0c;AS 打包后&#xff0c;adb 执行安装到手机&#xff0c;我用的设备是google pixel6 , android 系统 13&#xff0c; 提示如下&#xff1a; adb install -r v5_android_202308161046.apk Performing Streamed Install a…

单片机第一季:零基础13——AD和DA转换

1&#xff0c;AD转换基本概念 51 单片机系统内部运算时用的全部是数字量&#xff0c;即0 和1&#xff0c;因此对单片机系统而言&#xff0c;无法直接操作模拟量&#xff0c;必须将模拟量转换成数字量。所谓数字量&#xff0c;就是用一系列0 和1 组成的二进制代码表示某个信号大…

Linux -- 进阶 Autofs自动挂载服务 实验详解

服务端创建共享目录&#xff0c; 客户端实现自动挂载 第一步 &#xff1a; 客户端&#xff0c;服务端 均关闭安全软件 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld [rootnode1 ~]# setenforce 0 [rootnode1 ~]# systemctl stop firewalld 第二…

MyBaits(单独使用,与整合无关)小白版

文章目录 概述比较配置写xml加载上面配置并执行加载配置的方法方式一 执行方法方式一方式二(MyBatis映射器) 写配置文件的映射文件设置对象的别名&#xff08;简写&#xff09;获取自动生成的主键 查询结果和java的映射规则基本类型映射&#xff1a;简单对象映射&#xff1a;嵌…

加盐加密算法

MD5加密加盐加密项目密码升级 MD5加密 MD5一系列公式进行复杂数学运算&#xff1b;特点&#xff1a;&#xff08;用途校验和、计算hash值方式、加密&#xff09; 1&#xff1a;定长&#xff1b;无论原始数据多长&#xff1b;算出的结果都是4或者8字节的版本。 2&#xff1a;冲…

Java多线程实战

Java多线程实战 java多线程&#xff08;超详细&#xff09; java自定义线程池总结 Java创建线程方式 方法1&#xff0c;继承Thread类 方法2&#xff0c;实现Runable接口 方法2-2&#xff0c;匿名内部类形式lambda表达式 方法3&#xff0c;实现Callable接口&#xff0c;允许…

【深入理解Linux内核锁】三、原子操作

我的圈子: 高级工程师聚集地 我是董哥,高级嵌入式软件开发工程师,从事嵌入式Linux驱动开发和系统开发,曾就职于世界500强企业! 创作理念:专注分享高质量嵌入式文章,让大家读有所得! 文章目录 1、原子操作思想2、整型变量原子操作2.1 API接口2.2 API实现2.2.1 原子变量结…

log4j:WARN No appenders could be found for logger问题

本文将idea场景下的使用。 IDEA中&#xff0c;将配置文件命名为log4j.properties&#xff08;该命名才会被自动加载&#xff09;&#xff0c; 并放到某个目录下&#xff08;通常放到resources目录&#xff09;&#xff0c;并在resources上右键&#xff0c;找到Mark Directory a…

微信程序 自定义遮罩层遮不住底部tabbar解决

一、先上效果 二 方法 1、自定义底部tabbar 实现&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/framework/ability/custom-tabbar.html 官网去抄 简单写下&#xff1a;在代码根目录下添加入口文件 除了js 文件的list 需要调整 其他原封不动 代码&#xf…

【路由协议】使用按需路由协议和数据包注入的即时网络模拟传递率(PDR)、总消耗能量和节点消耗能量以及延迟研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

斯坦福「小镇」开源AI智能体;小米应用商店将要求AI应用符合资质标准

&#x1f989; AI新闻 &#x1f680; 斯坦福「小镇」开源AI智能体 摘要:斯坦福研究人员开源了一个类似《西部世界》的数字化「小镇」,里面有25个AI智能体可以生活、工作、社交。这项研究被视为AGI的重要开端,可能会改变游戏、企业应用领域。网友期待这项技术改善游戏NPC的交互…

PyMuPDF`库实现PDF旋转功能

本文介绍了一个简单的Python应用程序&#xff0c;用于将PDF文件转换为旋转90度的PDF文件。主要用于csdn网站中导出的博客pdf是横向的&#xff0c;看起来不是很方便&#xff0c;才想到用python编制一个将pdf从横向转为纵向的功能。 功能 该PDF转换工具具有以下功能&#xff1a…

信息安全:防火墙技术原理与应用.

信息安全&#xff1a;防火墙技术原理与应用. 防火墙是网络安全区域边界保护的重要技术。为了应对网络威胁&#xff0c;联网的机构或公司将自己的网络与公共的不可信任的网络进行隔离&#xff0c;其方法是根据网络的安全信任程度和需要保护的对象&#xff0c;人为地划分若干安全…

C#工程建立后修改工程文件名与命名空间

使用之前的项目做二次开发&#xff0c;项目快结束的时候&#xff0c;需要把主项目的名称修改成我们想要的。 之前从来没有这么干过&#xff0c;记录一下。 步骤如下&#xff1a; 1&#xff1a;打开vs2010项目解决方案&#xff0c;重命名&#xff0c;如下图所示&#xff1a; …

设计模式之原型模式Prototype的C++实现

1、原型模式提出 在软件功能设计中&#xff0c;经常面临着“某些结构复杂的对象”的创建工作&#xff0c;且创建的对象想拥有其他对象在某一刻的状态&#xff0c;则可以使用原型模型。原型模型是通过拷贝构造函数来创建对象&#xff0c;并且该对象拥有其他对象在某一刻的状态。…

Docker基础入门:镜像、容器导入导出与私有仓库搭建

Docker基础入门&#xff1a;镜像导入导出与私有仓库搭建 一、 Docker镜像、容器的导入和导出1.1、Docker镜像的导出1.2、Docker镜像的载入1.3、Docker容器的导出1.4、Docker容器的导入 二、 镜像和容器导出和导入的区别:三、commit操作_本地镜像发布到阿里云3.1、commit操作有关…

ppt中线材相交接的地方,如何绘画

ppt中线材相交接的地方&#xff1a; 在ppt中绘画线材相互交接的地方&#xff1a; 1.1绘图工具中的“弧形” 1.2小技巧 “弧形”工具点一下&#xff0c;在ppt中如下 1.3拖动活动点进行调整图形 1.4绘画圆弧 1.5调整“圆弧”的大小&#xff0c;鼠标放在“黄色点”位置&#xf…