1.创建springboot项目引入pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies>
2.application.properties配置
# Debezium Configuration
debezium.name=my-postgres-connector
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
debezium.offset.flush.interval.ms=60000
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
debezium.database.dbname=db_test
debezium.database.server.id=12345
debezium.database.server.name=customer-postgres-db-server
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat
debezium.table.include.list=public.user
debezium.column.include.list=public.user.id,public.user.name
debezium.publication.autocreate.mode=filtered
debezium.plugin.name=pgoutput
debezium.slot.name=dbz_customerdb_listener
3.配置类:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.IOException;@Configuration
public class DebeziumConnectorConfig {@Beanpublic io.debezium.config.Configuration customerConnector(Environment env) throws IOException {return io.debezium.config.Configuration.create().with("name", env.getProperty("debezium.name")).with("connector.class", env.getProperty("debezium.connector.class")).with("offset.storage", env.getProperty("debezium.offset.storage")).with("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename")).with("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms")).with("database.hostname", env.getProperty("debezium.database.hostname")).with("database.port", env.getProperty("debezium.database.port")).with("database.user", env.getProperty("debezium.database.user")).with("database.password", env.getProperty("debezium.database.password")).with("database.dbname", env.getProperty("debezium.database.dbname")).with("database.server.id", env.getProperty("debezium.database.server.id")).with("database.server.name", env.getProperty("debezium.database.server.name"))//.with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory").with("database.history", env.getProperty("debezium.database.history")).with("database.history.file.filename", env.getProperty("debezium.database.history.file.filename")).with("table.include.list", env.getProperty("debezium.table.include.list")) //表名.with("column.include.list", env.getProperty("debezium.column.include.list")) // 表中得哪些字段.with("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode")).with("plugin.name", env.getProperty("debezium.plugin.name")).with("slot.name", env.getProperty("debezium.slot.name")).build();}
}
4.注册监听
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;@Slf4j
@Component
public class DebeziumListener {private final Executor executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;public DebeziumListener(Configuration customerConnectorConfiguration) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnectorConfiguration.asProperties()).notifying(this::handleChangeEvent).build();}private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());Struct sourceRecordChangeValue= (Struct) sourceRecord.value();//log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 处理非读操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// this.customerService.replicateData(payload, operation);log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() throws IOException {if (Objects.nonNull(this.debeziumEngine)) {this.debeziumEngine.close();}}}