分布式数据库的进度管理:TiDB 备份恢复工具 PiTR 的原理与实践

导读

对于一款企业级数据库产品而言,数据的安全性和可恢复性是至关重要的。PiTR(Point in Time Restore)作为 TiDB 备份工具的核心功能之一,提供了一种精细的数据恢复能力,允许用户将数据库集群恢复到过去的任意时间点。这种能力对于处理数据损坏、误操作或数据丢失等灾难性事件至关重要。

对于分布式系统而言,想实现精确的进度管理是十分复杂的,本文将深入解析 PiTR 在 TiDB 的分布式架构中的实现,包括其在 TiKV 层的备份流程,以及 TiDB 如何管理这些备份任务的进度。

希望本文能够帮助开发者和数据库管理员更好地理解 PiTR 的工作机制,有效地利用这一功能加固数据库基础设施。

PiTR 是 TiDB 备份工具中必不可少的一部分。如果说全量备份帮助我们获得了将集群回退到某个时间节点的能力,那么 PiTR 则更加精细地备份了集群的每一次写入,并且允许我们回到备份开始后的任意一个时点。

直觉上,当你启动一个 PiTR 任务,等于告诉集群:我需要知道从当前时间节点之后的全部变化。对于一个分布式数据库而言,这并不是一个简单的工作。

目前 TiDB 的数据存储结构

上图展示了目前 TiDB 的数据存储结构。用户以表和行的形式写入数据,每一行数据都会以一个键值对的形式存储在 TiKV 中,每一个 TiKV 又会被逻辑地划分为多个 region。

由于 TiDB 分布式写入的实质,各个 Region 的数据分布在不同宿主机上,也不存在一个确切统一的写入时点。所以我们需要找到一种方法分别管理每个 region 的写入工作,并且需要提供一个整体进度。在接下来的内容中,我们将详细展开 TiDB 的 PiTR 进度管理流程。从单个 TiKV 开始,逐步推进到整个集群。

TiKV 侧备份流程

如果我们希望管理备份工作的具体进度,首先需要了解的是,备份工作究竟是怎样完成的。在 TiDB 的实践中,PiTR 是一个分布式过程,每个 TiKV Server 自行记录备份数据,并将数据发送到远端储存,大致上按照下图所示的流程工作。

TiKV 侧备份流程

