Building a Real-Time Data Warehouse on HBase and Phoenix (5): Real-Time Data Synchronization with Kafka Connect

Table of Contents

I. Overall Architecture

II. Installing and Configuring MySQL

1. Create the mysql user

2. Create the directories MySQL will use

3. Unpack the installation package

4. Configure environment variables

5. Create the MySQL configuration file

6. Initialize the MySQL system

7. Start the mysql server

8. Create the dba user

III. Configuring MySQL Master-Slave Replication

IV. Installing and Deploying the Kafka Connectors

1. Create the plugin directory

2. Unpack the files into the plugin directory

3. Configure Kafka Connect

(1) Configure the properties file

(2) Distribute to the other nodes

(3) Start in distributed mode

(4) Confirm the connector plugins and auto-created topics

4. Create the source connector

(1) Create the source mysql configuration file

(2) Create the mysql source connector

5. Create the sink connector

(1) Create the target hbase configuration file

(2) Create the hbase sink connector

6. Automatic synchronization of existing data

7. Real-time data synchronization test

References


        This post demonstrates how to install and configure Kafka Connect plugins to implement real-time data synchronization from MySQL to HBase. For the underlying environment, see the earlier articles in this series. The relevant software versions are as follows:

  • JDK: 11.0.22
  • MySQL: 8.0.16
  • HBase: 2.5.7
  • debezium-connector-mysql: 2.4.2
  • kafka-connect-hbase: 2.0.13

I. Overall Architecture

        The overall architecture is shown in the figure below.

        The table below lists the processes that will run on each of the four nodes. For simplicity, all commands used during installation and deployment are executed as the operating system's root user.

Process                      node1    node2    node3    node4
debezium-connector-mysql              *        *        *
kafka-connect-hbase                   *        *        *

        In addition, MySQL is installed on 172.18.16.156, where two instances are started for master-slave replication: the master instance uses port 3306 and the slave instance uses port 3307.

        Required installation packages:

  • mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz
  • debezium-debezium-connector-mysql-2.4.2.zip
  • confluentinc-kafka-connect-hbase-2.0.13.zip

        The Debezium connector version used here requires JDK 11 or later. In an environment with multiple JDK versions installed, you can use the alternatives command to select the required version:

[root@vvgg-z2-music-mysqld~]#alternatives --config java

There are 5 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
 + 3           /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/bin/java
   4           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
*  5           /usr/lib/jvm/jdk-11-oracle-x64/bin/java

Press Enter to keep the current selection [+], or type a selection number: 5
[root@vvgg-z2-music-mysqld~]#java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)
[root@vvgg-z2-music-mysqld~]#

        Add the Kafka cluster hostnames to the /etc/hosts file on 172.18.16.156:

# Edit the file
vim /etc/hosts

        Add the following entries:

172.18.4.126    node1
172.18.4.188    node2
172.18.4.71    node3
172.18.4.86    node4

II. Installing and Configuring MySQL

        Install and configure two MySQL instances, one master and one slave.

1. Create the mysql user

# Execute as the root user
useradd mysql
passwd mysql

2. Create the directories MySQL will use

# Create the data directory; make sure the mysqldata directory is empty
mkdir -p /data/3306/mysqldata
# Create the binlog directory
mkdir -p /data/3306/dblog
# Create the temporary directory
mkdir -p /data/3306/tmp
# Change the owner of the directories to mysql
chown -R mysql:mysql /data
# Perform the following installation steps as the mysql user
su - mysql

3. Unpack the installation package

# Go to the installation directory
cd ~
# Extract the files from the tarball
tar xvf mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz
# Create a symbolic link
ln -s mysql-8.0.16-linux-glibc2.12-x86_64 mysql-8.0.16

4. Configure environment variables

# Add the directory containing the MySQL executables to the $PATH environment variable
# Edit the file
vim ~/.bash_profile
# Modify or add the following two lines
PATH=$PATH:$HOME/.local/bin:$HOME/bin:/home/mysql/mysql-8.0.16/bin
export PATH
# Make the configuration take effect
source ~/.bash_profile

5. Create the MySQL configuration file

# Edit the file
vim /home/mysql/my_3306.cnf

        The file contents are as follows:

[mysqld]
max_allowed_packet=1G
log_timestamps=system
binlog_transaction_dependency_tracking  = WRITESET
transaction_write_set_extraction        = XXHASH64
binlog_expire_logs_seconds=259200
lower_case_table_names=1
secure_file_priv=''
log_bin_trust_function_creators=on
character-set-server = utf8mb4
default_authentication_plugin=mysql_native_password
basedir=/home/mysql/mysql-8.0.16-linux-glibc2.12-x86_64
datadir=/data/3306/mysqldata
socket=/data/3306/mysqldata/mysql.sock
wait_timeout=30
innodb_buffer_pool_size = 16G
max_connections = 1000
default-time-zone = '+8:00'
port = 3306
skip-name-resolve
user=mysql
innodb_print_all_deadlocks=1
log_output='table'
slow_query_log = 1
long_query_time = 1
tmp_table_size = 32M
# Enable binlog
log-bin=/data/3306/dblog/mysql-bin
log-bin-index = /data/3306/dblog/mysql-bin.index
tmpdir = /data/3306/tmp
server-id = 1563306
innodb_data_file_path = ibdata1:1G:autoextend
innodb_data_home_dir = /data/3306/mysqldata
innodb_log_buffer_size = 16M
innodb_log_file_size = 1G
innodb_log_files_in_group = 3
innodb_log_group_home_dir=/data/3306/dblog
innodb_max_dirty_pages_pct = 90
innodb_lock_wait_timeout = 120
gtid-mode = on
enforce_gtid_consistency=true
local_infile=0
log_error='/data/3306/mysqldata/master.err'
skip_symbolic_links=yes

[mysqldump]
quick
max_allowed_packet = 1G

[mysqld_safe]
open-files-limit = 8192

6. Initialize the MySQL system

mysqld --defaults-file=/home/mysql/my_3306.cnf --initialize
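        Initialization generates a temporary password for root@localhost and writes it to the error log defined by log_error in the configuration above. Assuming that log path, a quick way to retrieve the password before the first login:

# The temporary root password is recorded in the error log during initialization
grep 'temporary password' /data/3306/mysqldata/master.err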

7. Start the mysql server

mysqld_safe --defaults-file=/home/mysql/my_3306.cnf &

8. Create the dba user

# Connect to the mysql server
mysql -u root -p -S /data/3306/mysqldata/mysql.sock

-- Change the root user's password
alter user user() identified by "123456";
-- Create a new dba account
create user 'dba'@'%' identified with mysql_native_password by '123456';
grant all on *.* to 'dba'@'%' with grant option;

        Repeat steps 2 through 8, replacing 3306 with 3307, to create the slave instance; the configuration values that differ are sketched below.
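        As a minimal sketch (assuming the slave instance keeps the same directory layout under /data/3307 and reuses the rest of my_3306.cnf unchanged), only settings like the following need to change in my_3307.cnf; the server-id shown here is illustrative and simply has to differ from the master's:

# my_3307.cnf -- values that differ from my_3306.cnf (illustrative)
port = 3307
server-id = 1563307
datadir=/data/3307/mysqldata
socket=/data/3307/mysqldata/mysql.sock
tmpdir = /data/3307/tmp
log-bin=/data/3307/dblog/mysql-bin
log-bin-index = /data/3307/dblog/mysql-bin.index
innodb_data_home_dir = /data/3307/mysqldata
innodb_log_group_home_dir=/data/3307/dblog
log_error='/data/3307/mysqldata/master.err'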

III. Configuring MySQL Master-Slave Replication

        Execute on the 3306 master instance:

-- Check the replication position
show master status;
-- Create the replication user and grant privileges
create user 'repl'@'%' identified with mysql_native_password by '123456';
grant replication client,replication slave on *.* to 'repl'@'%';
-- Create a test database, table, and data
create database test;
create table test.t1 (
  id bigint(20) not null auto_increment,
  remark varchar(32) default null comment '备注',
  createtime timestamp not null default current_timestamp comment '创建时间',
  primary key (id));
insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');

        Output:

mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                        |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 |      977 |              |                  | ba615057-e11c-11ee-b80e-246e961c91f8:1-3 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.00 sec)

mysql> create user 'repl'@'%' identified with mysql_native_password by '123456';
Query OK, 0 rows affected (0.01 sec)

mysql> grant replication client,replication slave on *.* to 'repl'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> create database test;
Query OK, 1 row affected (0.00 sec)

mysql> create table test.t1 (
    ->   id bigint(20) not null auto_increment,
    ->   remark varchar(32) default null comment '备注',
    ->   createtime timestamp not null default current_timestamp comment '创建时间',
    ->   primary key (id));
Query OK, 0 rows affected (0.01 sec)

mysql> insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

        Execute on the 3307 slave instance:

change master to
master_host='172.18.16.156',
master_port=3306,
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=977;

start slave;
show slave status\G
select user,host from mysql.user;
select * from test.t1;

        Output:
mysql> change master to
    -> master_host='172.18.16.156',
    -> master_port=3306,
    -> master_user='repl',
    -> master_password='123456',
    -> master_log_file='mysql-bin.000001',
    -> master_log_pos=977;
Query OK, 0 rows affected, 2 warnings (0.00 sec)

mysql> start slave;
Query OK, 0 rows affected (0.01 sec)

mysql> show slave status\G
*************************** 1. row ***************************
               Slave_IO_State: Waiting for master to send event
                  Master_Host: 172.18.16.156
                  Master_User: repl
                  Master_Port: 3306
                Connect_Retry: 60
              Master_Log_File: mysql-bin.000001
          Read_Master_Log_Pos: 2431
               Relay_Log_File: vvgg-z2-music-mysqld-relay-bin.000002
                Relay_Log_Pos: 1776
        Relay_Master_Log_File: mysql-bin.000001
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
              Replicate_Do_DB: 
          Replicate_Ignore_DB: 
           Replicate_Do_Table: 
       Replicate_Ignore_Table: 
      Replicate_Wild_Do_Table: 
  Replicate_Wild_Ignore_Table: 
                   Last_Errno: 0
                   Last_Error: 
                 Skip_Counter: 0
          Exec_Master_Log_Pos: 2431
              Relay_Log_Space: 1999
              Until_Condition: None
               Until_Log_File: 
                Until_Log_Pos: 0
           Master_SSL_Allowed: No
           Master_SSL_CA_File: 
           Master_SSL_CA_Path: 
              Master_SSL_Cert: 
            Master_SSL_Cipher: 
               Master_SSL_Key: 
        Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
                Last_IO_Errno: 0
                Last_IO_Error: 
               Last_SQL_Errno: 0
               Last_SQL_Error: 
  Replicate_Ignore_Server_Ids: 
             Master_Server_Id: 1563306
                  Master_UUID: ba615057-e11c-11ee-b80e-246e961c91f8
             Master_Info_File: mysql.slave_master_info
                    SQL_Delay: 0
          SQL_Remaining_Delay: NULL
      Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates
           Master_Retry_Count: 86400
                  Master_Bind: 
      Last_IO_Error_Timestamp: 
     Last_SQL_Error_Timestamp: 
               Master_SSL_Crl: 
           Master_SSL_Crlpath: 
           Retrieved_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8
            Executed_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8,
c2df1946-e11c-11ee-8026-246e961c91f8:1-3
                Auto_Position: 0
         Replicate_Rewrite_DB: 
                 Channel_Name: 
           Master_TLS_Version: 
       Master_public_key_path: 
        Get_master_public_key: 0
            Network_Namespace: 
1 row in set (0.00 sec)

mysql> select user,host from mysql.user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| dba              | %         |
| repl             | %         |
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
6 rows in set (0.00 sec)

mysql> select * from test.t1;
+----+------------------+---------------------+
| id | remark           | createtime          |
+----+------------------+---------------------+
|  1 | 第一行:row1     | 2024-03-20 10:25:32 |
|  2 | 第二行:row2     | 2024-03-20 10:25:32 |
|  3 | 第三行:row3     | 2024-03-20 10:25:32 |
+----+------------------+---------------------+
3 rows in set (0.00 sec)

        For more details on MySQL master-slave replication configuration, see "Configuring Asynchronous Replication".

IV. Installing and Deploying the Kafka Connectors

        Perform the following steps on node2.

1. Create the plugin directory

mkdir $KAFKA_HOME/plugins

2. Unpack the files into the plugin directory

# debezium-connector-mysql
unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
# kafka-connect-hbase
unzip confluentinc-kafka-connect-hbase-2.0.13.zip -d $KAFKA_HOME/plugins/

3. Configure Kafka Connect

(1) Configure the properties file

# Edit the connect-distributed.properties file
vim $KAFKA_HOME/config/connect-distributed.properties

        The contents are as follows:

bootstrap.servers=node2:9092,node3:9092,node4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
plugin.path=/root/kafka_2.13-3.7.0/plugins

(2) Distribute to the other nodes

scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/

(3) Start in distributed mode

        Execute on all three nodes, starting one worker process on each, to provide fault tolerance and load balancing.

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties 
# Check the log for ERROR entries
grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out

(4) Confirm the connector plugins and auto-created topics

        List the connector plugins:

curl -X GET http://node2:8083/connector-plugins | jq

        The output shows that Kafka Connect has recognized the hbase sink and mysql source plugins:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   494  100   494    0     0   4111      0 --:--:-- --:--:-- --:--:--  4116
[
  {"class": "io.confluent.connect.hbase.HBaseSinkConnector", "type": "sink", "version": "2.0.13"},
  {"class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "2.4.2.Final"},
  {"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "3.7.0"},
  {"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "3.7.0"},
  {"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "3.7.0"}
]
[root@vvml-yz-hbase-test~]#

        List the topics:

kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        The output shows that Kafka Connect automatically created the connect-configs, connect-offsets, and connect-status topics at startup:

[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-1
test-3
[root@vvml-yz-hbase-test~]#

4. Create the source connector

(1) Create the source mysql configuration file

# Edit the file
vim $KAFKA_HOME/plugins/source-mysql.json

        The contents are as follows:

{"name": "mysql-source-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","topic.prefix": "mysql-hbase-test","database.hostname": "172.18.16.156","database.port": "3307","database.user": "dba","database.password": "123456","database.server.id": "1563307","database.server.name": "dbserver1","database.include.list": "test","schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic": "schemahistory.mysql-hbase-test"}}

(2) Create the mysql source connector

# Create the connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
# Check the connector status
curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
# List the topics
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        The output shows that mysql-source-connector is in the RUNNING state and that three topics were created automatically:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:31:30 GMT
Location: http://node2:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 579
Server: Jetty(9.4.53.v20231009)

{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","topic.prefix":"mysql-hbase-test","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-hbase-test","name":"mysql-source-connector"},"tasks":[],"type":"source"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   182  100   182    0     0  20726      0 --:--:-- --:--:-- --:--:-- 22750
{
  "name": "mysql-source-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.188:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.188:8083"
    }
  ],
  "type": "source"
}
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql-hbase-test
mysql-hbase-test.test.t1
schemahistory.mysql-hbase-test
test-1
test-3
[root@vvml-yz-hbase-test~]#

5. Create the sink connector

(1) Create the target hbase configuration file

# Edit the file
vim $KAFKA_HOME/plugins/sink-hbase.json

        The contents are as follows:

{"name": "hbase-sink-connector","config": {"topics": "mysql-hbase-test.test.t1","tasks.max": "1","connector.class": "io.confluent.connect.hbase.HBaseSinkConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","confluent.topic.bootstrap.servers": "node2:9092,node3:9092,node4:9092","confluent.topic.replication.factor":3,"hbase.zookeeper.quorum": "node2,node3,node4","hbase.zookeeper.property.clientPort": "2181","auto.create.tables": "true","auto.create.column.families": "true","table.name.format": "example_table"}
}

(2) Create the hbase sink connector

# Create the connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
# Check the connector status
curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
# List the consumer groups
kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        The output shows that hbase-sink-connector is in the RUNNING state and that a consumer group was created automatically:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:33:11 GMT
Location: http://node2:8083/connectors/hbase-sink-connector
Content-Type: application/json
Content-Length: 654
Server: Jetty(9.4.53.v20231009)

{"name":"hbase-sink-connector","config":{"topics":"mysql-hbase-test.test.t1","tasks.max":"1","connector.class":"io.confluent.connect.hbase.HBaseSinkConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","confluent.topic.bootstrap.servers":"node2:9092,node3:9092,node4:9092","confluent.topic.replication.factor":"3","hbase.zookeeper.quorum":"node2,node3,node4","hbase.zookeeper.property.clientPort":"2181","auto.create.tables":"true","auto.create.column.families":"true","table.name.format":"example_table","name":"hbase-sink-connector"},"tasks":[],"type":"sink"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   176  100   176    0     0  23084      0 --:--:-- --:--:-- --:--:-- 25142
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.71:8083"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
connect-hbase-sink-connector
[root@vvml-yz-hbase-test~]#

6. Automatic synchronization of existing data

        The sink connector automatically created the example_table table in HBase and synchronized the three test rows that were inserted earlier while configuring MySQL master-slave replication:

[root@vvml-yz-hbase-test~]#hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html#shell
Version 2.5.7-hadoop3, r6788f98356dd70b4a7ff766ea7a8298e022e7b95, Thu Dec 14 16:16:10 PST 2023
Took 0.0012 seconds                                                                                                                       
hbase:001:0> list
TABLE                                                                                                                                     
SYSTEM:CATALOG                                                                                                                            
SYSTEM:CHILD_LINK                                                                                                                         
SYSTEM:FUNCTION                                                                                                                           
SYSTEM:LOG                                                                                                                                
SYSTEM:MUTEX                                                                                                                              
SYSTEM:SEQUENCE                                                                                                                           
SYSTEM:STATS                                                                                                                              
SYSTEM:TASK                                                                                                                               
example_table                                                                                                                             
test                                                                                                                                      
10 row(s)
Took 0.3686 seconds                                                                                                                       
=> ["SYSTEM:CATALOG", "SYSTEM:CHILD_LINK", "SYSTEM:FUNCTION", "SYSTEM:LOG", "SYSTEM:MUTEX", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:TASK", "example_table", "test"]
hbase:002:0> describe 'example_table'
Table example_table is ENABLED                                                                                                            
example_table, {TABLE_ATTRIBUTES => {METADATA => {'hbase.store.file-tracker.impl' => 'DEFAULT'}}}                                         
COLUMN FAMILIES DESCRIPTION                                                                                                               
{NAME => 'mysql-hbase-test.test.t1', INDEX_BLOCK_ENCODING => 'NONE', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536 B (64KB)'}
1 row(s)
Quota is disabled
Took 0.1173 seconds                                                                                                                       
hbase:003:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}
3 row(s)
Took 0.0702 seconds                                                                                                                       
hbase:004:0> 

        By default, debezium-connector-mysql takes a snapshot of the existing data when it starts and writes it to Kafka. When building a real-time data warehouse, this means existing data and incremental changes can be synchronized in a single real-time pipeline, which greatly simplifies the CDC (Change Data Capture) process.
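        The snapshot behavior is governed by Debezium's snapshot.mode property, which defaults to initial (snapshot existing data first, then stream binlog changes). As a rough sketch, if only new changes were wanted and no data snapshot, a line like the following could be added to the source connector's "config" section; the exact value names should be verified against the Debezium documentation for the connector version in use:

"snapshot.mode": "schema_only"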

7. Real-time data synchronization test

        Make some data changes on the MySQL master:

insert into test.t1 (remark) values ('第四行:row4');
update test.t1 set remark = '第五行:row5' where id = 4;
delete from test.t1 where id =1;

        Check the data changes in HBase:

hbase:004:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test","sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}
 {"id":4}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:38:18.788, value={"before":null,"after":{"id":4,"remark":"第四行:row4","createtime":"2024-03-20T02:38:18Z"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710902298000,"snapshot":"false","db":"test","sequence":null,"table":"t1","server_id":1563306,"gtid":"ba615057-e11c-11ee-b80e-246e961c91f8:9","file":"mysql-bin.000001","pos":2679,"row":0,"thread":49,"query":null},"op":"c","ts_ms":1710902298665,"transaction":null}
4 row(s)
Took 0.0091 seconds                                                                                                                       
hbase:005:0> 

        The delete and update operations executed on MySQL were not synchronized to HBase.

        Check the consumption status:

[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --group connect-hbase-sink-connector --describe --bootstrap-server node2:9092,node3:9092,node4:9092

Consumer group 'connect-hbase-sink-connector' has no active members.

GROUP                        TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-hbase-sink-connector mysql-hbase-test.test.t1 0          3               7               4               -               -               -
[root@vvml-yz-hbase-test~]#

        All the data changes were written to Kafka, but not all of them were consumed.
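        To confirm that the change events really did reach the topic, the topic can be read directly with the standard console consumer (the topic name comes from the earlier output):

kafka-console-consumer.sh --bootstrap-server node2:9092,node3:9092,node4:9092 \
  --topic mysql-hbase-test.test.t1 --from-beginning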

        Check the sink connector status:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2168  100  2168    0     0   368k      0 --:--:-- --:--:-- --:--:--  423k
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.18.4.71:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: \n\tat io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)\n\tat io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)\n\tat io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)\n\tat io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)\n\tat io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.\n\t... 16 more\n"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#

        Checking the log file ~/kafka_2.13-3.7.0/logs/connectDistributed.out on node3 shows the following errors:

[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table:  (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
	at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
	at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
	at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
	at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
	at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
	... 16 more
[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
	at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
	at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
	at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
	at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
	at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
	... 16 more

        The reported error is: Row with specified row key already exists.

        The reason is that the sink connector turns MySQL update and delete operations into HBase inserts, while the row key it derives automatically is the MySQL table's primary key. Since that row key already exists, the insert fails. This synchronization behavior is something to watch out for.
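        If you want the task to retry after addressing the cause, the failed sink task can be restarted through the standard Kafka Connect REST API; note that this alone does not change the insert-only behavior described above, and whether the connector offers an upsert-style write mode depends on the connector version, so check the HBase sink connector documentation:

# Restart the failed sink task (task 0) and re-check the status
curl -X POST http://node2:8083/connectors/hbase-sink-connector/tasks/0/restart
curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq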

References:

  • Greenplum Real-Time Data Warehouse Practice (5): Real-Time Data Synchronization
  • Debezium MySQL Source Connector for Confluent Platform
  • Apache HBase Sink Connector for Confluent Platform
