kafka jdbc connector适配kadb数据实时同步

  • 测试结论

源端增量获取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各种方式说明如下:

bulk: 一次同步整个表的数据

incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新和删除

timestamp: 使用时间戳标识增量数据,每次更新数据都要修改时间戳,时间戳严格递增

timestamp+incrementing: 使用两个列,一个为自增列,一个为时间戳列。综合incrementing和timestamp的功能

  • 环境说明

本文在kafka的standalone模式下,适配kafka jdbc connector从源端mysql数据库实时同步数据到kadb中。验证1. 增量数据获取及增量数据获取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系统:centos 7.6
  4. jdbc connector版本:10.8.3。下载地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驱动:mysql-connector-java-5.1.39-bin.jar
  6. kadb驱动:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必须为17或者18版本,否则kafka安装报错)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector参考资料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector参考资料

https://kafka.apache.org/documentation/

  • 环境部署
  1. kafka部署

解压

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

产生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下载jdbc connector,将解压的内容保存到kafka解压目录的plugins下(plugins目录需自己创建内容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目标端jdbc驱动

将源端mysql的jdbc驱动文件和目标端kadb驱动文件拷贝至kafka的解压目录的libs目录下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 连接器配置文件:connect-standalone.properties

添加插件路径参数:(绝对路径)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source                

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector    //固定值,使用jdbc connector的类

# topic名称列表,源端和目标端的topic必须一致

topics=test

# 配置jdbc连接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量获取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目标端配置文件:connect-kadb-sink.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 为当前connector创建的最大线程数

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必须设置

# topic名称列表

topics=test

# 配置jdbc连接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自动创建表

auto.create=true

# 写入模式

insert.mode=insert

  1. 启动connect

bin/connect-standalone.sh

config/connect-standalone.properties                //connect配置参数

config/connect-mysql-source.properties    //源端配置参数

config/connect-kadb-sink.properties            //目标端参数

  1. 测试
  1. mysql源端创建表,目标端会自动创建对应的表

mysql> desc test

    -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type        | Null | Key | Default | Extra          |

+-------+-------------+------+-----+---------+----------------+

| a     | int(11)     | NO   | PRI | NULL    | auto_increment |    //使用increment ing方式,必须是自增列

| b     | varchar(10) | YES  |     | NULL    |                |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入数据

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目标端数据

1 | aaa

 2 | bbb

 3 | ccc

 4 | ddd

 5 | dddd

(844 rows)

test_sink=#

  1. 源端数据

mysql> select * from test;

+---+------+

| a | b    |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令参考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/77883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Hadoop的音乐推荐系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 本毕业生数据分析与可视化系统采用B/S架构,数据库是MySQL,网站的搭建与开发采用了先进的Java语言、爬虫技术进行编写,使用了Spring Boot框架。该系统从两个对象:由管理员和用户来对系统进行设计构建。主要功能包括&#xff…

CentOS的安装以及网络配置

CentOS的下载 在学习docker之前,我们需要知道的就是docker是运行在Linux内核之上的,所以我们需要Linux环境的操作系统,当然了你也可以选择安装ubuntu等操作系统,如果你不想在本机安装的话还可以考虑买阿里或者华为的云服务器&…

【条形码识别改名工具】如何批量识别图片条形码,并以条码内容批量重命名,基于WPF和Zxing的开发总结

批量图片条形码识别与重命名系统 (WPF + ZXing)开发总结 项目适用场景 ​​电商商品管理​​:批量处理商品图片,根据条形码自动分类归档​​图书馆系统​​:扫描图书条形码快速建立电子档案​​医疗档案管理​​:通过药品条形码整理医疗图片资料​​仓储管理​​:自动化识…

RAGFlow安装+本地知识库+踩坑记录

RAGFlow是一种融合了数据检索与生成式模型的新型系统架构,其核心思想在于将大规模检索系统与先进的生成式模型(如Transformer、GPT系列)相结合,从而在回答查询时既能利用海量数据的知识库,又能生成符合上下文语义的自然…

android liveData observeForever 与 observe对比

LiveData 是一个非常有用的组件,用于在数据变化时通知观察者。LiveData 提供了两种主要的观察方法:observe 和 observeForever。这两种方法在使用场景、生命周期感知以及内存管理等方面有所不同。 一、observe 方法​​ ​​1. 基本介绍​​ ​​生命周期感知​​:observe…

web-ssrfme

一、题目源码 <?php highlight_file(__file__); function curl($url){ $ch curl_init();curl_setopt($ch, CURLOPT_URL, $url);curl_setopt($ch, CURLOPT_HEADER, 0);echo curl_exec($ch);curl_close($ch); }if(isset($_GET[url])){$url $_GET[url];if(preg_match(/file…

企业AI应用模式解析:从本地部署到混合架构

在人工智能快速发展的今天&#xff0c;企业如何选择合适的大模型应用方式成为了一个关键问题。本文将详细介绍六种主流的企业AI应用模式&#xff0c;帮助您根据自身需求做出最优选择。 1. 本地部署&#xff08;On-Premise Deployment&#xff09; 特点&#xff1a;将模型下载…

OpenCV 图形API(49)颜色空间转换-----将 NV12 格式的图像数据转换为 BGR 颜色空间函数NV12toBGR()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从NV12&#xff08;YUV420p&#xff09;颜色空间转换为BGR。 该函数将输入图像从NV12颜色空间转换为RGB。Y、U和V通道值的常规范围是0到25…

【java实现+4种变体完整例子】排序算法中【桶排序】的详细解析,包含基础实现、常见变体的完整代码示例,以及各变体的对比表格

以下是桶排序的详细解析&#xff0c;包含基础实现、常见变体的完整代码示例&#xff0c;以及各变体的对比表格&#xff1a; 一、桶排序基础实现 原理 将数据分到有限数量的桶中&#xff0c;每个桶内部使用其他排序算法&#xff08;如插入排序或快速排序&#xff09;&#xf…

Linux[基本指令]

Linux[基本指令] pwd 查看当前所处的工作目录 斜杠在Linux中作为路径分割符 路径存在的价值为了确定文件的唯一性 cd指令 更改路径 cd 你要去的路径(直接进入) cd . 当前目录 cd . . 上级目录(路径回退) 最后的’/为根目录(根节点) Linux还是window的目录结构都是树状…

git -- 对远程仓库的操作 -- 查看,添加(与clone对比),抓取和拉取,推送(注意点,抓取更新+合并的三种方法,解决冲突,对比),移除

目录 对远程仓库的操作 介绍 查看 (git remote) 介绍 查看详细信息 添加(git remote add) 介绍 与 git clone对比 从远程仓库中抓取与拉取 抓取(git fetch) 拉取(git pull) 推送(git push) 介绍 注意 抓取更新合并的方法 git fetch git merge 解决冲突 git …

vue3 excel文件导入

文章目录 前言使用在vue文件中的使用 前言 最近写小组官网涉及到了excel文件导入的功能 场景是导入小组成员年级 班级 邮箱 组别 姓名等基本信息的excel表格用于展示各组信息 使用 先下载js库 npm install xlsx为了提高代码的复用性 我将它写成了一个通用的函数 import ap…

Docker环境下SpringBoot程序内存溢出(OOM)问题深度解析与实战调优

文章目录 一、问题背景与现象还原**1. 业务背景****2. 故障特征****3. 核心痛点****4. 解决目标** 二、核心矛盾点分析**1. JVM 与容器内存协同失效****2. 非堆内存泄漏****3. 容器内存分配策略缺陷** 三、系统性解决方案**1. Docker 容器配置**2. JVM参数优化&#xff08;容器…

【PGCCC】Postgres MVCC 内部:更新与插入的隐性成本

为什么 Postgres 中的更新操作有时感觉比插入操作慢&#xff1f;答案在于 Postgres 如何在后台管理数据版本。 Postgres 高效处理并发事务能力的核心是多版本并发控制&#xff08;MVCC&#xff09;。 在本文中&#xff0c;我将探讨 MVCC 在 Postgres 中的工作原理以及它如何影响…

Docker使用、容器迁移

Docker 简介 Docker 是一个开源的容器化平台&#xff0c;用于打包、部署和运行应用程序及其依赖环境。Docker 容器是轻量级的虚拟化单元&#xff0c;运行在宿主机操作系统上&#xff0c;通过隔离机制&#xff08;如命名空间和控制组&#xff09;确保应用运行环境的一致性和可移…

c#清理释放内存

虽然c#具有内存管理和垃圾回收机制&#xff0c;但是在arcobjects二次开发嵌入到arcgis data reviewet还会报内存错误。需要强制清理某变量内存方法如下: 1设置静态函数ReleaseCom函数 public static void ReleaseCom(object o) { try{System.Runtime.InteropServices.Marsh…

Linux:进程:进程控制

进程创建 在Linux中我们使用fork函数创建新进程&#xff1a; fork函数 fork函数是Linux中的一个系统调用&#xff0c;用于创建一个新的进程&#xff0c;创建的新进程是原来进程的子进程 返回值&#xff1a;如果子进程创建失败&#xff0c;返回值是-1。如果子进程创建成功&a…

day1-小白学习JAVA---JDK安装和环境变量配置(mac版)

JDK安装和环境变量配置 我的电脑系统一、下载JDK1、oracle官网下载适合的JDK安装包&#xff0c;选择Mac OS对应的版本。 二、安装三、配置环境变量1、终端输入/usr/libexec/java_home -V查询所在的路径&#xff0c;复制备用2、输入ls -a3、检查文件目录中是否有.bash_profile文…

Python项目--基于机器学习的股票预测分析系统

1. 项目介绍 在当今数字化时代&#xff0c;金融市场的数据分析和预测已经成为投资决策的重要依据。本文将详细介绍一个基于Python的股票预测分析系统&#xff0c;该系统利用机器学习算法对历史股票数据进行分析&#xff0c;并预测未来股票价格走势&#xff0c;为投资者提供决策…

计算机视觉与深度学习 | 基于YOLOv8与光流法的目标检测与跟踪(Python代码)

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 目标检测与跟踪 关键实现逻辑检测-跟踪协作机制‌特征点选择策略‌运动…