在 TiKV server 初始化期间,会同时(先后)初始化 BackupStreamObserver[1] 和 Endpoint[2] 两个组件。它们共用了同一个 scheduler(backup_stream_scheduler[3],通过向 scheduler 发送 Task 的方式进行互相沟通。

BackupStreamObserver 会实时监听 Raft 状态机的写入情况。其重点在于 on_flush_applied_cmd_batch()[4] 接口。这个接口会在 Raft 状态机 apply 时被调用,将 Raft 命令打包为 BatchEvent,然后作为一个任务发送给 scheduler。对于 PiTR 而言,这个任务被称为 Task::BatchEvent[5]。

pub struct CmdBatch {pub level: ObserveLevel,pub cdc_id: ObserveId,pub rts_id: ObserveId,pub pitr_id: ObserveId,pub region_id: u64,pub cmds: Vec<Cmd>,
}

可以看出,BatchEvent 的实质是一系列 Raft 命令的拷贝。PiTR 在备份时记录这些命令,并在恢复时重放,以实现日志备份功能。

而 Endpoint 负责沟通 TiKV Server 和外部储存。它会在启动之后进入一个循环,检查当前 scheduler 中是否包含新的任务,匹配并执行不同的函数。其中,我们需要关注的是 Task::BatchEvent,也就是从 Observer 发送来的写入数据。当 endpoint 匹配到 Task::BatchEvent,它会执行 backup_batch()[6] 函数开始备份这些键值对。

在这一步,Endpoint 先对 CmdBatch 进行简单检查,然后将它发往router.on_events()[7],并开始异步地等待结果。

Router 的作用是将写入操作按照 range 拆分,以提高并发度。每个 range 的写入并不是即时的,我们会在内存中储存一个临时文件,用于暂时存储从 raft store 更新的信息。当内存中储存的临时文件大小超出上限,或者超过指定刷盘间隔,我们才会真正将储存在临时文件中的数据写入远端储存,并视为完成了一次(部份)备份。目前 BackupStreamConfig 的默认设置中,max_flush_interval 为 3 分钟。

impl Default for BackupStreamConfig {fn default() -> Self {// ...Self {min_ts_interval: ReadableDuration::secs(10),max_flush_interval: ReadableDuration::minutes(3),// ...}}
}

当满足刷盘条件后,我们会跳转到 endpoint.do_flush() [8] 函数。并在这里完成将备份文件刷盘的逻辑。当这个函数完成之后,备份数据已经被写入远端存储,可以认为备份到此告一段落。此处正是汇报备份进度的最佳时刻。在并不令人注意的角落,这个任务是由一个回调完成的:flush_ob.after() [9]。

       async fn after(&mut self, task: &str, _rts: u64) -> Result<()> {let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::FlushWith(std::mem::take(&mut self.checkpoints),)); //Update checkpointtry_send!(self.sched, flush_task);let global_checkpoint = self.get_checkpoint(task).await?;info!("getting global checkpoint from cache for updating."; "checkpoint" => ?global_checkpoint);self.baseline.after(task, global_checkpoint.ts.into_inner()) //update safepoint.await?;Ok(())}

这个回调函数做了两件事,更新 service safe point 和 store checkpoint。它们是什么,又有什么用呢?

从检查点(Checkpoint)到全局检查点(Global Checkpoint)

上文中我们阅读了 PiTR 备份流程的细节。现在,我们可以回到正题,反思整个流程。

首先我们已经明确,对于 TiDB 这样的分布式数据库,所有的数据都储存在一个个单独的 TiKV 节点上。在 PiTR 流程中,这些 TiKV 也是各自将数据打包成文件,发送到远端储存上。这引出了一个重要的问题:如何进行进度管理?

为了确保备份进度的有效管理,我们需要跟踪每个 TiKV 节点上的数据备份进度。对于单个 Region,可以通过记录已备份数据的时间戳来实现进度管理:当数据被刷盘时,记录当前时间戳,这个时间戳就是该 Region 完成备份的最小时间节点,即 Checkpoint。

同时,我们需要了解到,需要备份的数据并不会永恒的保留。由于 MVCC 机制,每次数据修改都会产生一个新版本并保留旧版本,旧版本可以用于历史查询和事务隔离。随着时间的推移,这些历史数据会不断累积,因此需要通过 GC 机制来回收和清理旧版本,释放存储空间并提高性能。

我们需要确保在备份(Flush)完成之前,备份数据不会被 GC 清除。所以此处引入一个指标,通知 GC 可以安全清除的数据时间戳。这就是Service Safepoint。

值得注意的是,以上的讨论只是单个 region 的进度管理,一个集群中会同时存在多个 region,所以我们需要设计一个指标便于管理整个集群的备份进度,它被称之为Global Checkpoint。

在实践中,Global Checkpoint 是所有 TiKV Checkpoint 的最小值[10],这保证了所有 region 的进度都至少不小于这个时间节点。或者说,在这个时间节点之前,整个集群的数据都完成备份了。

而这个汇总所有 TiKV 进度并计算 Global Checkpoint 的工作,是在 TiDB 完成的。

TiDB 侧进度管理

TiDB 侧进度管理

既然我们了解了 TiKV 侧的备份进度管理流程。让我们转头看看 TiDB 的情况。

在 TiDB 侧,负责这项工作的组件被称为 CheckpointAdvancer [11]。它的本质是一个外挂在 TiDB 主程序上的守护进程,会随着时间执行一些周期性操作。它的工作主要包括两部分:

  1. 订阅更新来自 TiKV 的 FlushTSO 更新。
  2. 处理可能的错误并计算 Global Checkpoint。
  3. 计算总体更新进度并汇报给 PD。

具体地,在 CheckPointAdvancer 中有一个名为 FlushSubscriber[12] 的字段,TiDB 就是通过它监听 TiKV 的刷盘操作和 checkpoint 推进。FlushSubscriber 维持一个 gRPC 流,持续监听[13] 不同 range 的 checkpoint 并将其记录下来。随后通过 channel 发送给 advancer。

而 advacner 接收到这些 checkpoint 之后,会将它们放置于 checkpoints[14] 字段中。当接收到来自 TiKV 的进度信息之后,advancer 会尝试开始更新 Global Checkpoint。作为一个守护进程,更新过程并不是实时的,而是随着主进程调用它的 tick()[15] 方法间歇性完成。

func (c *CheckpointAdvancer) tick(ctx context.Context) error {//...        var errs errorcx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())defer cancel()err := c.optionalTick(cx)if err != nil {// ...  }err = c.importantTick(ctx)if err != nil {// ...}return errs
}

这个过程实际上被分为了两个部分,optionalTick()[16] 和 importantTick() [17]。

optionalTick 主要负责与 FlushSubscriber 沟通,获取来自 TiKV 的进度更新。由于单个 TiKV 的 Checkpoint 并不一定会推进,所以取名为 optionalTick。一旦捕获到 TiKV FlushTSO 的更新,便会在这里记录并试图推进全局检查点。

而 importantTick 则负责管理全局进度。确认进度更新后,这里会产生新的 Global Checkpoint 和 Service Safepoint[18]。

这个行为是存在风险的。如果某个 TiKV 的 Checkpoint 因为种种原因一直没有成功推进,就会阻塞住 Global Checkpoint 的推进,进而可能阻塞住 GC,无法正确清除已经完成备份的冗余数据。在最糟糕的情况下,某个 TiKV 陷入了不可自动恢复的错误。它有可能会永远阻碍 GC 进度,造成对整体系统的更大破坏。

因此,importantTick 会检查[19] checkpoint 距离上次更新的时间差。如果某个 Checkpoint 长时间没有推进,这个备份任务会被标记为异常状态[20]。随后,advancer 会自动暂停这个任务,等待管理员手工运维的介入。

        isLagged, err := c.isCheckpointLagged(ctx)if err != nil {return errors.Annotate(err, "failed to check timestamp")}if isLagged {err := c.env.PauseTask(ctx, c.task.Name)if err != nil {return errors.Annotate(err, "failed to pause task")}return errors.Annotate(errors.Errorf("check point lagged too large"), "check point lagged too large")}

此后,advancer 并不会停止,它只是跳过 [21] 了异常任务的 checkpoint 更新。如果 PD 恢复了这个任务,会向 advancer 发送信号[22],advancer 便可以回到正常的 tick 流程中。

此处介绍的异常处理机制是完全防卫性质的。它只能识别异常状态的存在,却无法指出问题的原因,最终还需要管理员手动介入。或许在未来,我们能够实现 PiTR 的自动运维,当 checkpoint 恢复推进之后,可以自动重启这个任务。

参考资料

[1]

BackupStreamObserver: https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/observer.rs#L94

[2]

Endpoint: https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L1414

[3]

backup_stream_scheduler: https://github.com/tikv/tikv/blob/release-8.0/components/server/src/server.rs#L891

[4]

on_flush_applied_cmd_batch(): https://github.com/tikv/tikv/blob/release-8.0/components/raftstore/src/coprocessor/mod.rs#L581

[5]

Task::BatchEvent: https://github.com/tikv/tikv/blob/release-8.0/components/raftstore/src/coprocessor/mod.rs#L504

[6]

backup_batch(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L479

[7]

router.on_events(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/router.rs#L595

[8]

endpoint.do_flush(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L825

[9]

flush_ob.after(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/checkpoint_manager.rs#L526

[10]

最小值: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L295

[11]

CheckpointAdvancer: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L57

[12]

FlushSubscriber: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/flush_subscriber.go#L27

[13]

持续监听: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/flush_subscriber.go#L250

[14]

checkpoints: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L56

[15]

tick(): https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L645

[16]

optionalTick(): https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L622

[17]

importantTick(): https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L587

[18]

Service Safepoint: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L605

[19]

检查: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L594

[20]

标记为异常状态: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L598

[21]

跳过: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L646

[22]

发送信号: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L465

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/56081.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言 | 第十六章 | 共用体 家庭收支软件-1

P 151 结构体定义三种形式 2023/3/15 一、创建结构体和结构体变量 方式1-先定义结构体&#xff0c;然后再创建结构体变量。 struct Stu{ char *name; //姓名 int num; //学号 int age; //年龄 char group; //所在学习小组 float score; //成绩 }; struct Stu stu1, stu2; //…

基于SpringBoot+Vue+Uniapp的植物园管理小程序系统(2024最新,源码+文档+远程部署+讲解视频等)

3. 论文参考 4. 项目运行截图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring …

Spring Boot在知识管理中的应用

1系统概述 1.1 研究背景 如今互联网高速发展&#xff0c;网络遍布全球&#xff0c;通过互联网发布的消息能快而方便的传播到世界每个角落&#xff0c;并且互联网上能传播的信息也很广&#xff0c;比如文字、图片、声音、视频等。从而&#xff0c;这种种好处使得互联网成了信息传…

数据检测和响应:DDR 用于数据安全

数据检测和响应 (DDR) 用于主动数据安全态势管理 企业必须保护其数据免受网络攻击&#xff0c;主要有三个原因&#xff1a; 1. 公司有法律义务保证客户信息的安全&#xff1b; 2. 不这样做会损害公司的声誉&#xff1b; 3. 补救数据泄露的影响可能代价高昂&#xff0c;而且…

数据结构前置知识(上)

1. 初识集合框架 1.1 什么是集合框架 在了解集合框架之前,我们先来认识一下数据结构,所谓数据结构就是描述和组织数据的一个东西. 那什么是集合框架呢?在java里面集合框架(Java Collection Framework),又被称为容器container,说白了就是很多个接口,抽象类,实现类组成的一个包,…

架构设计笔记-12-信息系统架构设计理论与实践

目录 知识要点 案例分析 1.Java企业级应用系统 2.c/s架构&#xff0c;b/s架构 知识要点 软件架构风格是描述某一特定应用领域中系统组织方式的惯用模式。架构风格定义了一类架构所共有的特征&#xff0c;主要包括架构定义、架构词汇表和架构约束。 数据挖掘是从数据库的大…

OceanBase 4.x 部署实践:如何从单机扩展至分布式部署

OceanBase 4.x 版本支持2种部署模式&#xff1a;单机部署与分布式部署&#xff0c;同时支持从单机平滑扩展至分布式架构。这样&#xff0c;可以有效解决小型业务向大型业务转型时面临的扩展难题&#xff0c;降低了机器资源的成本。 以下将详述如何通过命令行&#xff0c;实现集…

解决IPv6网络引起的网页与程序卡顿问题-本地DNS解析方案

一、问题环境 连接IPv6WiFi网络时&#xff0c;易出现网页打不开&#xff0c;程序开启无法加载画面。系统环境为Win10。 二、解决思路 在用户端无法触及路由器、网关等管理资源时&#xff0c;只能从本地环境中更改配置。多适用于公共WiFi&#xff0c;私人WiFi可直接从路由器DNS、…

手撕数据结构 —— 队列(C语言讲解)

目录 1.什么是队列 2.如何实现队列 3.队列的实现 Queue.h中接口总览 具体实现 结构的定义 初始化 销毁 入队列 出队列 取队头元素 取队尾元素 判断是否为空 获取队列的大小 4.完整代码附录 Queue.h Queue.c 1.什么是队列 队列是一种特殊的线性表&#xff0…

【uni-app】HBuilderX安装uni-ui组件

目录 1、官网找到入口 2、登录帐号 3、打开HuilderX 4、选择要应用的项目 5、查看是否安装完成 6、按需安装 7、安装完毕要重启 8、应用 前言&#xff1a;uniapp项目使用uni-ui组件方式很多&#xff0c;有npm安装等&#xff0c;或直接创建uni-ui项目&#xff0c;使用un…

【Oracle数据库进阶】001.SQL基础查询_查询语句

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

Aria2Cloudreve任意文件写入到RCE

什么是Aria2 Aria2 是一个轻量级的命令行下载工具&#xff0c;支持多种下载协议&#xff0c;如 HTTP、FTP、SFTP、BitTorrent 和 Metalink。它以其强大的多源下载能力而著称&#xff0c;可以同时从多个服务器或对等节点下载文件&#xff0c;加快下载速度。Aria2 占用资源少&am…

【C++】拆分详解 - vector

文章目录 一、vector的介绍二、vector的使用1. 构造2. 迭代器3. vector 空间增长问题4. 增删查改5. vector 迭代器失效问题5.1 底层空间改变&#xff08;扩容、缩容&#xff09;5.2 指定位置元素的删除操作5.3 Linux与VS平台差异 三、vector 模拟实现0. 整体框架1. 构造 / 析构…

图解 微信开发者工具 小程序源码 调试、断点标记方法 , 微信小程序调试器,真机调试断点调试方法,小程序网络API请求调试方法 总结

在我们使用微信开发者工具进行微信小程序开发的时候&#xff0c;在这个微信开发者工具的代码编辑框里面我们是无法像使用vscode, idea等IDE工具时那样直接对代码打断点进行调试&#xff0c; 原因是小程序实际上他就是一个web浏览器应用的包装, 在其内部使用的还是类似chrome的…

塔吊识别数据集 yolo格式 共5076张图片 已划分好训练验证 txt格式 yolo可直接使用

塔吊识别数据集 yolo格式 共5076张图片 已划分好训练验证 txt格式 yolo可直接使用。 类别&#xff1a;塔吊(Tower-crane) 一种 训练数据已划分&#xff0c;配置文件稍做路径改动即可训练。 训练集&#xff1a; 4724 &#xff08;正面3224 负面1500&#xff09; 验证集&#xf…

C#实现Punycode编码/解码

测试代码 string word "我爱你"; string idn "我爱你.中国"; string wordCode PunyCode.Encode(word); string punycode PunyCode.IDN2Punycode(idn);Console.WriteLine(word); Console.WriteLine(wordCode); Console.WriteLine(PunyCode.Decode(word…

外卖点餐系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;外卖员管理&#xff0c;餐厅管理&#xff0c;用户管理&#xff0c;菜品分类管理&#xff0c;菜品信息管理&#xff0c;外卖订单管理&#xff0c;订单配送管理 微信端账号功能包括&#xff1a;系统首页…

OKHTTP 如何处理请求超时和重连机制

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

Linux下Docker方式Jenkins安装和配置

一、下载&安装 Jenkins官方Docker仓库地址&#xff1a;https://hub.docker.com/r/jenkins/jenkins 从官网上可以看到&#xff0c;当前最新的稳定版本是 jenkins/jenkins:lts-jdk17。建议下在新的&#xff0c;后面依赖下不来 所以&#xff0c;我们这里&#xff0c;执行doc…

VS+QT 自定义插件变成动态库加载及使用

一、前言 有个界面需要重复使用某个自定义的控件&#xff0c;希望自定义控件能够像动态库文件那样&#xff0c;添加引用lib就能使用&#xff0c;经过多次太坑后&#xff0c;总结如下 二、实现方式 ① 新建项目&#xff0c;选择"Qt Designer Custom Widget" 创建自定…