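Symptom: slow database startup
Version: PG 13.2
The database started slowly. The startup process was reading spill files, and the file name it was working on kept changing. Even listing the spill files was slow: ls -l eventually reported about 8 million spill files.
Why are there more than ten million spill files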
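WAL segments and the meaning of an LSN
LSN
An LSN is a 64-bit integer. It is displayed like 42D3B/1732C540 (hexadecimal): the part before the slash / is the 32-bit logical log number, and the 32 bits after the slash hold the segment number, block number, and offset within the block. With the default 16 MB segment size the four parts are:
32 bits | 8 bits | 11 bits | 13 bits |
---|---|---|---|
logical log number | segment number | block number | offset within block |
Offset within block: the block size is 8192 = 2^13, so 13 bits
Block number: 16 MB (default WAL segment size) / 8192 = 2048 = 2^11, so 11 bits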
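The bit layout can be checked with a few lines of arithmetic. This is a minimal Python sketch, not PostgreSQL code: the function name split_lsn and its parameters are hypothetical, and it assumes the default 16 MB segment size and 8 kB block size described above.

```python
def split_lsn(lsn_text, wal_segment_size=16 * 1024 * 1024, block_size=8192):
    """Split a textual LSN such as '42D3B/1732C540' into its four parts.

    Returns (logical_log_number, segment_in_log, block_in_segment, offset_in_block).
    The default sizes are assumptions matching a stock build.
    """
    high_hex, low_hex = lsn_text.split("/")
    logical_log = int(high_hex, 16)      # 32 bits before the slash
    low = int(low_hex, 16)               # 32 bits after the slash
    segment = low // wal_segment_size    # leading 8 bits for 16 MB segments
    in_segment = low % wal_segment_size  # byte position inside the segment
    block = in_segment // block_size     # 11 bits for 16 MB / 8 kB
    offset = in_segment % block_size     # 13 bits for an 8 kB block
    return logical_log, segment, block, offset


if __name__ == "__main__":
    print(split_lsn("42D3B/1732C540"))
```

For the example LSN the segment part is 0x17, the first two hex digits after the slash.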
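WAL segment
A WAL file name consists of three groups of hexadecimal digits.
Take the WAL file 0000000300042D3B00000002 (8 kB WAL blocks) as an example:
32 bits | 32 bits | 32 bits |
---|---|---|
timeline | logical log number | segment number |
00000003 | 00042D3B | 00000002 |
So an LSN locates both the WAL file name and the offset inside that file.
Two parts of the LSN are used below: the logical log number before the slash /, and the 8-bit segment number that immediately follows the slash.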
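The mapping from an LSN to a WAL file name can be sketched as follows. This is an illustrative Python sketch of the arithmetic only; wal_file_name is a hypothetical helper, the timeline has to be supplied by the caller, and the boundary behaviour of the server-side pg_walfile_name function is not modelled.

```python
def wal_file_name(timeline, lsn_text, wal_segment_size=16 * 1024 * 1024):
    """Build the 24-hex-digit WAL file name that contains the given LSN."""
    high_hex, low_hex = lsn_text.split("/")
    lsn = (int(high_hex, 16) << 32) | int(low_hex, 16)
    # Number of segments inside one 4 GB logical log (256 for 16 MB segments).
    segments_per_log = 0x100000000 // wal_segment_size
    segno = lsn // wal_segment_size
    return "%08X%08X%08X" % (
        timeline,
        segno // segments_per_log,  # logical log number
        segno % segments_per_log,   # segment number inside that log
    )


if __name__ == "__main__":
    print(wal_file_name(3, "42D3B/02000010"))
```

With timeline 3, any LSN inside segment 2 of logical log 42D3B maps to the example file name used above.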
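Converting a spill file name
Replication slot name: logical_ex2209_rep
Spill file name: xid-407989064-lsn-42D1E-20000000.spill
42D1E is not a complete LSN, so pg_walfile_name cannot be applied to it directly to find the WAL file. 42D1E is a logical log number; filtering the WAL files whose names contain 42D1E finds 16 WAL files.
Can the number 20000000 be mapped to a WAL segment number, and therefore to one specific file?
How the spill file name is built: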
/*
 * Given a replication slot, transaction ID and segment number, fill in the
 * corresponding spill file into 'path', which is a caller-owned buffer of size
 * at least MAXPGPATH.
 */
static void
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
                            XLogSegNo segno)
{
    XLogRecPtr  recptr;

    XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);

    snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
             NameStr(MyReplicationSlot->data.name),
             xid,
             (uint32) (recptr >> 32), (uint32) recptr);
}
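pg_replslot/%s and xid-%u-lsn are easy to understand: they are the replication slot name and the xid. The recptr that follows needs a look at its definition:
/*
 * Pointer to a location in the XLOG.  These pointers are 64 bits wide,
 * because we don't want them ever to overflow.
 */
typedef uint64 XLogRecPtr;
XLogSegNoOffsetToRecPtr computes an LSN from the WAL segment number, the segment size, and the offset:
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest) \
        (dest) = (segno) * (wal_segsz_bytes) + (offset)
XLogRecPtr is the LSN. So (uint32) (recptr >> 32) takes the high 32 bits of the LSN, and (uint32) recptr takes the low 32 bits.
The high 32 bits are the first half of the LSN seen earlier, lsn-42D1E. The low 32 bits normally carry more information, but here only their leading bits, the segment number, matter.
Because offset = 0 is passed in together with segno, dest can be computed without any in-segment offset. On this instance wal_segsz_bytes is 128*1024*1024 (128 MB). Taking dest as the low 32 bits, the expression in XLogSegNoOffsetToRecPtr can be rearranged to give the segment number inside the logical log:
segno = dest / (128*1024*1024)
-- convert hexadecimal 20000000
segno = x'20000000'::int / (128*1024*1024)
segno = 4
The formula yields the segment number segno, which identifies the WAL file.
So the spill file xid-407989064-lsn-42D1E-20000000.spill corresponds to the WAL file with
logical log number 42D1E and segment number 04:
ls 42D1E*04
0000000200042D1E00000004
pg_waldump confirms that xid 407989064 appears in that file.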
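The conversion from a spill file name to a WAL file name can be written as a small helper. This is a hedged Python sketch of the arithmetic described above: spill_to_wal and SPILL_RE are hypothetical names, the timeline cannot be derived from the spill file name and must be supplied, and the 128 MB segment size is the value of this particular instance.

```python
import re

# Spill file name layout: xid-<xid>-lsn-<high 32 bits>-<low 32 bits>.spill,
# where both LSN halves are printed with %X (uppercase hex, no padding).
SPILL_RE = re.compile(r"^xid-(\d+)-lsn-([0-9A-F]+)-([0-9A-F]+)\.spill$")


def spill_to_wal(spill_name, timeline, wal_segment_size):
    """Return (xid, wal_file_name) for a spill file name."""
    match = SPILL_RE.match(spill_name)
    if match is None:
        raise ValueError("not a spill file name: %r" % spill_name)
    xid = int(match.group(1))
    recptr = (int(match.group(2), 16) << 32) | int(match.group(3), 16)
    segments_per_log = 0x100000000 // wal_segment_size  # 32 for 128 MB
    segno = recptr // wal_segment_size
    wal_name = "%08X%08X%08X" % (
        timeline, segno // segments_per_log, segno % segments_per_log)
    return xid, wal_name


if __name__ == "__main__":
    print(spill_to_wal("xid-407989064-lsn-42D1E-20000000.spill",
                       timeline=2, wal_segment_size=128 * 1024 * 1024))
```

With timeline 2 and 128 MB segments this reproduces the file found by ls above.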
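The WAL segment size is fixed once the instance has been created, so (128*1024*1024) is a constant. segno is therefore fully determined by (uint32) recptr, although the two are not equal. In other words, every WAL segment switch also starts a new spill file.
To summarize, spill files are generated as follows:
- One transaction ID that spans several WAL segments produces several spill files. For example, a large transaction without subtransactions that spans 3 WAL segments gets 3 spill files.
- Different transaction IDs get different spill files. For example, 10 million subtransactions produce 10 million spill files.
Structure of the spill file name xid-407989064-lsn-42D1E-20000000.spill:
xid | high 32 bits of the LSN, i.e. the WAL logical log number | derived from the WAL segment number; not equal to it |
---|---|---|
xid-407989064 | lsn-42D1E | 20000000 |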
# restored environment
[postgres]$ ll |head -100
total 40000276
-rw------- 1 postgres postgres 184 Dec 6 15:20 state
-rw------- 1 postgres postgres 196 Dec 6 13:25 xid-407989064-lsn-42D1E-0.spill
-rw------- 1 postgres postgres 208 Dec 6 13:25 xid-407989064-lsn-42D1E-20000000.spill
...
-rw------- 1 postgres postgres 540 Dec 6 16:44 xid-407989064-lsn-42D2A-D0000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989065-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989066-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989068-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989070-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989072-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989076-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989079-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989080-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989082-lsn-42D1D-C8000000.spill
[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll |awk '{print $9}'|awk -F '-' '{print $2}'|sort|uniq -c|wc -l
10000003
[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll |wc -l
10000070
So the real environment held 10000070 files with 10000003 distinct xids. In other words, one parent transaction spanned about 70 WAL segments and had 10 million subtransactions.
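The same counting can be done on a list of file names without shelling out to awk. This is a small Python sketch with a hypothetical helper name, summarize_spill_names; it only looks at names of the form xid-<xid>-lsn-....spill and ignores everything else, such as the state file.

```python
from collections import Counter


def summarize_spill_names(names):
    """Count spill files per xid.

    Returns (distinct_xids, total_spill_files, per_xid_counter).
    """
    per_xid = Counter()
    for name in names:
        if name.startswith("xid-") and name.endswith(".spill"):
            xid = int(name.split("-")[1])
            per_xid[xid] += 1
    return len(per_xid), sum(per_xid.values()), per_xid


if __name__ == "__main__":
    sample = [
        "state",
        "xid-407989064-lsn-42D1E-0.spill",
        "xid-407989064-lsn-42D1E-20000000.spill",
        "xid-407989065-lsn-42D1D-C8000000.spill",
    ]
    distinct, total, per_xid = summarize_spill_names(sample)
    # An xid with more than one file spans more than one WAL segment.
    print(distinct, total, per_xid.most_common(1))
```

An xid whose count is greater than 1 is a transaction whose changes span several WAL segments, which is the pattern of the parent transaction described above.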
Replication slot spill tests
-- set up a replication link with publication/subscription
logical_decoding_work_mem = 64MB #pg_ctl reload
wal_segment_size = 128 MB
--source
CREATE TABLE replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);
create publication pub_test for table replication_table ;
--dest
CREATE TABLE replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);
CREATE SUBSCRIPTION sub_test
CONNECTION 'host=127.0.0.1 port=8094 dbname=lzl user=lzl password=qwer'
PUBLICATION pub_test;
--source
select * from pg_replication_slots;
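Large transaction, no subtransactions, replicated table
-- start a large transaction and leave it uncommitted
begin;
insert into replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;
-- the replication slot spills
ll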
total 331924
-rw------- 1 postgres postgres 184 Dec 9 20:22 state
-rw------- 1 postgres postgres 88226964 Dec 9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 119698488 Dec 9 20:22 xid-5074343-lsn-163-40000000.spill
After the large transaction commits, wait for it to be consumed until the replication lag is 0; the spill files then disappear.
M=# select pid,usename,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,reply_time from pg_stat_replication;
pid | usename | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | reply_time
--------+---------+--------------+--------------+--------------+--------------+-----------+-----------+------------+------------------------------
163525 | lzl | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | [null] | [null] | [null] | 2024-12-09 20:25:35.14769+08
(1 row)
M=# select pid,usename,pg_wal_lsn_diff(pg_current_wal_lsn(),sent_lsn) diff_sent_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),write_lsn) diff_write_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),flush_lsn) diff_flush_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),replay_lsn) diff_replay_mb,pg_walfile_name_offset(sent_lsn) sentoffset,pg_walfile_name_offset(write_lsn) writeoffset,pg_walfile_name_offset(flush_lsn) flush_lsn from pg_stat_replication;
pid | usename | diff_sent_mb | diff_write_mb | diff_flush_mb | diff_replay_mb | sentoffset | writeoffset | flush_lsn
--------+---------+--------------+---------------+---------------+----------------+-------------------------------------+-------------------------------------+-------------------------------
163525 | lzl | 0 | 0 | 0 | 0 | (000000010000016300000009,26665416) | (000000010000016300000009,26665416) | (000000
[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357392
-rw------- 1 postgres postgres 184 Dec 9 20:23 state
-rw------- 1 postgres postgres 88226964 Dec 9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 137696328 Dec 9 20:23 xid-5074343-lsn-163-40000000.spill
-rw------- 1 postgres postgres 26076708 Dec 9 20:23 xid-5074343-lsn-163-48000000.spill
[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 4
-rw------- 1 postgres postgres 184 Dec 9 20:25 state
2666
(1 row)
Large transaction, no subtransactions, non-replicated table
--source: create an unrelated table and prepare to write data into it
CREATE TABLE no_replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);
-- start a large transaction and leave it uncommitted
begin;
insert into no_replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;
-- spill occurs
[postgres@lzldb:MYINST:8094 /mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357492
-rw------- 1 postgres postgres 184 Dec 9 20:09 state
-rw------- 1 postgres postgres 107511456 Dec 9 20:08 xid-5074106-lsn-163-28000000.spill
-rw------- 1 postgres postgres 137698804 Dec 9 20:09 xid-5074106-lsn-163-30000000.spill
-rw------- 1 postgres postgres 4308444 Dec 9 20:09 xid-5074106-lsn-163-38000000.spill
Large transaction with subtransactions, non-replicated table
# insert one row at a time, one subtransaction per insert
echo "begin;">subtx.sql
for i in {1..1000000}
do
echo "savepoint p$i;">>subtx.sql
echo "insert into no_replication_table(column1,column2,column3) select 'a','b','c';">>subtx.sql
done
nohup psql -d lzl -f subtx.sql &
# while it runs, more than 800,000 spilled files are observed
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll |wc -l
823749
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll |head -10
total 1099532
-rw------- 1 postgres postgres 184 Dec 9 21:10 state
-rw------- 1 postgres postgres 1236 Dec 9 21:10 xid-5519686-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519687-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519688-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519689-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519690-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519691-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519692-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519693-lsn-163-70000000.spill
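Analysis of the slow startup
Startup flow of the startup process
The startup flow is walked through frame by frame, using the stack frame numbers:
11: main
: nothing much to say
10: PostmasterMain
:
Before the main loop it calls the startup routine, StartupPID = StartupDataBase();
which is really a call to StartChildProcess(StartupProcess)
#define StartupDataBase() StartChildProcess(StartupProcess)
9: StartChildProcess
: forks a process. This process is the auxiliary process that helps postmaster start up; ordinary child processes are launched through the same path and forked at this step. The argument here is AuxProcType
= StartupProcess
8: AuxiliaryProcessMain
:
Because MyAuxProcType
= StartupProcess, control goes to StartupProcessMain,
which differs from the flow of child processes such as walsender, walwriter, and bgwriter. The startup process exists to read WAL for crash recovery, but it also does many other things.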
switch (MyAuxProcType)
{
    case CheckerProcess:
        /* don't set signals, they're useless here */
        CheckerModeMain();
        proc_exit(1);       /* should never return */

    case BootstrapProcess:

        /*
         * There was a brief instant during which mode was Normal; this is
         * okay.  We need to be in bootstrap mode during BootStrapXLOG for
         * the sake of multixact initialization.
         */
        SetProcessingMode(BootstrapProcessing);
        bootstrap_signals();
        BootStrapXLOG();
        BootstrapModeMain();
        proc_exit(1);       /* should never return */

    case StartupProcess:    // this is the branch taken here
        /* don't set signals, startup process has its own agenda */
        StartupProcessMain();
        proc_exit(1);       /* should never return */

    case BgWriterProcess:
        /* don't set signals, bgwriter has its own agenda */
        BackgroundWriterMain();
        proc_exit(1);       /* should never return */

    case CheckpointerProcess:
        /* don't set signals, checkpointer has its own agenda */
        CheckpointerMain();
        proc_exit(1);       /* should never return */

    case WalWriterProcess:
        /* don't set signals, walwriter has its own agenda */
        InitXLOGAccess();
        WalWriterMain();
        proc_exit(1);       /* should never return */

    case WalReceiverProcess:
        /* don't set signals, walreceiver has its own agenda */
        WalReceiverMain();
        proc_exit(1);       /* should never return */

    default:
        elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
        proc_exit(1);
}
7: StartupProcessMain
: mainly exists to call StartupXLOG()
6: StartupXLOG
:
Function comment:
This must be called ONCE during postmaster or standalone-backend startup
StartupXLOG
is always called at startup, whether the previous shutdown was a crash or a consistent shutdown.
switch (ControlFile->state)
{
...
    case DB_IN_PRODUCTION:
        ereport(LOG,
                (errmsg("database system was interrupted; last known up at %s",
                        str_time(ControlFile->time))));
        break;
This matches the server log. Below is the log output for the shutdown and the startup:
2024-12-06 17:02:57.534 CST,,,447560,,65693cde.6d448,1325,,2023-12-01 09:54:38 CST,,0,LOG,00000,"database system is shut down",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,1,,2024-12-06 17:03:47 CST,,0,LOG,00000,"ending log output to stderr",,"Future log output will go to log destination ""csvlog"".",,,,,,,"","postmaster"
2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,2,,2024-12-06 17:03:47 CST,,0,LOG,00000,"starting PostgreSQL 13.2 (RaseSQL 1.3) on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39.0.1), 64-bit",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.537 CST,,,211844,,6752bdf3.33b84,3,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on IPv4 address ""0.0.0.0"", port 7284",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.539 CST,,,211844,,6752bdf3.33b84,4,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on Unix socket ""/tmp/.s.PGSQL.7284""",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.557 CST,,,211995,,6752bdf5.33c1b,1,,2024-12-06 17:03:49 CST,,0,LOG,00000,"database system was interrupted; last known up at 2024-12-06 17:00:10 CST",,,,,,,,,"","startup"
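So after that shutdown, the control file recorded the database state as in production:
Database cluster state: in production
The in production
state means the database is running, not cleanly shut down, so the shutdown at that time was not a consistent one.
Next, the key code related to fsync: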
/*----------
 * If we previously crashed, perform a couple of actions:
 *
 * - The pg_wal directory may still include some temporary WAL segments
 *   used when creating a new segment, so perform some clean up to not
 *   bloat this path.  This is done first as there is no point to sync
 *   this temporary data.
 *
 * - There might be data which we had written, intending to fsync it, but
 *   which we had not actually fsync'd yet.  Therefore, a power failure in
 *   the near future might cause earlier unflushed writes to be lost, even
 *   though more recent data written to disk from here on would be
 *   persisted.  To avoid that, fsync the entire data directory.
 */
if (ControlFile->state != DB_SHUTDOWNED &&
    ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
{
    RemoveTempXlogFiles();
    SyncDataDirectory();
}
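Because the state recorded in the control file is not a clean shutdown, the if branch is taken and SyncDataDirectory()
is called to fsync the data to disk.
StartupXLOG
does a great many things. Besides SyncDataDirectory(),
the other part relevant to spill files is StartupReorderBuffer()
: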
/*
 * Initialize replication slots, before there's a chance to remove
 * required resources.
 */
StartupReplicationSlots();

/*
 * Startup logical state, needs to be setup now so we have proper data
 * during crash recovery.
 */
StartupReorderBuffer();
StartupReorderBuffer
is called in every case. It calls ReorderBufferCleanupSerializedTXNs
to remove the spill files in every slot directory (the directories and the state files are not deleted).
/*
 * Delete all data spilled to disk after we've restarted/crashed. It will be
 * recreated when the respective slots are reused.
 */
void
StartupReorderBuffer(void)
{
    DIR        *logical_dir;
    struct dirent *logical_de;

    logical_dir = AllocateDir("pg_replslot");
    while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    {
        if (strcmp(logical_de->d_name, ".") == 0 ||
            strcmp(logical_de->d_name, "..") == 0)
            continue;

        /* if it cannot be a slot, skip the directory */
        if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
            continue;

        /*
         * ok, has to be a surviving logical slot, iterate and delete
         * everything starting with xid-*
         */
        ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
    }
    FreeDir(logical_dir);
}
5: SyncDataDirectory
:
This function comment is very important:
/*
 * Issue fsync recursively on PGDATA and all its contents.
 *
 * We fsync regular files and directories wherever they are, but we
 * follow symlinks only for pg_wal and immediately under pg_tblspc.
 * Other symlinks are presumed to point at files we're not responsible
 * for fsyncing, and might not have privileges to write at all.
 *
 * Errors are logged but not considered fatal; that's because this is used
 * only during database startup, to deal with the possibility that there are
 * issued-but-unsynced writes pending against the data directory.  We want to
 * ensure that such writes reach disk before anything that's done in the new
 * run.  However, aborting on error would result in failure to start for
 * harmless cases such as read-only files in the data directory, and that's
 * not good either.
 *
 * Note that if we previously crashed due to a PANIC on fsync(), we'll be
 * rewriting all changes again during recovery.
 *
 * Note we assume we're chdir'd into PGDATA to begin with.
 */
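- fsync every file under the data directory to make it durable
- this only happens during startup
- its purpose is to guarantee that the data directory is fully durable before the database runs
SyncDataDirectory
mainly walks the directories recursively and fsyncs them (symlinks get slightly special handling):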
walkdir(".", datadir_fsync_fname, false, LOG);
if (xlog_is_symlink)
    walkdir("pg_wal", datadir_fsync_fname, false, LOG);
walkdir("pg_tblspc", datadir_fsync_fname, true, LOG);
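4: walkdir
: recursing into .
3: walkdir
: recursing into ./pg_replslot
2: walkdir
: recursing into ./pg_replslot/slotname
1: lstat
: C library call. Besides performing fsync (through the callback argument datadir_fsync_fname),
the body of walkdir itself calls lstat
to get file information such as inode, file size, and last modification time, similar to the Linux stat
command.
0: _lxstat
: C library call
Summary of the startup logic:
- PG starts an auxiliary process, startup, to assist with startup. Unlike the usual child processes (walwriter, bgwriter, checkpointer, and so on), it is always started during startup, and it does many things.
- StartupXLOG is always called at startup, whether or not the database was shut down consistently.
- SyncDataDirectory is triggered only after an unclean shutdown.
- SyncDataDirectory fsyncs all data files and reads the stat information of all data files.
- fsync guarantees that the data files are all consistent before the database starts; the lstat call is how walkdir tells whether an entry is a regular file to fsync or a directory to recurse into.
- Regardless of the shutdown state, StartupReorderBuffer is always called and removes the spill files of every replication slot.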
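To make the cost of this step concrete, the sketch below mimics the shape of walkdir in Python: one lstat per directory entry, one fsync per regular file, and recursion into subdirectories. It is an illustration only, not a port of the PostgreSQL function: walk_and_sync is a hypothetical name, symlink handling and the fsync of directories themselves are left out, and files are opened read-write, which assumes the caller owns them.

```python
import os
import stat


def walk_and_sync(path, counters=None):
    """Recursively lstat every entry and fsync every regular file under path.

    Returns a dict with the number of lstat and fsync calls issued, which
    grows linearly with the number of files (for example spill files).
    """
    if counters is None:
        counters = {"lstat": 0, "fsync": 0}
    for name in os.listdir(path):
        sub = os.path.join(path, name)
        st = os.lstat(sub)              # one lstat per entry
        counters["lstat"] += 1
        if stat.S_ISREG(st.st_mode):
            fd = os.open(sub, os.O_RDWR)
            try:
                os.fsync(fd)            # one fsync per regular file
            finally:
                os.close(fd)
            counters["fsync"] += 1
        elif stat.S_ISDIR(st.st_mode):
            walk_and_sync(sub, counters)
    return counters
```

With ten million spill files in one slot directory, this pattern means roughly ten million lstat calls and ten million fsync calls before recovery even begins, which is consistent with the stack seen in the slow startup.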
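When does the database become ready
After the startup process has finished its work, the database is still not in the ready state. When the pmState state machine changes state, reaper,
the process-reaping function, is called. reaper itself exists to do cleanup or launch work after a child process exits. The pmState state machine records the state as PM_STARTUP; this state machine controls the startup and shutdown states.
The last few steps of PostmasterMain
: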
    StartupPID = StartupDataBase();
    Assert(StartupPID != 0);
    StartupStatus = STARTUP_RUNNING;
    pmState = PM_STARTUP;   // the state machine changes state

    /* Some workers may be scheduled to start now */
    maybe_start_bgworkers();

    status = ServerLoop();

    /*
     * ServerLoop probably shouldn't ever return, but if it does, close down.
     */
    ExitPostmaster(status != STATUS_OK);

    abort();                /* not reached */
}
The core startup flow in PostmasterMain
reaches reaper,
which handles the normal exit of the startup process.
PMState comment:
/*
 * We use a simple state machine to control startup, shutdown, and
 * crash recovery (which is rather like shutdown followed by startup).
 *
 * After doing all the postmaster initialization work, we enter PM_STARTUP
 * state and the startup process is launched. The startup process begins by
 * reading the control file and other preliminary initialization steps.
 * In a normal startup, or after crash recovery, the startup process exits
 * with exit code 0 and we switch to PM_RUN state.
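PMState is driven by signal delivery and handling: after the startup process exits, reaper
is invoked to reap the process.
The reaper
function handles the normal exit of the startup child process: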
if (pid == StartupPID)
{
    StartupPID = 0;
...
    /*
     * Startup succeeded, commence normal operations
     */
    StartupStatus = STARTUP_NOT_RUNNING;    // changes from STARTUP_RUNNING to STARTUP_NOT_RUNNING
    FatalError = false;     // not fatal only because none of the ifs above matched
    AbortStartTime = 0;
    ReachedNormalRunning = true;
    pmState = PM_RUN;       // the state machine moves from PM_STARTUP to PM_RUN
    connsAllowed = ALLOW_ALL_CONNS;

    /*
     * Crank up the background tasks, if we didn't do that already
     * when we entered consistent recovery state.  It doesn't matter
     * if this fails, we'll just try again later.
     */
    // everything below starts core child processes
    if (CheckpointerPID == 0)
        CheckpointerPID = StartCheckpointer();
    if (BgWriterPID == 0)
        BgWriterPID = StartBackgroundWriter();
    if (WalWriterPID == 0)
        WalWriterPID = StartWalWriter();

    /*
     * Likewise, start other special children as needed.  In a restart
     * situation, some of them may be alive already.
     */
    // everything below starts non-core child processes
    if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0)
        AutoVacPID = StartAutoVacLauncher();
    if (PgArchStartupAllowed() && PgArchPID == 0)
        PgArchPID = pgarch_start();
    if (PgStatPID == 0)
        PgStatPID = pgstat_start();

    /* workers may be scheduled to start now */
    maybe_start_bgworkers();

    // only now is the server officially able to accept connections
    /* at this point we are really open for business */
    ereport(LOG,
            (errmsg("database system is ready to accept connections")));

    /* Report status */
    AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_READY);
#ifdef USE_SYSTEMD
    sd_notify(0, "READY=1");
#endif

    continue;
}
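The message "database system is ready to accept connections" is emitted here.
The checkpointer, bgwriter, walwriter, autovacuum, archiver (if enabled), and stats processes all need to be started, but launching them at this stage does not have to succeed; it can be retried later in ServerLoop
or on the next run of reaper.
Only the startup process must be forked successfully on the first attempt and must complete all of its tasks: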
if (pid < 0)
{
    /* in parent, fork failed */
    int         save_errno = errno;

    errno = save_errno;
    switch (type)
    {
        case StartupProcess:
            ereport(LOG,
                    (errmsg("could not fork startup process: %m")));
            break;
        case BgWriterProcess:
            ereport(LOG,
                    (errmsg("could not fork background writer process: %m")));
            break;
        case CheckpointerProcess:
            ereport(LOG,
                    (errmsg("could not fork checkpointer process: %m")));
            break;
        case WalWriterProcess:
            ereport(LOG,
                    (errmsg("could not fork WAL writer process: %m")));
            break;
        case WalReceiverProcess:
            ereport(LOG,
                    (errmsg("could not fork WAL receiver process: %m")));
            break;
        default:
            ereport(LOG,
                    (errmsg("could not fork process: %m")));
            break;
    }

    /*
     * fork failure is fatal during startup, but there's no need to choke
     * immediately if starting other child types fails.
     */
    if (type == StartupProcess)
        ExitPostmaster(1);
    return 0;
}
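Spill file generation logic across versions
In every version shown here, what gets spilled is a large transaction; the focus is on when the spill is triggered.
PG12: the limit is hard-coded at 4096 changes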
static const Size max_changes_in_memory = 4096;

/*
 * Check whether the transaction tx should spill its data to disk.
 */
static void
ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
    /*
     * TODO: improve accounting so we cheaply can take subtransactions into
     * account here.
     */
    if (txn->nentries_mem >= max_changes_in_memory)
    {
        ReorderBufferSerializeTXN(rb, txn);
        Assert(txn->nentries_mem == 0);
    }
}
PG13: spill once memory use exceeds logical_decoding_work_mem
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
...
    while (rb->size >= logical_decoding_work_mem * 1024L)
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
         * memory by serializing it to disk.
         */
        txn = ReorderBufferLargestTXN(rb);

        ReorderBufferSerializeTXN(rb, txn);
...
}
PG14: adds streaming, ReorderBufferStreamTXN
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
...
    while (rb->size >= logical_decoding_work_mem * 1024L)
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
         * memory by streaming, if possible.  Otherwise, spill to disk.
         */
        if (ReorderBufferCanStartStreaming(rb) &&
            (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
        {
            ...
            ReorderBufferStreamTXN(rb, txn);
        }
        else
        {
            ...
            ReorderBufferSerializeTXN(rb, txn);
        }
...
}
PG14 can stream in-progress transactions, but streaming is only triggered under certain conditions:
/* Returns true, if the streaming can be started now, false, otherwise. */
static inline bool
ReorderBufferCanStartStreaming(ReorderBuffer *rb)
{
    LogicalDecodingContext *ctx = rb->private_data;
    SnapBuild  *builder = ctx->snapshot_builder;

    /* We can't start streaming unless a consistent state is reached. */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
        return false;

    /*
     * We can't start streaming immediately even if the streaming is enabled
     * because we previously decoded this transaction and now just are
     * restarting.
     */
    if (ReorderBufferCanStream(rb) &&
        !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
        return true;

    return false;
}

/*
 * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
 * were running at that point finished. Till we reach that we hold off
 * calling any commit callbacks.
 */
SNAPBUILD_CONSISTENT = 2
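Additional conditions for triggering streaming:
- Condition 1: the snapshot builder has reached the consistent state, i.e. all transactions that were running at that point have finished (presumably committed or rolled back)
- Condition 2: ReorderBufferCanStream(rb) returns true, i.e. streaming is enabled for this decoding context (rb->private_data is simply the pointer to that context)
- Condition 3: SnapBuildXactNeedsSkip returns false for the current read position; per the code comment, streaming is held back when the transaction was already decoded earlier and decoding is merely restarting
PG15: much the same as 14, only the functions are clearer and less nested
PG16: much the same
PG17: much the same, with a new DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
that can force streaming
Key points:
- PG12 and earlier use a hard-coded limit of 4096 changes
- PG13 adds the logical_decoding_work_mem parameter, which can be raised to reduce the chance of spilling
- PG14 and later support streaming
- Streaming also requires certain conditions, so spills can still happen even when streaming is available
- PG17 adds the debug_logical_replication_streaming parameter to force streaming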
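The two trigger rules for PG12 and PG13 can be contrasted in a short model. This is a simplified Python sketch of the conditions quoted above, not the server logic: the function names are hypothetical, transaction sizes are plain byte counts, and a spilled transaction is treated as releasing all of its memory.

```python
MAX_CHANGES_IN_MEMORY = 4096  # hard-coded per-transaction limit in PG12


def pg12_should_spill(nentries_mem):
    """PG12 rule: a transaction spills once it holds 4096 changes in memory."""
    return nentries_mem >= MAX_CHANGES_IN_MEMORY


def pg13_pick_spills(txn_sizes, logical_decoding_work_mem_kb):
    """PG13 rule: while total memory is at or above the limit, spill the largest.

    txn_sizes maps a transaction id to its in-memory size in bytes.
    Returns the ids spilled, in eviction order.
    """
    limit = logical_decoding_work_mem_kb * 1024
    in_memory = dict(txn_sizes)
    spilled = []
    while in_memory and sum(in_memory.values()) >= limit:
        largest = max(in_memory, key=in_memory.get)
        spilled.append(largest)
        del in_memory[largest]   # modelled as freeing all of its memory
    return spilled


if __name__ == "__main__":
    mb = 1024 * 1024
    print(pg13_pick_spills({"a": 50 * mb, "b": 20 * mb, "c": 10 * mb}, 64 * 1024))
```

In the example, 80 MB of changes against a 64 MB limit evicts only the 50 MB transaction, after which the remaining 30 MB fits in memory.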
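Spill file cleanup logic
Cleanup at database startup is only one scenario; spill files are also cleaned up when a walsender starts and when a slot is dropped.
Cleanup when a walsender starts
ReorderBufferCleanupSerializedTXNs
is called at database startup (before any walsender has started) and when a walsender starts (while the database is running). These two scenarios are different; they merely call the same function. As its comment says, the function is there to "remove any leftover serialized reorder buffers", i.e. to clean up spill files.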
/*
 * Remove any leftover serialized reorder buffers from a slot directory after a
 * prior crash or decoding session exit.
 */
static void
ReorderBufferCleanupSerializedTXNs(const char *slotname)
{
    DIR        *spill_dir;
    struct dirent *spill_de;
    struct stat statbuf;
    char        path[MAXPGPATH * 2 + 12];

    sprintf(path, "pg_replslot/%s", slotname);

    /* we're only handling directories here, skip if it's not ours */
    if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
        return;

    spill_dir = AllocateDir(path);
    while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
    {
        /* only look at names that can be ours */
        // only the first 3 characters are compared
        if (strncmp(spill_de->d_name, "xid", 3) == 0)
        {
            snprintf(path, sizeof(path),
                     "pg_replslot/%s/%s", slotname,
                     spill_de->d_name);

            if (unlink(path) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
                                path, slotname)));
        }
    }
    FreeDir(spill_dir);
}
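Two points about this cleanup logic are worth noting:
- Only files whose names begin with "xid" are removed. The state file obviously must not be removed.
- Files are removed with unlink, one file at a time. Keeping this in mind helps when designing a way to speed up startup.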
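The two points above amount to a very small loop. The following Python sketch imitates it for illustration; cleanup_spill_files is a hypothetical name, and the sketch should only ever be pointed at a scratch directory, never at a live pg_replslot.

```python
import os


def cleanup_spill_files(slot_dir):
    """Unlink every entry whose name starts with 'xid', one file at a time.

    Returns the number of files removed. Other entries, such as the state
    file, are left untouched.
    """
    removed = 0
    for name in os.listdir(slot_dir):
        if name[:3] == "xid":            # same 3-character prefix test
            os.unlink(os.path.join(slot_dir, name))
            removed += 1
    return removed
```

Because the work is one unlink per file, the running time scales with the number of spill files, not with their total size.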
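Cleanup at database startup
At startup the database forks a startup process that cleans the slots, using the same cleanup function the walsender calls: ReorderBufferCleanupSerializedTXNs
.
There is one more difference: after a walsender restarts, only the spill files of its own slot are cleaned up, whereas database startup cleans the spill files of all slots one after another.
The sequential while loop run by the startup process at database startup: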
void
StartupReorderBuffer(void)
{
    DIR        *logical_dir;
    struct dirent *logical_de;

    logical_dir = AllocateDir("pg_replslot");
    while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    {
        // skip . and ..
        if (strcmp(logical_de->d_name, ".") == 0 ||
            strcmp(logical_de->d_name, "..") == 0)
            continue;

        // check that the slot name is well-formed
        /* if it cannot be a slot, skip the directory */
        if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
            continue;

        /*
         * ok, has to be a surviving logical slot, iterate and delete
         * everything starting with xid-*
         */
        ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
    }
    FreeDir(logical_dir);
}
The while loop calls ReorderBufferCleanupSerializedTXNs
; from there the logic is the same as the cleanup at walsender start.
Manual cleanup with pg_drop_replication_slot
Dropping a slot does not follow the automatic spill cleanup path; it never calls ReorderBufferCleanupSerializedTXNs
.
The drop slot call chain is:
pg_drop_replication_slot(PG_FUNCTION_ARGS)
->ReplicationSlotDrop(const char *name, bool nowait)
->ReplicationSlotDropAcquired(void)
->ReplicationSlotDropPtr
The way ReplicationSlotDropPtr
removes a replication slot is interesting too:
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
    char        path[MAXPGPATH];
    char        tmppath[MAXPGPATH];

    /*
     * If some other backend ran this code concurrently with us, we might try
     * to delete a slot with a certain name while someone else was trying to
     * create a slot with the same name.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /* Generate pathnames. */
    sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

    /*
     * Rename the slot directory on disk, so that we'll no longer recognize
     * this as a valid slot.  Note that if this fails, we've got to mark the
     * slot inactive before bailing out.  If we're dropping an ephemeral or a
     * temporary slot, we better never fail hard as the caller won't expect
     * the slot to survive and this might get called during error handling.
     */
    if (rename(path, tmppath) == 0)     // rename the directory
    {
        /*
         * We need to fsync() the directory we just renamed and its parent to
         * make sure that our changes are on disk in a crash-safe fashion.  If
         * fsync() fails, we can't be sure whether the changes are on disk or
         * not.  For now, we handle that by panicking;
         * StartupReplicationSlots() will try to straighten it out after
         * restart.
         */
        // fsync to make the rename durable
        START_CRIT_SECTION();
        fsync_fname(tmppath, true);
        fsync_fname("pg_replslot", true);
        END_CRIT_SECTION();
    }
...
    /*
     * If removing the directory fails, the worst thing that will happen is
     * that the user won't be able to create a new slot with the same name
     * until the next server restart.  We warn about it, but that's all.
     */
    if (!rmtree(tmppath, true))
        ereport(WARNING,
                (errmsg("could not remove directory \"%s\"", tmppath)));

    /*
     * We release this at the very end, so that nobody starts trying to create
     * a slot while we're still cleaning up the detritus of the old one.
     */
    LWLockRelease(ReplicationSlotAllocationLock);
}
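Drop slot does not unlink files directly inside the slot directory. It first renames the slot directory slotname/
to slotname.tmp/
, then unlinks the files inside it, and finally removes the slotname.tmp/
directory itself.
rmtree also unlinks the files in a loop.
Testing ways to speed up startup after a slot has spilled
Deleting 10 million spill files is bound to be slow, whereas moving the directory with mv is very fast. A direct mv needs care with the new name and with the state file, and you need to know exactly which source-code step the mv skips.
Notes on the name used for mv
Because the shutdown was unclean, the startup process runs SyncDataDirectory,
which fsyncs and stats every data file; this is hard to bypass. Only after SyncDataDirectory
has finished are the replication slots processed. That processing calls StartupReorderBuffer()
->ReorderBufferCleanupSerializedTXNs,
which removes all spill files.
Before the cleanup, ReplicationSlotValidateName
is called to check that the slot name is valid. We can exploit ReplicationSlotValidateName
to make the startup process skip ReorderBufferCleanupSerializedTXNs
for a directory.
ReplicationSlotValidateName
rules: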
bool
ReplicationSlotValidateName(const char *name, int elevel)
{
...
    for (cp = name; *cp; cp++)
    {
        // the key rule is here
        if (!((*cp >= 'a' && *cp <= 'z')
              || (*cp >= '0' && *cp <= '9')
              || (*cp == '_')))
        {
            ereport(elevel,
                    (errcode(ERRCODE_INVALID_NAME),
                     errmsg("replication slot name \"%s\" contains invalid character",
                            name),
                     errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
            return false;
        }
    }
    return true;
}
A valid slot name contains only a-z, 0-9, and _.
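The character rule is easy to test against candidate directory names before renaming anything. This Python sketch reproduces only the character check shown above; slot_name_chars_valid is a hypothetical helper, and the other checks in the elided part of the C function are not modelled.

```python
def slot_name_chars_valid(name):
    """Return True if every character is a-z, 0-9, or underscore.

    Only the character rule is modelled; any other validation performed by
    the server is outside this sketch.
    """
    return all(
        ("a" <= ch <= "z") or ("0" <= ch <= "9") or ch == "_"
        for ch in name
    )


if __name__ == "__main__":
    for candidate in ("slotname.bak", "slotname_bak", "slotname20241215"):
        print(candidate, slot_name_chars_valid(candidate))
```

A directory name that fails this check is skipped by the startup loop, so its spill files are not unlinked one by one; a name that passes is treated as a slot and cleaned.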
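So when renaming, add a dot (.) to the name:
- Recommended: slotname.bak, slotname.20241215, and the like.
- Not recommended: slotnamebackup, slotname20241215, slotname_bak, and so on.
- Not recommended: a .tmp suffix. A slot directory ending in .tmp has a special meaning: it is the temporary name used while a slot is being dropped.
After the rename, recreate the directory and copy the state file into it. Otherwise the slot behaves oddly after startup (for example duplicate slot names, a slot name that appears automatically, a slot that cannot be dropped, or a downstream link that will not come up).
The recommended mv procedure in summary: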
cd pg_replslot
mv slotname slotname.bak
mkdir slotname
cp slotname.bak/state slotname/
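The same four steps can be scripted. This is a cautious Python sketch of the procedure above, under the assumption that the server is stopped while it runs: set_aside_slot_dir is a hypothetical name, the .bak default follows the recommendation in the text, and the 0700 directory mode is an assumption about how the slot directory should be recreated.

```python
import os
import shutil


def set_aside_slot_dir(pg_replslot, slotname, suffix=".bak"):
    """Rename slotname to slotname<suffix>, recreate it, and copy state back.

    The suffix must make the name invalid as a slot name (contain a dot)
    and must not be '.tmp'. Returns the path of the set-aside directory.
    """
    if "." not in suffix or suffix == ".tmp":
        raise ValueError("suffix must contain a dot and must not be .tmp")
    src = os.path.join(pg_replslot, slotname)
    dst = src + suffix
    os.rename(src, dst)              # mv slotname slotname.bak
    os.mkdir(src, 0o700)             # mkdir slotname
    shutil.copy2(os.path.join(dst, "state"),
                 os.path.join(src, "state"))  # cp slotname.bak/state slotname/
    return dst
```

The set-aside directory still has to be removed afterwards, which can be done at leisure once the database is up.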
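Startup time comparison
Compare startup speed across the different source-code paths to see whether a manual mv/rm actually helps.
Based on the source logic:
- Clean shutdown: fsync and stat are skipped
- Unclean shutdown: fsync and stat are performed
- Effective mv: the slot directory is renamed with a .bak suffix, so unlink is skipped
- Ineffective mv: the slot directory is renamed with a _bak suffix and the spill file names start with xid, so unlink is performed
Because generating real spill files is far too slow, slot directories and spill files were fabricated by hand: 50 slots with 400,000 spill files each, 20 million spill files in total, to measure the startup time (copying directories with cp is much faster than copying individual files with cp or dd).
No. | Test scheme | Startup time |
---|---|---|
1 | Clean shutdown; startup skips fsync and stat, skips unlink | 0.1 s |
2 | Clean shutdown, ineffective mv; startup skips fsync and stat, does unlink | 11 min 41 s |
3 | Unclean shutdown, effective mv; startup does fsync and stat, skips unlink | 4 min 35 s |
4 | Unclean shutdown, ineffective mv; startup does fsync and stat, does unlink | 32 min 2 s |
5 | Unclean shutdown, rm (slot directory recreated, state kept) | 13 min 04 s |
Comparing schemes 3 and 5: in the original incident an effective mv could in theory have brought startup down to about 4 minutes, versus about 13 minutes with rm. (This is a rough comparison; some differences were already observed in the restored environment.)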