1.引言
业务程序经常会通过各式各样的缓存来提升用户的访问速度。
由于存在缓存,在一些实时性要求较高的场景中,需要在数据变更的同时将数据缓存进行更新或删除。
如果数据本身由其他业务部门提供,就无法在写入的同时做缓存的一致性处理。
此时,可以通过其他业务部门暴露数据变更通知来感知到数据变化,从而保证数据的更新及时性。
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
因此,可以通过 TiCDC 将数据变更通知暴露给业务程序,来让业务程序做及时的对应处理逻辑。
本文以一张用户表的数据变更为例,来展示 Java 服务端接收一条 TiCDC Canal-JSON 的消息变更,解析数据,并转发给对应的业务处理程序的流程。
之前写类似的程序时,网上搜索到的案例还是比较少的,本文仅抛砖引玉,欢迎各位大佬批评指正!
2. 代码思路
(1)通过 kafka 消息获取 CDC 消息
(2)解析 CDC 消息,判断其数据变更类型,执行对应的处理逻辑
3. 代码实现
3.1 代码结构
cdc-demo
└─ src└─ main└─ java└─ com.example.demo├─ constants│ └─ CdcConstants.java├─ dto│ ├─ CdcMessage.java│ └─ User.java├─ job│ ├─ CdcJob.java│ └─ UserCdcJob.java└─ service├─ impl│ └─ UserServiceImpl.java└─ CdcService.java
3.2 CDC 常量类
public class CdcConstants {
public enum MessageType {/*** 插入操作*/INSERT,
/*** 更新操作*/UPDATE,
/*** 删除操作*/DELETE;}
}
3.3 实体类
3.3.1 用户实体类
@Getter
@Setter
public class User {
/*** 用户id*/private Long id;
/*** 用户名*/private String name;
/*** 年龄*/private Integer age;
}
3.3.2 CDC 消息实体类
@Getter
@Setter
public class CdcMessage<T> {
/*** 数据集合*/private List<T> data;
/*** 数据库名称*/private String database;
/*** 是否为DDL语句isDdl*/private boolean isDdl;
/*** 表结构的类型字段(值为字段类型,如varchar)*/private T mysqlType;
/*** UPDATE类型下的旧数据(未变更字段无数据)*/private List<T> oldData;
/*** sql语句*/private String sql;
/*** 值为int类型*/private T sqlType;
/*** 数据表名*/private String table;
/*** 新增(INSERT)、更新(UPDATE)、删除(DELETE)、删除表(ERASE)等*/private String type;
}
3.4 任务类
3.4.1 CDC 任务基类
@Slf4j
public class CdcJob<T> {
protected CdcService<T> cdcService;
/*** 处理消息** @param record 消息记录* @param ack 消息处理标识*/public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String recordString = String.format("topic:%s,partition:%s,offset:%s,value:%s",record.topic(),record.partition(),record.offset(),record.value());log.info("数据更新开始处理,消息:" + recordString);try {boolean processResult = process(record);String processResultString = processResult ? "成功" : "无更新";log.info("数据更新处理结束,处理结果:" + processResultString);} catch (Exception e) {log.error("数据更新报错,", e);} finally {// 手动提交偏移量ack.acknowledge();}}
/*** 处理数据** @param record kafka消费记录* @return 处理结果*/public boolean process(ConsumerRecord<String, String> record) {String bizName = this.getClass().getSimpleName();// 服务为初始化报错if (null == cdcService) {throw new IllegalStateException("服务未初始化");}
// 解析消息CdcMessage<T> cdcMessage = JSON.parseObject(record.value(), new TypeReference<CdcMessage<T>>() {});
// 跳过DDLif (cdcMessage.isDdl()) {log.info(bizName, "DDL变更,无需处理");return false;}// 处理结果初始化boolean result = false;// 服务层处理数据List<T> dataList = cdcMessage.getData();if (CdcConstants.MessageType.INSERT.name().equals(cdcMessage.getType())) {result = cdcService.insert(dataList);} else if (CdcConstants.MessageType.UPDATE.name().equals(cdcMessage.getType())) {result = cdcService.update(cdcMessage.getOldData(), dataList);} else if (CdcConstants.MessageType.DELETE.name().equals(cdcMessage.getType())) {result = cdcService.delete(dataList);} else {log.warn(bizName, "不处理该消息,消息类型:" + cdcMessage.getType());}return result;}
}
3.4.2 用户表 CDC 消费任务类
@Component
public class UserCdcJob extends CdcJob<User> {
public UserCdcJob(UserServiceImpl userService) {this.cdcService = userService;}
/*** 消费CDC消息,并进行处理** @param record 消息记录* @param ack 消息处理标识*/@KafkaListener(id = "UserCdcJob", groupId = "${user-cdc.group}",topics = {"${user-cdc.topic}"}, containerFactory = "cdcKafkaListenerFactory")public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) {handleMessage(record, ack);}
}
3.5 服务类
3.5.1 CDC 消息处理服务接口
public interface CdcService<T> {
/*** 插入数据** @param data 数据* @return 插入结果*/boolean insert(List<T> data);
/*** 更新数据** @param oldData 更新前数据* @param newData 更新后数据* @return 更新结果*/boolean update(List<T> oldData, List<T> newData);
/*** 删除数据** @param data 数据* @return 删除数据*/boolean delete(List<T> data);
}
3.5.2 用户服务实现类
@Service
public class UserServiceImpl implements CdcService<User> {
@Overridepublic boolean insert(List<User> data) {// TODOreturn false;}
@Overridepublic boolean update(List<User> oldData, List<User> newData) {// TODOreturn false;}
@Overridepublic boolean delete(List<User> data) {// TODOreturn false;}
}
4.参考文档
TiCDC 简介:https://docs.pingcap.com/zh/tidb/stable/ticdc-overview
TiCDC Canal-JSON 协议:https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json