PostgreSQL10 逻辑复制实战:打造高可用数据同步架构!
概述
PostgreSQL 10 引入了逻辑复制(Logical Replication),为数据库高可用和数据同步提供了更灵活的选择。PostgreSQL 复制机制主要分为物理复制和逻辑复制两种:物理复制(又称流复制/物理块复制)在实例级别同步数据,而逻辑复制则支持更精细的复制粒度。逻辑复制通过逻辑解码插件解析 WAL 日志,提取 DML 语句并在订阅端执行,从而实现表级别的数据同步,适用于分库分表、实时数据同步、异构数据同步等高可用场景。
具备特性
逻辑复制过程中使用限制:
1. 不支持复制DDL。
2. 不支持复制序列、索引。
3. 不支持双向复制。
4. 发布节点和订阅节点表的模式名、表名必须一致,订阅节点允许表有额外字段。
逻辑复制与物理复制区别:
1. 物理复制不能垮操作系统(Linux-Windows),而逻辑复制可以。
2. 无法在不同的PG版本之间进行物理复制(例如10-12),逻辑复制可以支持。因此PostgreSQL大版本升级可以使用逻辑复制。
3. 物理复制是实例级别的复制,而逻辑复制可以基于对象级别(具体到某个表)。
4. 物理复制备库只能读,逻辑复制的备库可以写入。
应用场景
可基于表级别复制,是一种粒度可细的复制,主要用在以下场景
1. 满足业务上需求,实现某些指定表数据同步。
2. PostgreSQL 跨版本数据同步。
3. PostgreSQL 大版本升级。
具体流程
逻辑复制的流程图
PostgreSQL数据库逻辑复制使用发布者/订阅者模型,使用订阅复制槽技术,可并行的传输WAL日志,通过在订阅端回放WAL日志中的逻辑条目,保持复制表的数据同步,订阅端通过逻辑解码对数据进行REDO。
PUBLICATION对象
CREATE PUBLICATION 名称
[ FOR TABLE [ ONLY ] 表名 [ * ] [, ...]
| FOR ALL TABLES ]
[ WITH ( publication_parameter [= 值] [, ... ] ) ]
参数说明
FOR TABLE:表示要复制的表,可以通过’,’定义多个表。
FOR ALL TABLES:表示数据库的所有表都要复制。
WITH:表的DML操作行为,忽略表示全部DML操作。
一个PUBLICATION对象可以注册一个或多个表,也可以选择DML操作进行复制,一个表同时也可以被多个PUBLICATION注册。
SUBSCRIPTION对象
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= 值] [, ... ] ) ]
参数说明
CONNECTION:连接master节点的字符串信息。eg. 'host=ip port=5432 user=xxx dbname=xxx'
PUBLICATION:对应发布端的PUBLICATION对象
WITH:表示DML操作,忽略表示全部DML操作
SUBSCRIPTION对象是逻辑复制过程汇总,由订阅节点创建的对象,用于连接发布节点的PUBLICATION对象。
逻辑解码
逻辑解码是使用一个输出插件将 Postgres 的预写日志 (WAL) 转换为可读格式。逻辑解码过程如图:
当 Postgres 数据库表中的一行发生更改时,该更改会记录在 WAL 中。如果启用了逻辑解码,则该更改的记录将传递给输出插件。输出插件将记录从 WAL 格式更改为插件的格式(例如 JSON 对象)。然后重新格式化的更改通过复制槽退出 Postgres。最后是消费者。消费者是您选择的任何连接到 Postgres 并接收逻辑解码输出的应用程序。
pgout插件解码wal后效果:
testdb=# select * from pg_logical_slot_get_changes('test', pg_current_wal_lsn(), 10); lsn | xid | data
------------+--------+------------------------------------------------------------------------------------------------------0/3DAE5178 | 377183 | BEGIN 3771830/3DAE5178 | 377183 | table public.users: UPDATE: id[character varying]:'4' name[character varying]:'anna' age[integer]:210/3DAE5348 | 377183 | COMMIT 3771830/3DAE65F0 | 377184 | BEGIN 3771840/3DAE65F0 | 377184 | table public.users: INSERT: id[character varying]:'5' name[character varying]:'5' age[integer]:550/3DAE6728 | 377184 | COMMIT 377184
(6 rows)
wal2json 插件可解码为json格式。
https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical
使用示例
测试构建PostgreSQL10的逻辑复制环境。
角色 | 数据库 | 操作系统版本和数据库版本 | 复制用户 |
---|---|---|---|
发布节点:172.168.98.107 | testdb | Centos 7/ PostgreSQL 10 | replication |
订阅节点:172.168.98.115 | testdb | Centos 7/ PostgreSQL 10 | replication |
1、首先需要在发布角色节点设置 postgresql.conf 相关参数。
wal_level = logical
2、配置主和复制节点的pg_hba.conf文件配置replication用户连接不受限
host replication all 0.0.0.0/0 trust
3、主库和复制库上都创建replication角色并具有复制权限
[root@localhost ~]# su postgres
bash-4.2$ psql
postgres=# CREATE ROLE replication WITH replication PASSWORD '123456' LOGIN;
4、主库和复制库上都创建测试库和测试表
postgres=# CREATE USER testdb WITH ENCRYPTED PASSWORD 'testdb!123';
postgres=# CREATE DATABASE testdb OWNER testdb;
执行\c命令切换至testdb数据库
postgres=# \c testdb testdb
You are now connected to database "testdb" as user "testdb".
testdb=>
继续执行创建测试表
CREATE TABLE users (id varchar(10) NOT NULL,name varchar(35) NOT NULL,age integer
) ;
ALTER TABLE public.users ADD CONSTRAINT users_id_pkey PRIMARY KEY ("id");
主库插入测试数据
INSERT INTO public.users (id, name, age) VALUES ('1', 'zhangsan', 20);
INSERT INTO public.users (id, name, age) VALUES ('2', 'lisi', 30);
INSERT INTO public.users (id, name, age) VALUES ('3', 'wangwu', 21);
5、主库和复制库上都给replication用户授权数据库权限
testdb=> GRANT SELECT ON ALL tables IN SCHEMA PUBLIC TO replication;
6、在主库上创建发布并指定users表
testdb=> CREATE PUBLICATION testpub FOR TABLE users;
CREATE PUBLICATION
testdb=> \dRpList of publicationsName | Owner | All tables | Inserts | Updates | Deletes
---------+--------+------------+---------+---------+---------testpub | testdb | f | t | t | t
(1 row)testdb=>
7、在复制库上创建订阅
创建订阅,指定连接到主库上的发布。使用superuser来创建订阅,通过命令\c切换至postgres用户。
testdb=> \c testdb postgres
You are now connected to database "testdb" as user "postgres".
testdb=# CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub;
NOTICE: created replication slot "testsub" on publisher
CREATE SUBSCRIPTION
创建订阅时可指定已经存在的slot
CREATE SUBSCRIPTION testsub CONNECTION 'host=172.168.98.107 port=5432 dbname=testdb user=replication password=123456' PUBLICATION testpub WITH (slot_name=test, create_slot=false);
复制库上面查看订阅情况
testdb=# \dRsList of subscriptionsName | Owner | Enabled | Publication
---------+----------+---------+-------------testsub | postgres | t | {testpub}
(1 row)
创建成功之后数据会自动复制过来。
testdb=# SELECT * FROM users;id | name | age
----+----------+-----1 | zhangsan | 202 | lisi | 303 | wangwu | 21(3 rows)
8、测试增删改
-- 主库插入记录
testdb=> INSERT INTO users VALUES('4','anna', 17);
INSERT 0 1-- 从库查询,记录'anna'已插入。
testdb=> select * from users;id | name | age
----+----------+-----1 | zhangsan | 202 | lisi | 303 | wangwu | 214 | anna | 17
(4 rows)-- 主库修改'anna'
testdb=> update users set age=18 where id='4';
-- 从库查询'anna'age已经同步修改...-- 主库删除'anna'
testdb=> delete from users where id='4';
-- 从库查询'anna'这行数据同步删除...
常用命令
查看当前数据库已有发布
SELECT * FROM pg_publication;
查看当前数据库已有订阅
SELECT * FROM pg_subscription;
删除发布
DROP PUBLICATION testpub;
删除订阅
DROP SUBSCRIPTION testsub;
禁用订阅
ALTER SUBSCRIPTION testsub disable;
启动订阅
ALTER SUBSCRIPTION testsub enable;
如果逻辑复制操作中一张表缺少主键,就需要执行这条语句,代表使用整行作为标识
ALTER TABLE table REPLICA IDENTITY FULL;
解除复制槽与订阅的关联
ALTER SUBSCRIPTION testsub disable;
ALTER SUBSCRIPTION testsub SET (slot_name = NONE);
DROP SUBSCRIPTION testsub;
显示当前服务的所有复制连接(发布端执行)
SELECT * FROM pg_stat_replication;
显示订阅者的状态信息
SELECT * FROM pg_stat_subscription;
显示所有复制槽(发布端执行)
SELECT * FROM pg_replication_slots;
创建复制槽
select pg_create_logical_replication_slot('test','test_decoding');
创建复制槽
SELECT pg_create_logical_replication_slot('sub_iuser', 'pgoutput');
在这个示例中:
- ‘sub_iuser’ 是要创建的复制槽的名称。
- ‘pgoutput’ 是指定的输出插件名称,它用于将逻辑复制的 WAL 记录转换为适合于逻辑复制的格式。
执行上述 SQL 命令后,将创建名为 sub_iuser 的逻辑复制槽。
请确保在创建复制槽之前,已经启用了逻辑复制,并且已经将逻辑复制参数配置为允许创建复制槽。
删除复制槽
SELECT * FROM pg_drop_replication_slot('test');
脚本实践
使用脚本发布订阅相关数据库和相关表
逻辑复制发布脚本
#!/bin/sh# DB密码
PASSWORD=PG_PWD
# 查询所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
if [ -z "$database_name_result" ]; then# 查询结果为空exit 1
fiwhile IFS= read -r database_name; dodatabase_name=$(echo "$database_name" | sed 's/ //g')# 删除可能存在的发布、订阅关系PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"# 重新创建发布关系table_names=""# 主备需要排除表(例如排除日志表)table_names=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT string_agg(table_name, ',') AS table_namesFROM information_schema.tablesWHERE table_schema = 'public'AND table_type = 'BASE TABLE'AND table_name NOT LIKE 't_log%'AND table_name != 't_xxx';")# 获取要发布的表清单信息PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "CREATE PUBLICATION pub_$database_name FOR TABLE $table_names;"PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_publication;"echo "PUBLICATION $database_name done."
echo "PUBLICATION done."
逻辑复制订阅脚本
注意:需要传参主库服务器ip和主库数据库密码。
#!/bin/sh
# 主机
MASTER_HOST=$1
# 主机数据库密码
MASTER_PASSWORD=$2
# 当前备机旧密码
PASSWORD=PG_PWD
# 查询iuser用户下的所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
if [ -z "$database_name_result" ]; then
# 查询结果为空exit 1
fi
# 遍历每个数据库
while IFS= read -r database_name; dodatabase_name=$(echo "$database_name" | sed 's/ //g')# 删除存在的发布、订阅关系PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"# 尝试连接数据库echo "尝试连接主机数据库[$database_name]..."PGPASSWORD=$MASTER_PASSWORD psql -h $MASTER_HOST -p 5432 -U iuser -d $database_name -c "SELECT 1;"if [ $? -eq 0 ]; thenecho "主机数据库连接[$database_name]成功..."elseecho "主机数据库连接[$database_name]失败..."continuefi# 清空表数据# 主备需要排除表(例如排除日志表)table_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT table_nameFROM information_schema.tablesWHERE table_schema = 'public'AND table_type = 'BASE TABLE'AND table_name NOT LIKE 't_log%'AND table_name != 't_xxx';")while IFS= read -r table_name; doecho "Processing table: $table_name"# 清空表数据PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "TRUNCATE TABLE $table_name;"# 重新创建订阅关系PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "CREATE SUBSCRIPTION sub_$database_nameCONNECTION 'host=$MASTER_HOST port=5432 user=iuser password=$MASTER_PASSWORD dbname=$database_name'PUBLICATION pub_$database_name;"# 查询订阅情况echo "SELECT SUBSCRIPTION"PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "SELECT * FROM pg_subscription;"echo "SUBSCRIPTION $database_name done."
done <<< "$database_name_result"
echo "SUBSCRIPTION done."
逻辑复制停止脚本
#!/bin/sh
# 本机密码
PASSWORD=PG_PWD
# 查询iuser用户下的所有数据库
database_name_result=$(PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "
SELECT d.datname AS database_name FROM pg_database d JOIN pg_user u ON d.datdba = u.usesysid WHERE u.usename = 'iuser';")
# 检查查询结果是否为空
if [ -z "$database_name_result" ]; then# 查询结果为空exit 1
fi
# 遍历每个数据库
while IFS= read -r database_name; dodatabase_name=$(echo "$database_name" | sed 's/ //g')echo "DROP PUBLICATION&SUBSCRIPTION"# 删除之前可能存在的发布、订阅关系PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP PUBLICATION IF EXISTS pub_$database_name;"PGPASSWORD=$PASSWORD psql -h 127.0.0.1 -p 5432 -U iuser -t -c "DROP SUBSCRIPTION IF EXISTS sub_$database_name;"echo "DROP $database_name PUBLICATION&SUBSCRIPTION done."
done <<< "$database_name_result"
echo "PUBLICATION done."
延迟测试
100w测试
delete from users;
insert into users select generate_series(1,1000000), 'anna', 18;
delete from users;
> Affected rows: 1000000
> 时间: 2.024s
删除100w行需要时间2s,观测延迟
select * from users limit 1;
\watch 1 开始时间
2022年02月18日 星期五 18时18分03秒 (每 1s)id | name | age
----+------+-----1 | anna | 18
(1 行记录)
结束时间
2022年02月18日 星期五 18时18分25秒 (每 1s)id | name | age
----+------+-----
(0 行记录)
延迟20s左右
10w测试
insert into users select generate_series(1,100000), 'anna', 18;
delete from users;
> Affected rows: 100000
> 时间: 0.229s
删除10w行需要时间0.2s,观测延迟
select * from users limit 1;
\watch 1 开始时间
2022年02月18日 星期五 18时29分20秒 (每 1s)id | name | age
----+------+-----1 | anna | 18
(1 行记录)结束时间 2022年02月18日 星期五 18时29分22秒 (每 1s)id | name | age
----+------+-----
(0 行记录)
延迟2s左右,取决于一次性事务大小。
相关问题
逻辑复制配置双向复制WAL循环
-- 正向发布
CREATE TABLE t(a SERIAL, b CHAR);
create publication testpub1 FOR table t;
-- 正向订阅
CREATE TABLE t(a SERIAL, b CHAR);
create subscription testsub1 connection 'host=172.168.98.107 port=5432 dbname=testdb user=replication' publication testpub1;
-- 反向发布
create publication testpub2 FOR table t;
-- 反正订阅
create subscription testsub2 connection 'host=172.168.98.115 port=5432 dbname=testdb user=replication' publication testpub2;
至此已经创建了一个双向循环复制,如图所示。
此时我在发布端插入一条数据就会出现环绕现像。此时就会出现循环。不停的从A复制到B,再从B复制到A,直到把数据库搞崩。双向复制需要使用不同的表来实现。使用同样的表会产生WAL循环。
相关链接
1. PostgreSQL10官网文档
https://www.postgresql.org/docs/10/index.html
2. PostgreSQL10逻辑特性
https://www.postgresql.org/docs/10/logical-replication.html
3. 逻辑解码
https://docs.microsoft.com/zh-cn/azure/postgresql/concepts-logical
总结
逻辑复制是PostgreSQL10引入的重要特性,为数据库提供了更灵活的同步方式。在高可用架构中,逻辑复制可用于数据同步、灾备切换、实时分析等场景,提升数据库的可扩展性和业务连续性。合理规划依然可以打造稳定高效的高可用架构。💡 想让你的数据库更可靠?逻辑复制值得一试!🚀