- Test conclusions
The source side supports the following incremental capture modes: bulk, incrementing, timestamp, and timestamp+incrementing (combined). They work as follows:
bulk: synchronizes the entire table on every poll
incrementing: uses a strictly increasing column to identify new rows; updates and deletes to existing rows are not captured
timestamp: uses a timestamp column to identify new and modified rows; every update must also refresh the timestamp, and the timestamp must be strictly increasing
timestamp+incrementing: uses two columns, one incrementing and one timestamp, combining the capabilities of the incrementing and timestamp modes
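As an illustration of the combined mode, a timestamp+incrementing source configuration would add settings along these lines (the column names `id` and `update_ts` are hypothetical; the option names are from the Confluent source-connector documentation):

```properties
mode=timestamp+incrementing
# hypothetical column names; replace with your table's actual columns
incrementing.column.name=id
timestamp.column.name=update_ts
```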
- Environment
This document runs Kafka in standalone mode and uses the Kafka JDBC Connector to replicate data in near real time from a source MySQL database into KADB, verifying: 1. incremental data capture and the available incremental capture modes.
- KADB version: V8R3
- MySQL version: 5.7
- OS: CentOS 7.6
- JDBC Connector version: 10.8.3. Download: JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
- MySQL driver: mysql-connector-java-5.1.39-bin.jar
- KADB driver: postgresql-42.7.4.jar
- Java version: 17.0.12 (Kafka requires Java 17 or 18; otherwise Kafka reports an error during installation)
- Kafka version: kafka_2.13-4.0.0
- Kafka JDBC Connector reference:
JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation
- Kafka Connect reference:
https://kafka.apache.org/documentation/
- Environment deployment
- Kafka deployment
Unpack the archive:
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0
Generate a cluster UUID:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Format the log directories:
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
Start Kafka:
bin/kafka-server-start.sh config/server.properties
- JDBC connector deployment
Download the JDBC connector and place the extracted contents under the plugins directory of the Kafka installation (create the plugins directory yourself). The contents look like this:
[root@nanri plugins]# ls -l
total 8
drwxr-xr-x. 2 root root 43 Apr 17 21:50 assets
drwxr-xr-x. 3 root root 108 Apr 17 21:50 doc
drwxr-xr-x. 2 root root 90 Apr 17 21:50 etc
drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib
-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json
[root@nanri plugins]# pwd
/root/kafka_2.13-4.0.0/plugins
- Source/target JDBC drivers
Copy the source MySQL JDBC driver and the target KADB driver into the libs directory of the Kafka installation:
[root@nanri libs]# ls -l mysql* postgres*
-rw-r--r--. 1 root root 989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar
-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar
[root@nanri libs]# pwd
/root/kafka_2.13-4.0.0/libs
- Configuration changes
- Connect worker configuration: connect-standalone.properties
Add the plugin path parameter (absolute paths):
plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar
- Source configuration: contents of connect-mysql-source.properties. For parameter meanings see:
https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
# name of the source (producer-side) connector
name=connect-mysql-source
# fixed value: the class of the JDBC source connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# list of topic names; the source and sink must use the same topic
topics=test
# JDBC connection settings
connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false
# incremental capture mode: bulk, incrementing, timestamp, etc.
mode=incrementing
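The file above shows only the essentials. Note that, per the Confluent documentation, the source connector normally forms topic names from `topic.prefix` plus the table name rather than from a `topics` list. The fragment below is an illustrative completion of the source configuration; the values are assumptions based on this test setup, not copied from it:

```properties
# maximum number of tasks for this connector
tasks.max=1
# only poll the listed tables
table.whitelist=test
# prefix prepended to the table name to form the topic name
# (left empty here so the topic name equals the table name)
topic.prefix=
# column used by the incrementing mode; must be strictly increasing
incrementing.column.name=a
# how often to poll the source for new data, in milliseconds
poll.interval.ms=5000
```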
- Sink configuration: contents of connect-kadb-sink.properties. For parameter meanings see:
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
# name of the sink (consumer-side) connector
name=connect-kadb-sink
# maximum number of tasks created for this connector
tasks.max=1
# fixed value: the class of the JDBC sink connector (must be set)
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# list of topic names
topics=test
# JDBC connection settings
connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink
connection.user=mppadmin
# auto-create the target table
auto.create=true
# write mode
insert.mode=insert
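With insert.mode=insert, replaying already-delivered records causes duplicate-key errors once the target table has a primary key. A hedged alternative, using option names from the Confluent sink-connector documentation (the pk.fields value assumes column a is the key, as in the test table below):

```properties
# upsert instead of plain insert, so redelivered records are idempotent
insert.mode=upsert
# take the key columns from the record value
pk.mode=record_value
pk.fields=a
# allow the connector to add missing columns to an existing target table
auto.evolve=true
```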
- Starting Connect
Pass the worker configuration first, followed by the source and sink connector configurations:
bin/connect-standalone.sh \
    config/connect-standalone.properties \
    config/connect-mysql-source.properties \
    config/connect-kadb-sink.properties
- Testing
- Create the table on the MySQL source; the corresponding table is created automatically on the target
mysql> desc test;
+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| a     | int(11)     | NO   | PRI | NULL    | auto_increment |
| b     | varchar(10) | YES  |     | NULL    |                |
+-------+-------------+------+-----+---------+----------------+
2 rows in set (0.00 sec)
The incrementing mode requires a strictly auto-increment column; here that is column a.
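For the timestamp or timestamp+incrementing modes, the source table additionally needs a timestamp column that is refreshed on every update. A hypothetical MySQL 5.7 definition could look like this (not part of the tested setup):

```sql
CREATE TABLE test_ts (
    a         INT          NOT NULL AUTO_INCREMENT PRIMARY KEY,
    b         VARCHAR(10),
    -- refreshed automatically on INSERT and on every UPDATE
    update_ts TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP
                           ON UPDATE CURRENT_TIMESTAMP
);
```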
- Insert a row on the source:
mysql> insert into test(b) values('dddd');
Query OK, 1 row affected (0.00 sec)
- Connect log:
[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)
[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)
[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)
[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)
[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
- Inspect the events on the topic with kafka-console-consumer.sh:
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}
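Each event carries both the Connect schema and the payload. A quick way to pull out just the payload from a consumed record (assuming python3 is available on the host):

```shell
# sample record as produced by the source connector (from the console consumer above)
record='{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}'

# keep only the payload portion of the envelope
echo "$record" | python3 -c 'import sys, json; print(json.dumps(json.load(sys.stdin)["payload"]))'
# prints: {"a": 5, "b": "dddd"}
```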
- Target-side data:
1 | aaa
2 | bbb
3 | ccc
4 | ddd
5 | dddd
(844 rows)
test_sink=#
- Source-side data:
mysql> select * from test;
+---+------+
| a | b |
+---+------+
| 5 | dddd |
+---+------+
1 row in set (0.00 sec)
- Command reference
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink --state
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092