SQL Server CDC配合Kafka Connect监听数据变化

写在前面

        好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备、组建、招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇。

        进入主题,通常企业为了实现数据统计、数据分析、数据挖掘、解决信息孤岛等全局数据的系统化运作管理 ,为BI、经营分析、决策支持系统等深度开发应用奠定基础,挖掘数据价值 ,企业会开始着手建立数据仓库,数据中台。而这些数据来源则来自于企业的各个业务系统的数据或爬取外部的数据,从业务系统数据到数据仓库的过程就是一个ETL(Extract-Transform-Load)行为,包括了采集、清洗、数据转换等主要过程,通常异构数据抽取转换使用Sqoop、DataX等,日志采集Flume、Logstash、Filebeat等。

        数据抽取分为全量抽取和增量抽取,全量抽取类似于数据迁移或数据复制,全量抽取很好理解;增量抽取在全量的基础上做增量,只监听、捕捉动态变化的数据。如何捕捉数据的变化是增量抽取的关键,一是准确性,必须保证准确的捕捉到数据的动态变化,二是性能,不能对业务系统造成太大的压力。

增量抽取方式

  通常增量抽取有几种方式,各有优缺点。

1. 触发器

   在源数据库上的目标表创建触发器,监听增、删、改操作,捕捉到数据的变更写入临时表。

优点:操作简单、规则清晰,对源表不影响;

缺点:对源数据库有侵入,对业务系统有一定的影响;

2. 全表比对

  在ETL过程中,抽取方建立临时表待全量抽取存储,然后在进行比对数据。

优点:对源数据库、源表都无需改动,完全交付ETL过程处理,统一管理;

缺点:ETL效率低、设计复杂,数据量越大,速度越慢,时效性不确定;

3. 全表删除后再插入

  在抽取数据之前,先将表中数据清空,然后全量抽取。

优点:ETL 操作简单,速度快。

缺点:全量抽取一般采取T+1的形式,抽取数据量大的表容易对数据库造成压力;

4. 时间戳

  时间戳的方式即在源表上增加时间戳列,对发生变更的表进行更新,然后根据时间戳进行提取。

优点:操作简单,ELT逻辑清晰,性能比较好;

缺点:对业务系统有侵入,数据库表也需要额外增加字段。对于老的业务系统可能不容易做变更。

5. CDC方式

  变更数据捕获Change Data Capture(简称CDC),SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。开启CDC的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。详情可以查看官方介绍:关于变更数据捕获 (SQL Server)

优点:提供易于使用的API 来设置CDC 环境,缩短ETL 的时间,无需修改业务系统表结构。

缺点:受数据库版本的限制,实现过程相对复杂。

CDC增量抽取

先决条件

1. 已搭建好Kafka集群,Zookeeper集群;

2. 源数据库支持CDC,版本采用开发版或企业版。

案例环境:

Ubuntu 20.04

Kafka 2.13-2.7.0

Zookeeper  3.6.2

SQL Server 2012

步骤

   除了数据库开启CDC支持以外,主要还是要将变更的数据通过Kafka Connect传输数据,Debezium是目前官方推荐的连接器,它支持绝大多数主流数据库:MySQL、PostgreSQL、SQL Server、Oracle等等,详情查看Connectors。

1. 数据库步骤

开启数据库CDC支持

  在源数据库执行以下命令:

EXEC sys.sp_cdc_enable_db GO

附上关闭语句:

exec sys.sp_cdc_disable_db

查询是否启用

select * from sys.databases where is_cdc_enabled = 1

创建测试数据表:(已有表则跳过此步骤)

create  table T_LioCDC
(ID int identity(1,1) primary key ,Name nvarchar(16),Sex bit,CreateTime datetime,UpdateTime datetime
);

对源表开启CDC支持:

exec sp_cdc_enable_table 
@source_schema='dbo', 
@source_name='T_LioCDC', 
@role_name=null,
@supports_net_changes = 1;

确认是否有权限访问CDC Table:

EXEC sys.sp_cdc_help_change_data_capture

 确认SQL Server Agent已开启:

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

        以上则完成对数据库的CDC操作。

2. Kafka步骤

        Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。standalone用于单机测试,本文用distributed模式,用于生产环境。(Kafka必须先运行启动,再进行以下步骤进行配置。)

下载Sql Server Connector

        下载连接器后,创建一个文件夹来存放,解压到该目录下即可,例子路径: /usr/soft/kafka/kafka_2.13_2.7.0/plugins (记住这个路径,配置中要用到)。

 下载地址:debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz

编辑connect-distributed.properties配置

  修改Kafka connect配置文件,$KAFKA_HOME/config/connect-distributed.properties,变更内容如下:

//kafka集群ip+port
bootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092key.converter.schemas.enable=false
value.converter.schemas.enable=falseoffset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=3
offset.storage.cleanup.policy=compactconfig.storage.topic=connect-configs
config.storage.replication.factor=1status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=3
//刚刚下载连接器解压的路径
plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins

