TiCDC Canal-JSON 消息接收示例(Java 版)

1.引言

业务程序经常会通过各式各样的缓存来提升用户的访问速度。

由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。

如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。

此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。

因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。

本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。

之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!

2. 代码思路

(1)通过 kafka 消息获取 CDC 消息

(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑

3. 代码实现

3.1 代码结构

cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│    └─ CdcConstants.java├─ dto│    ├─ CdcMessage.java│    └─ User.java├─ job│    ├─ CdcJob.java│    └─ UserCdcJob.java└─ service├─ impl│   └─ UserServiceImpl.java└─ CdcService.java

3.2 CDC 常量类

public class CdcConstants {
​public enum MessageType {/*** 插入操作*/INSERT,
​/*** 更新操作*/UPDATE,
​/*** 删除操作*/DELETE;}
}

3.3 实体类

3.3.1 用户实体类

@Getter
@Setter
public class User {
​/*** 用户id*/private Long id;
​/*** 用户名*/private String name;
​/*** 年龄*/private Integer age;
}

3.3.2 CDC 消息实体类

@Getter
@Setter
public class CdcMessage<T> {
​/*** 数据集合*/private List<T> data;
​/*** 数据库名称*/private String database;
​/*** 是否为DDL语句isDdl*/private boolean isDdl;
​/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
​/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
​/*** sql语句*/private String sql;
​/*** 值为int类型*/private T sqlType;
​/*** 数据表名*/private String table;
​/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}

3.4 任务类

3.4.1 CDC 任务基类

@Slf4j
public class CdcJob<T> {
​protected CdcService<T> cdcService;
​/*** 处理消息** @param record 消息记录* @param ack    消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
​/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
​// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
​// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}

3.4.2 用户表 CDC 消费任务类

@Component
public class UserCdcJob extends CdcJob<User> {
​public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
​/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack    消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}

3.5 服务类

3.5.1 CDC 消息处理服务接口

public interface CdcService<T> {
​/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
​/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
​/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}

3.5.2 用户服务实现类

@Service
public class UserServiceImpl implements CdcService<User> {
​@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
​@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
​@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}

4.参考文档

TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview

TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/54928.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小区物业业主管理信息系统设计的设计与实现(论文+源码)_kaic

摘 要 随着互联网的发展&#xff0c;网络技术的发展变得极其重要&#xff0c;所以依靠计算机处理业务成为了一种社会普遍的现状。管理方式也自然而然的向着现代化技术方向而改变&#xff0c;所以纯人工管理方式在越来越完善的现代化管理技术的比较之下也就显得过于繁琐&#x…

Linux vmstat命令

vmstat命令是一个可用于报告系统虚拟内存、进程、CPU活动和输入输出统计信息的工具。以下是vmstat命令的详细解释说明&#xff1a; 用法&#xff1a; vmstat [选项] [时间间隔] [重复次数]常用选项&#xff1a; -a&#xff1a;显示所有模式&#xff08;虚拟内存、进程和IO&am…

ReoGrid.NET集成到winfrom

ReoGrid一个支持excel操作的控件,支持集成到任何winfrom项目内。 先看效果图: 如何使用&#xff1a; 使用ReoGrid自带excel模版设计工具先设计一个模版,设计器如下&#xff1a; 具体例子看官方文档 代码示例如下&#xff1a; var sheet reoGridControl1.CurrentWorksheet; …

Unity光照相关

1. 光源类型 Unity支持多种类型的光源&#xff0c;包括&#xff1a; 1. 点光源&#xff08;Point Light&#xff09;&#xff1a;从一个点向四周发射光线&#xff0c;适用于需要突出物体的光源。 2. 平行光&#xff08;Directional Light&#xff09;&#xff1a;从无限远处…

live555server环境搭建

live555环境搭建详解&#xff08;ubuntu18.04&#xff09; 1.环境依赖 openssl可选安不安 安装&#xff08;选择好版本&#xff09; sudo apt-get update sudo apt-get install openssl sudo apt-get install libssl-dev使用头文件是否可用时编译测试时记得链接&#xff08…

JavaScript快速入门

JavaScript简介 JavaScript&#xff08;简称&#xff1a;js&#xff09;是一门跨平台&#xff0c;面向对象的脚本语言&#xff0c;是用来控制网页行为的&#xff0c;它能使网页可交互。 JavaScript和java是完全不同的语言&#xff0c;不论是概念还是设计&#xff0c;但是基础语…

大数据时代的软件开发实践:利用云计算和AI赋能创新

文章目录 云计算的赋能弹性资源管理远程协作与分布式开发持续集成和持续交付成本效益 人工智能的赋能数据驱动的决策自动化智能预测和优化自适应系统 创新的实践方法数据驱动的创新智能化产品开放式创新迭代和反馈 &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;…

TypeError: ‘set‘ object is not subscriptable

问题出现的背景&#xff1a;写了一个python脚本&#xff0c;在脚本里用到了 pyexcel_xlsx 这个包&#xff0c;这个包可以读取excel文件。在本地运行可以运行成功&#xff0c;在Linux服务器上面运行报这个错。两边python都是用到3.8版本的&#xff0c;pyexcel_xlsx 版本也相同…

软考:中级软件设计师:OSI/RM七层模型,网络技术标准与协议

软考&#xff1a;中级软件设计师:OSI/RM七层模型 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准备的…

Vue2学习笔记のvue-router

这里写目录标题 vue-router路由1.基本使用2.几个注意点3.多级路由&#xff08;多级路由&#xff09;4.路由的query参数5.命名路由6.路由的params参数7.路由的props配置8.<router-link>的replace属性9.编程式路由导航10.缓存路由组件11.两个新的生命周期钩子12.路由守卫13…

java: 无法访问org.springframework.boot.SpringApplication 错误的类文件

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 错误1&#xff1a; java: 无法访问org.springframework.boot.SpringApplication 错误的类文件: /D:/Software/env-java/apache-maven-3.6.1/repository/org/springframework/boot/spring-boot/3.1.2/sp…

MySQL—MySQL的NULL值是怎么存放的

一、引言 1、MySQL数据存放在哪个文件&#xff1f; 创建一个数据库会产生三种格式的文件&#xff0c;分别是.opt格式、.frm格式、.ibd格式。 opt格式&#xff1a;用来存储当前数据库的默认字符集和字符校验规则。 frm格式&#xff1a;该文件是用来保存每个表的元数据信息的&…

【uniapp】微信小程序 , 海报轮播图弹窗,点击海报保存到本地,长按海报图片分享,收藏或保存

uivew 2.0 uniapp 海报画板 DCloud 插件市场 第一步&#xff0c;下载插件并导入HbuilderX 第二步&#xff0c;文件内 引入 海报组件 <template><painter ref"haibaorefs"></painter> <template> <script>import painter from /comp…

Node.js-Express框架基本使用

Express介绍 Express是基于 node.js 的web应用开发框架&#xff0c;是一个封装好的工具包&#xff0c;便于开发web应用&#xff08;HTTP服务&#xff09; Express基本使用 // 1.安装 npm i express // 2.导入 express 模块 const express require("express"); // 3…

线程的生命周期

线程的生命周期 与人有生老病死一样&#xff0c;线程也同样要经历开始&#xff08;等待&#xff09;、运行、挂起和停止四种不同的状态。这四种状态都可以通过Thread类中的方法进行控制。下面给出了Thread类中和这四种状态相关的方法。 // 开始线程 public void start( ); …

npm和yarn的区别?

文章目录 前言npm和yarn的作用和特点npm和yarn的安装的机制npm安装机制yarn安装机制检测包解析包获取包链接包构建包 总结后言 前言 这一期给大家讲解npm和yarn的一些区别 npm和yarn的作用和特点 包管理&#xff1a;npm 和 yarn 可以用于安装、更新和删除 JavaScript 包。它们提…

软件开发企业SDL安全培训案例

1.背景 随着计算机技术的发展、internet及mobile应用的普遍使用,软件安全像功能、性能、稳定性一样是计算机系统的一个非常重要部分。没有安全的软件,任何美好的功能都是徒劳的,没有安全的软件,公司的机密数据、客户隐私、系统的可靠性都得不到保障.如何有效评估、开发安全、可…

elemenPlus ElMessage 字符串如何换行问题

因为后端返回的数据是一长串&#xff0c;而且带有\r,\n等换行符&#xff0c;但是并没有生效。前端写法&#xff1a; // 抛出错误ElMessage.error(msg);我们知道\r&#xff0c;\n&#xff0c;\r\n 是在不同系统下的换行符的表示&#xff0c;但在JavaScript返回字符串中并没有生效…

TiDB 源码编译之 TiProxy 篇

作者&#xff1a; ShawnYan 原文来源&#xff1a; https://tidb.net/blog/3d57f54d TiProxy 简介 TiProxy 是一个基于 Apache 2.0 协议开源的、轻量级的 TiDB 数据库代理&#xff0c;基于 Go 语言编写&#xff0c;支持 MySQL 协议。 TiProxy 支持负载均衡&#xff0c;接收来…

复习leetcode

​​​​​​460. LFU 缓存 31. 下一个排列 322. 零钱兑换 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台