文章目录
- 1.项目结构:
- 2.依赖:
- 3.application.properties
- 4.DebeziumConnectorConfig类
- 5.TableEnum类
- 6.TableHandler接口(表处理抽象)
- 7.DefaultTableHandler默认实现类
- 8.UserTableHandler处理类
- 9.TableHandlerFactory工厂
- 10.DebeziumListener 监听事件
- 11.测试
环境:JDK8,Debezium1.94,postgresql12
1.项目结构:
2.依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version></parent><groupId>com.linging</groupId><artifactId>springboot-debezium-server</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><debezium.version>1.9.4.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version><exclusions><exclusion><artifactId>slf4j-reload4j</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies></project>
3.application.properties
# Debezium Configuration
#连接器基本信息
#指定 Debezium 连接器的名称,唯一标识,用于在 Kafka Connect 中区分不同的连接器。
debezium.name=my-postgres-connector
#指定连接器的类名,这里是连接postgresql
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector#偏移量相关
#指定偏移量存储的实现类,这里使用的是文件存储,将偏移量存储在本地文件中。
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
#处理数据的偏移量存储路径
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
#处理数据的偏移量提交时间间隔,单位毫秒,设置为 0 表示每次处理完一批记录后立即提交偏移量,这可以减少数据丢失的风险,但会增加系统开销
#以上一次提交时间开始计算
debezium.offset.flush.interval.ms=10000#数据库连接信息
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
#捕获变更的数据库名称
debezium.database.dbname=db_test
#指定逻辑服务器的唯一标识,用于区分不同的数据库实例
debezium.database.server.id=postgresql_0
#指定逻辑服务器的名称,用于在 Kafka 主题中区分不同的数据库实例
debezium.database.server.name=customer_postgres_db_server#数据库历史记录
#指定数据库模式记录的实现类,这里使用的是文件存储,将数据库历史记录存储在本地文件中
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
#指定数据库模式记录文件的路径
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat#表和字段过滤
#debezium.table.include.list=public.user
#debezium.column.include.list=public.user.id,public.user.name#其他配置
#指定是否自动创建 PostgreSQL 的逻辑复制槽,设置为 filtered 表示只捕获配置中指定的表和字段的变更
debezium.publication.autocreate.mode=filtered
#指定 PostgreSQL 的逻辑复制插件名称,pgoutput是 PostgreSQL 的逻辑复制插件名称,用于捕获变更
debezium.plugin.name=pgoutput
#指定逻辑复制槽的名称
debezium.slot.name=dbz_customerdb_listener
#不执行初始快照,直接捕获变更数据,取值:never、initial、when_needed
debezium.snapshot.mode=never
#批量提交条数
debezium.max.batch.size=100logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
logging.level.io.debezium=INFO
4.DebeziumConnectorConfig类
@Configuration
public class DebeziumConnectorConfig {@Beanpublic Properties customerConnector(Environment env) {Properties props = new Properties();props.setProperty("name", env.getProperty("debezium.name"));props.setProperty("connector.class", env.getProperty("debezium.connector.class"));props.setProperty("offset.storage", env.getProperty("debezium.offset.storage"));props.setProperty("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"));props.setProperty("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"));props.setProperty("database.hostname", env.getProperty("debezium.database.hostname"));props.setProperty("database.port", env.getProperty("debezium.database.port"));props.setProperty("database.user", env.getProperty("debezium.database.user"));props.setProperty("database.password", env.getProperty("debezium.database.password"));props.setProperty("database.dbname", env.getProperty("debezium.database.dbname"));props.setProperty("database.server.id", env.getProperty("debezium.database.server.id"));props.setProperty("database.server.name", env.getProperty("debezium.database.server.name"));props.setProperty("database.history", env.getProperty("debezium.database.history"));props.setProperty("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"));props.setProperty("table.include.list", TableEnum.getTableNames()); //表名props.setProperty("column.include.list", TableEnum.getColumns()); // 表中得哪些字段props.setProperty("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"));props.setProperty("plugin.name", env.getProperty("debezium.plugin.name"));props.setProperty("slot.name", env.getProperty("debezium.slot.name"));props.setProperty("snapshot.mode", env.getProperty("debezium.snapshot.mode"));props.setProperty("max.batch.size", env.getProperty("debezium.max.batch.size"));return props;}
}
5.TableEnum类
package com.linging.enums;import java.util.Arrays;
import java.util.stream.Collectors;/*** 监听的表及字段配置* @author Linging* @version 1.0.0* @since 1.0*/
public enum TableEnum {DEFAULT("default", "defaultTableHandler", null),USER("public.user", "userTableHandler", "public.user.id,public.user.name"),;// 表名称private final String tableName;// 表处理类的名称private final String handlerName;// 表的字段名称,多个用逗号隔开public final String columnName;TableEnum(String tableName, String handlerName, String columnName) {this.tableName = tableName;this.handlerName = handlerName;this.columnName = columnName;}public String getTableName() {return tableName;}public String getHandlerName() {return handlerName;}public String getColumnName() {return columnName;}public static String getTableNames(){return Arrays.stream(TableEnum.values()).map(TableEnum::getTableName).filter(name -> !"default".equals(name)).distinct().collect(Collectors.joining(","));}public static String getColumns(){return Arrays.stream(TableEnum.values()).filter(e -> !"default".equals(e.getTableName()) && e.getColumnName() != null).map(TableEnum::getColumnName).distinct().collect(Collectors.joining(","));}
}
6.TableHandler接口(表处理抽象)
public interface TableHandler {void handle(SourceRecord sourceRecord);void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer);
}
7.DefaultTableHandler默认实现类
/*** 默认处理类*/
@Component("defaultTableHandler")
public class DefaultTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(DefaultTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling default table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {log.info("Handling batch default table: {}", recordChangeEvents.size());}
}
8.UserTableHandler处理类
/*** user表变更处理类*/
@Component("userTableHandler")
public class UserTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(UserTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling user table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());// 添加具体的处理逻辑Struct sourceRecordChangeValue= (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 处理非读操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {for (RecordChangeEvent<SourceRecord> recordChangeEvent : recordChangeEvents) {try {SourceRecord sourceRecord = recordChangeEvent.record();// TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)this.handle(sourceRecord);// 标记已处理committer.markProcessed(recordChangeEvent);} catch (InterruptedException e) {log.error("处理异常:", e);}}}
}
9.TableHandlerFactory工厂
@Component
public class TableHandlerFactory implements ApplicationContextAware {@Value("${debezium.database.server.name}")private String prefixServerName;private ApplicationContext context;private final Map<String, TableHandler> handlers = new HashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context = applicationContext;}@PostConstructpublic void init(){String name = null;for (TableEnum tableEnum : TableEnum.values()) {if(TableEnum.DEFAULT.equals(tableEnum)){name = tableEnum.getTableName();}else{name = getTableName(tableEnum.getTableName());}handlers.putIfAbsent(name,context.getBean(tableEnum.getHandlerName(), TableHandler.class));}}public TableHandler getHandler(String tableName) {return handlers.getOrDefault(tableName, handlers.get(TableEnum.DEFAULT.getTableName()));}public String getTableName(String name){return prefixServerName + "." + name;}}
10.DebeziumListener 监听事件
@Component
public class DebeziumListener {private static final Logger log = LoggerFactory.getLogger(DebeziumListener.class);private final ExecutorService executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;private final TableHandlerFactory tableHandlerFactory;@Autowiredpublic DebeziumListener(Properties customerConnector, TableHandlerFactory tableHandlerFactory) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnector).using(OffsetCommitPolicy.periodic(customerConnector)).notifying(this::handleChangeEventBatch).build();this.tableHandlerFactory = tableHandlerFactory;}/*** 批量记录处理* @param recordChangeEvents* @param committer*/private void handleChangeEventBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer){// 根据表分组Map<String, List<RecordChangeEvent<SourceRecord>>> tableName2List = recordChangeEvents.stream().collect(Collectors.groupingBy(event -> event.record().topic()));tableName2List.forEach((tableName, recordChangeEventList) -> {TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handleBatch(recordChangeEventList, committer);});try {// 触发提交策略committer.markBatchFinished();} catch (InterruptedException e) {log.error("提交异常:", e);}}/*** 单条记录处理* @param sourceRecordChangeEvent*/private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());String tableName = sourceRecord.topic();// 获取对应的表处理类TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handle(sourceRecord);}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() {if (this.debeziumEngine != null) {try {this.debeziumEngine.close();} catch (IOException e) {log.error("关闭debeziumEngine异常:", e);}}this.executor.shutdown();}
}
11.测试
启动服务,修改数据库user表数据: