Springboot集成Debezium监听postgresql变更

在这里插入图片描述

1.创建springboot项目引入pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies>

2.application.properties配置

# Debezium Configuration
debezium.name=my-postgres-connector
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
debezium.offset.flush.interval.ms=60000
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
debezium.database.dbname=db_test
debezium.database.server.id=12345
debezium.database.server.name=customer-postgres-db-server
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat
debezium.table.include.list=public.user
debezium.column.include.list=public.user.id,public.user.name
debezium.publication.autocreate.mode=filtered
debezium.plugin.name=pgoutput
debezium.slot.name=dbz_customerdb_listener

3.配置类:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.IOException;@Configuration
public class DebeziumConnectorConfig {@Beanpublic io.debezium.config.Configuration customerConnector(Environment env) throws IOException {return io.debezium.config.Configuration.create().with("name", env.getProperty("debezium.name")).with("connector.class", env.getProperty("debezium.connector.class")).with("offset.storage", env.getProperty("debezium.offset.storage")).with("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename")).with("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms")).with("database.hostname", env.getProperty("debezium.database.hostname")).with("database.port", env.getProperty("debezium.database.port")).with("database.user", env.getProperty("debezium.database.user")).with("database.password", env.getProperty("debezium.database.password")).with("database.dbname", env.getProperty("debezium.database.dbname")).with("database.server.id", env.getProperty("debezium.database.server.id")).with("database.server.name", env.getProperty("debezium.database.server.name"))//.with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory").with("database.history", env.getProperty("debezium.database.history")).with("database.history.file.filename", env.getProperty("debezium.database.history.file.filename")).with("table.include.list", env.getProperty("debezium.table.include.list")) //表名.with("column.include.list", env.getProperty("debezium.column.include.list")) // 表中得哪些字段.with("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode")).with("plugin.name", env.getProperty("debezium.plugin.name")).with("slot.name", env.getProperty("debezium.slot.name")).build();}
}

4.注册监听

import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;@Slf4j
@Component
public class DebeziumListener {private final Executor executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;public DebeziumListener(Configuration customerConnectorConfiguration) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnectorConfiguration.asProperties()).notifying(this::handleChangeEvent).build();}private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());Struct sourceRecordChangeValue= (Struct) sourceRecord.value();//log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 处理非读操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// this.customerService.replicateData(payload, operation);log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() throws IOException {if (Objects.nonNull(this.debeziumEngine)) {this.debeziumEngine.close();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/74517.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

报错 standard_init_linux.go:228: exec user process caused: exec format error

docker logs 容器名 报错&#xff1a; standard_init_linux.go:228: exec user process caused: exec format error 或者 standard_init_linux.go:228: exec user process caused: input/output error 排查思路 1、检查源镜像的框架是否正确&#xff0c;是否amd64&#x…

Go 代理爬虫

现在注册&#xff0c;还送15美金注册奖励金 --- 亮数据-网络IP代理及全网数据一站式服务商 使用代理服务器&#xff0c;通过 Colly、Goquery、Selenium 进行网络爬虫的基础示例程序 本仓库包含两个分支&#xff1a; basic 分支包含供 Go Proxy Servers 这篇文章改动的基础代码…

STM32实现智能温控系统(暖手宝):PID 算法 + DS18B20+OLED 显示,[学习 PID 优质项目]

一、项目概述 本文基于 STM32F103C8T6 单片机&#xff0c;设计了一个高精度温度控制系统。通过 DS18B20 采集温度&#xff0c;采用位置型 PID 算法控制 PWM 输出驱动 MOS 管加热Pi膜&#xff0c;配合 OLED 实时显示温度数据。系统可稳定将 PI 膜加热至 40℃&#xff0c;适用于…

neo4j知识图谱常用命令

1. 查看所有节点和关系 如果你想查看图数据库中的所有节点和关系&#xff0c;可以使用以下查询&#xff1a; Cypher 深色版本 MATCH (n)-[r]->(m) RETURN n, r, m n 和 m 表示节点。r 表示两个节点之间的关系。这条命令会返回所有节点及其直接相连的关系。 2. 查看所有节…

从零开始:使用Luatools工具高效烧录Air780EPM核心板项目的完整指南

本文将深入讲解如何使用Luatools工具烧录一个具体的项目到Air780EPM开发板中。如何使用官方推荐的Luatools工具&#xff08;一款跨平台、命令行驱动的烧录利器&#xff09;&#xff0c;通过“环境配置→硬件连接→参数设置→一键烧录”四大步骤&#xff0c;帮助用户实现Air780E…

2024年认证杯SPSSPRO杯数学建模C题(第二阶段)云中的海盐全过程文档及程序

2024年认证杯SPSSPRO杯数学建模 C题 云中的海盐 原题再现&#xff1a; 巴黎气候协定提出的目标是&#xff1a;在2100年前&#xff0c;把全球平均气温相对于工业革命以前的气温升幅控制在不超过2摄氏度的水平&#xff0c;并为1.5摄氏度而努力。但事实上&#xff0c;许多之前的…

大疆上云api介绍

概述 目前对于 DJI 无人机接入第三方云平台,主要是基于 MSDK 开发定制 App,然后自己定义私有上云通信协议连接到云平台中。这样对于核心业务是开发云平台,无人机只是其中一个接入硬件设备的开发者来说,重新基于 MSDK 开发 App 工作量大、成本高,同时还需要花很多精力在无人…

云原生之开源遥测框架OpenTelemetry(在 Gin 框架中使用 OpenTelemetry 进行分布式追踪和监控)

文章目录 云原生之开源遥测框架OpenTelemetry背景什么是可观测性&#xff1f; 什么是 OpenTelemetry&#xff1f;Opentelemetry的主要优势有以下几点&#xff1a;理解分布式链路日志Spans分布式链路 在 Gin 框架中使用 OpenTelemetry 进行分布式追踪和监控0. 整体思路1. 初始化…

【蓝桥杯速成】| 11.回溯 之 子集问题

题目一&#xff1a;子集 问题描述 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例…

Nginx目录结构

Nginx目录结构 ​ Nginx 的安装目录结构可能会因安装方式&#xff08;如使用包管理器、源码编译等&#xff09;和操作系统的不同而有所差异。以下是通过在线安装时&#xff0c;Nginx 默认的目录结构&#xff0c;以及各目录和文件的作用。 yum install nginx查询nginx [rootRo…

2.(vue3.x+vite)使用vue-router

前端技术社区总目录(订阅之前请先查看该博客) 效果预览 路由配置的“/”与“helloWorld”都可以访问到以下内容 http://10.11.0.87:4000/#/ http://10.11.0.87:4000/#/helloWorld 1:安装vue-router npm i vue-router 2:创建router文件 在src的目录下创建router文件夹…

后端返回了 xlsx 文件流,前端怎么下载处理

当后端返回一个 .xlsx 文件流时&#xff0c;前端可以通过 JavaScript 处理这个文件流并触发浏览器下载。 实现步骤 发送请求获取文件流&#xff1a; 使用 fetch 或 axios 等工具向后端发送请求&#xff0c;确保响应类型设置为 blob&#xff08;二进制数据流&#xff09;。 创建…

HTML5拖拽功能教程

HTML5拖拽功能教程 简介 HTML5引入了原生拖放(Drag and Drop)API&#xff0c;使开发者能够轻松实现网页中的拖拽功能&#xff0c;无需依赖第三方库。拖拽功能可以大大提升用户体验&#xff0c;适用于文件上传、列表排序、看板系统等多种交互场景。本教程将带您全面了解HTML拖…

VUE3 路由配置

1.下载 VueRouter 模块 在命令行中输入 yarn add vue-router 2.导⼊相关函数 在自己创建的router/index.js 文件中 import { createRouter, createWebHashHistory } from vue-router 3.创建路由实例 在自己创建的router/index.js 文件中 const theFirstRouter ()>{return…

历史序列影像 Esri的World Imagery Wayback简介

Esri的World Imagery Wayback是一个专注于提供历史卫星影像的在线平台&#xff0c;由全球领先的地理信息系统&#xff08;GIS&#xff09;技术提供商Esri开发。该平台整合了多源卫星影像数据&#xff0c;允许用户回溯特定区域在不同时间点的影像变化&#xff0c;支持时间序列分…

golang结构体与指针类型

结构体与指针类型 指针类型字段 具名字段 举例 package struct_knowledgeimport "fmt"//结构体字段为指针类型 func StructWithPoint(){type Student struct{name *string}var lisa Studentfmt.Printf("赋值前,Student的实例的值%#v\n",lisa)//错误的赋…

NetMizer-日志管理系统-远程命令执行漏洞挖掘

漏洞描述&#xff1a;NetMizer 日志管理系统 cmd.php中存在远程命令执行漏洞&#xff0c;攻击者通过传入 cmd参数即可命令执行 1.fofa搜素语句 title"NetMizer 日志管理系统" 2.漏洞验证 网站页面 验证POC /data/manage/cmd.php?cmdid

Contactile三轴触觉传感器:多维力感赋能机器人抓取

在非结构化环境中&#xff0c;机器人对物体的精准抓取与操作始终面临巨大挑战。传统传感器因无法全面感知触觉参数&#xff08;如三维力、位移、摩擦&#xff09;&#xff0c;难以适应复杂多变的场景。Contactile推出的三轴触觉力传感器&#xff0c;通过仿生设计与创新光学技术…

OpenCV三维解算常用方法C++

如果标定过程是通过OpenCV张正友标定法实现的&#xff0c;得到的内参外参保存在.txt文件中是这样的形式&#xff1a; ① 内参intrinsics.txt&#xff1a; ② 外参extrinsics.txt&#xff1a; 那么可以通过如下方法读取.txt文件获取左右相机内外参&#xff0c;主要包括三维解算…

栈和队列相关知识题目

栈的底层原理 栈&#xff08;Stack&#xff09;是一种后进先出&#xff08;LIFO&#xff09;​的线性数据结构&#xff0c;所有操作&#xff08;如插入、删除&#xff09;仅在栈顶进行。它的底层实现可以是数组或链表&#xff0c;具体取决于编程语言和应用场景。 1.基于数组实…