看到配置中有三个Topic,分别是

config.storage.topic:用以保存connector和task的配置信息,需要注意的是这个主题的分区数只能是1,而且是有多副本的。
offset.storage.topic:用以保存offset信息。
status.storage.topic:用以保存connetor的状态信息。

    这些Topic可以不用创建,启动后会默认创建。


启动Kafka集群

  保存配置之后,将connect-distributed.properties分发到集群中,然后启动:

bin/connect-distributed.sh config/connect-distributed.properties


检查是否启动

  connector支持REST API的方式进行管理,所以用Post man或者Fiddler可以调用相关接口进行管理。检查是否启动:

    不用奇怪,上面配置集群的IP是172段,这里的192.168.1.177仍是我的集群中的一个服务器,因为服务器都使用了双网卡。因为还没有连接器相关配置,所以接口返回是一个空数组,接下来将新增一个连接器。


编写sqlserver-cdc-source.json

{"name": "sqlserver-cdc-source","config": {"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector","database.server.name" : "JnServer","database.hostname" : "172.192.20.2", --目标数据库的ip"database.port" : "1433",  --目标数据库的端口"database.user" : "sa",   --目标数据库的账号"database.password" : "123456",  --密码"database.dbname" : "Dis",  --目标数据库的数据库名称"table.whitelist": "dbo.T_LioCDC", --监听表名"schemas.enable" : "false",  "mode":"incrementing",  --增量模式"incrementing.column.name": "ID", --增量列名"database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群"database.history.kafka.topic": "TopicTLioCDC",  --kafka topic内部使用,不是由消费者使用"value.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter"}
}
//源文地址:https://www.cnblogs.com/EminemJK/p/14688907.html

还有其他额外的配置,可以参考官方文档。然后执行:

继续执行检查,就发现连接器已经成功配置了:

其他API

GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。//源文地址: https://www.cnblogs.com/EminemJK/p/14688907.html

查看Topic

/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000

Topic JnServer.dbo.T_LioCDC 则是供我们消费的主题,启动一个消费者进行监听测试:

bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092  --consumer-property group.id=group1 --consumer-property client.id=consumer-1  --topic JnServer.dbo.T_LioCDC

然后再源表进行一些列增删改操作,

--测试代码
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('A',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('B',0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('C',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('D',0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('E',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('F',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('G',0,getdate(),getdate())update T_LioCDC
set Name='Lio.Huang',UpdateTime=getdate()
where ID=7

已经成功捕捉到数据的变更,对比几个操作Json,依次是insert、update、delete:

 最后

        下班!

(注文中的指定链接可在"阅读原文"中获取)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/303008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

惊喜开学季,教你如何在人工智能时代站稳脚跟!

暑假咻地一下过完啦,前几天,小天介绍了关于数模课程的开学季限时优惠(传送门),今天要介绍的是python课程。接下来,小天来详细说明一下!领取方式:公众号后台对话框回复“人工智能”免…

矢量合成和分解的法则_力的合成与分解专题解析,寒假复习!

合力与分力如果几个力共同作用在物体上产生的效果与一个力单独作用在物体上产生的效果相同,则把这个力叫做这几个力的合力,而那几个力叫做这一个力的分力。合力与分力的关系是等效替代关系,即一个力若分解为两个分力,在分析和计算…

.NET WebSocket 核心原理初体验

上个月我写了《.NET gRPC核心功能初体验》, 里面使用gRPC双向流做了一个打乒乓球的Demo, [实时][双向]这两个标签是不是很熟悉,对, WebSockets也可以做实时双向通信。本文将利用WebSockets(SignalR的一部分)搭建一个可双向通信的A…

CvBlobDetector 新目标检测算法简析

CvBlobDetector用于检测和判定当前帧中的Blob是否是新产生的目标,方法如下:

原来这些流弊的老板,曾经还是牛逼的程序猿!

“生活就像巧克力,你永远不知道下一颗是什么味道”,这句话用在互联网最适合不过,互联网从人类里面创造了一批神,说起他们的名字无人不知,但是你可知道这些神曾经也只是一个名不见经传的程序员。一起来看看他们是怎么一…

phpstormp新建PHP保存在哪里_记一次windows配置PHP环境

前言因为疫情原因一直不能回学校,电脑还在学校宿舍,所以在笔记本上搭建PHP环境,总不能在家混吃等死吧。正文【0】安装Apache2.40-0:Apache的下载链接The Apache HTTP Server Project0-1:点击a number of third party vendors下载window版本0-…

通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务...

我个人认为Actor应该是Dapr里比较重头的部分也是Dapr一直在讲的所谓“stateful applications”真正具体的一个实现(个人认为),上一章讲到有状态服务可能很多同学看到后的第一反应是“不就是个分布式缓存吗”。那今天就讲讲Actor,看看这个东西…

想转行人工智能?哈佛博士后有话说!

从17年开始,各大高校的数据科学与大数据技术专业持续火爆,2018年,北京大学、西安交通大学等高校更在本科阶段设立人工智能一级学科,中国顶尖人才的流向在悄然改变……据目前最新的数据显示,AI行业开发人员的月薪基本上…

转载标明出处用英语_英语原版阅读:At the beach

今天分享一篇阅读理解。可以学完一般现在时后进行同步阅读,也可以作为日常的阅读材料。每日10分钟英语阅读,养成习惯,孩子的英语学习不用愁。这篇文章的题目是At the beach图片来源于网络先来读文章:图片来源于网络1.In summer I …

12 月份 10 个新鲜的 jQuery 插件和教程

1. MASHA (Mark & Share) MASHA (Mark & Share 的缩写) 是一个可以让你分享网页部分内容的 JavaScript 库。 2. JScraft scroller 通过点击某个图片,该图将移到网页中央,其他相应的图片进行位置滑动。在线演示:here. 3. Windows-like…

大数据时代,如何才能提高自身竞争力?

暑假咻地一下过完啦,前几天,小天介绍了关于数模课程的开学季限时优惠(传送门),今天要介绍的是python课程。接下来,小天来详细说明一下!领取方式:公众号后台对话框回复“人工智能”免…

从容器到容器云,什么才是Kubernetes的本质?

这两年,Kubernetes 击败了 Swarm 和 Mesos,几乎成为容器编排的事实标准,BAT、滴滴、京东、头条等大厂,都争相把容器和 K8S 项目作为技术重心,试图“放长线钓大鱼”。就说阿里吧,目前基本所有业务都跑在云上…

Java 的日子屈指可数,这是真的吗?

斯坦福大学的计算机科学入门课采用JavaScript,摈弃Java,但是它的基数很大。年4月初,斯坦福大学开始试推行计算机科学入门课CS 106A的新版本。这个名为CS 106J的新版本用JavaScript来教,而不是用Java来教。斯坦福大学的官方网站解释…

5gh掌上云计算认证不通过_【众志成城战疫情】法官助理告诉你“移动微法院”、“掌上法庭”有多便捷、有多硬核~!...

今天中午,小编收到了一篇来自普定法院白岩法庭法官助理的投稿,她一方面告知小编白岩法庭今天上午通过微法院“掌上法庭”成功审理了一起买卖合同纠纷案纷,一方面强烈给小编安利了这个小程序,并且要求在本文结尾附上微法院的操作手…

“杀”一个程序员不需要用枪,改三次需求就可以了!

在很多软件公司,特别是一些创业型的团队中,对于这样的情景可能大家都很熟悉:项目经理或者产品经理(产品狗)口头或者简单记录一下软件产品的大致要做的功能,直接就让研发团队的兄弟(程序猿&#…

WPF 仿QQ登录框翻转效果

突然发现qq登录窗体的翻转特效看起来不错&#xff0c;决定用wpf试试。主要知识点就是Viewport3D和AxisAngleRotation3D看一下效果&#xff1a;下面看一下代码&#xff1a;主要xaml代码&#xff1a;<UserControl x:Class"GQ.DirectContentSample"xmlns"http:/…

机器人J中WPR_优傲:协作机器人的未来在哪里?

2019年9月17日&#xff0c;第21届中国国际工业博览会于上海国家会展中心如期举办&#xff0c;期间优傲机器人(Universal Robots)公司推出新品&#xff0c;UR16e。有效载荷高达16kg&#xff0c;引起业界广泛的关注。优傲机器人总裁Jrgen von Hollen表示&#xff1a;“在当今不明…

你试过不用if撸代码吗?

试着不用if撸代码&#xff0c;是件很有趣的事&#xff0c;而且&#xff0c;万一你领会了什么是“数据即代码&#xff0c;代码即数据”呢&#xff1f;我在教新手编程时&#xff0c;喜欢给他们一些小小的挑战&#xff0c;比如&#xff1a;不使用if语句(或者三元运算符、switch语句…

NET问答: 对 Linq 中的 Union 和 Concat 的用法困惑

咨询区 Prasad Kanaparthi&#xff1a;我在使用 Union 和 Concat 上有一个困惑&#xff0c;从字面上理解&#xff1a;一个是并集&#xff0c;一个是连接&#xff0c;下面的例子就是我对这两个扩展方法的理解。static void Main(string[] args){var a1 (new[] { 1, 2 }).Union(…

中止是怎么用的_多士炉怎么用 使用多士炉注意事项

阅读本文前&#xff0c;请您先点击上面的蓝色字体&#xff0c;再点击“关注”&#xff0c;这样您就可以继续免费收到最新文章了。每天都有分享。完全是免费订阅&#xff0c;请放心关注。 …