MongoDB 4.2 内核解析 - Change Stream

MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增强),用于订阅 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也可以将 MongoDB 的增量订阅应用到其他的关联系统;比如电商场景里,MongoDB 里存储新的订单信息,业务需要根据新增的订单信息去通知库存管理系统发货。

Change Stream 与 Tailing Oplog 对比

在 change stream 功能之前,如果要获取 MongoDB 增量的修改,可以通过不断 tailing oplog  的方式来 拉取增量的 oplog ,然后针对拉取到的 oplog 集合,来过滤满足条件的 oplog。这种方式也能满足绝大部分场景的需求,但存在如下的不足。

  1. 使用门槛较高,用户需要针对 oplog 集合,打开特殊选项的的 tailable cursor  ("tailable": true, "awaitData" : true)。
  2. 用户需要自己管理增量续传,当拉取应用 crash 时,用户需要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
  3. 结果过滤必须在拉取侧完成,但只需要订阅部分 oplog 时,比如针对某个 DB、某个 Collection、或某种类型的操作,必须要把左右的 oplog 拉取到再进行过滤。
  4. 对于 update 操作,oplog 只包含操作的部分内容,比如 {$set: {x: 1}} ,而应用经常需要获取到完整的文档内容。
  5. 不支持 Sharded Cluster 的订阅,用户必须针对每个 shard 进行 tailing oplog,并且这个过程中不能有 moveChunk 操作,否则结果可能乱序。

MongoDB Change Stream 解决了 Tailing oplog 存在的不足

  1. 简单易用,提供统一的 Change Stream API,一次 API 调用,即可从 MongoDB Server 侧获取增量修改。
  2. 统一的进度管理,通过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,即可从上次的位置接着订阅。
  3. 支持对结果在 Server 端进行 pipeline 过滤,减少网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
  4. 支持 fullDocument: "updateLookup" 选项,对于 update,返回当时对应文档的完整内容。
  5. 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,即可获取集群维度全局有序的修改。

Change Stream 实战

以 Mongo shell 为例,使用 Change Stream 非常简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操作。

db.getMongo().watch()    订阅整个实例的修改
db.watch()               订阅指定DB的修改
db.collection.watch()    订阅指定Collection的修改
  1. 新建连接1发起订阅操作
mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分钟
  1. 新建连接2写入新数据

         

mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })
  1. 连接1上收到 Change Stream 更新
mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

 

  1. 上述 ChangeStream 结果里,_id 字段的内容即为 resume token,标识着 oplog 的某个位置,如果想从某个位置继续订阅,在 watch 时,通过 resumeAfter 指定即可。比如每个应用订阅了上述3条修改,但只有第一条已经成功消费了,下次订阅时指定第一条的 resume token 即可再次订阅到接下来的2条。

    

mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

Change Stream 内部实现

watch() wrapper

db.watch() 实际上是一个 API wrapper,实际上 Change Stream 在 MongoDB 内部实际上是一个 aggregation 命令,只是加了一个特殊的 $changestream  阶段,在发起 change stream 订阅操作后,可通过 db.currentOp() 看到对应的 aggregation/getMore 操作的详细参数。

{"op" : "getmore","ns" : "test.coll","command" : {"getMore" : NumberLong("233479991942333714"),"collection" : "coll","maxTimeMS" : 50000,"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},},"planSummary" : "COLLSCAN","cursor" : {"cursorId" : NumberLong("233479991942333714"),"createdDate" : ISODate("2019-12-31T06:35:52.479Z"),"lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),"nDocsReturned" : NumberLong(1),"nBatchesReturned" : NumberLong(1),"noCursorTimeout" : false,"tailable" : true,"awaitData" : true,"originatingCommand" : {"aggregate" : "coll","pipeline" : [{"$changeStream" : {"fullDocument" : "default"}}],"cursor" : {},"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},"$clusterTime" : {"clusterTime" : Timestamp(1577774144, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}},"$db" : "test"},"operationUsingCursorId" : NumberLong(7019500)},"numYields" : 2,"locks" : {}}

resume token

resume token 用来描述一个订阅点,本质上是 oplog 信息的一个封装,包含 clusterTime、uuid、documentKey等信息,当订阅 API 带上 resume token 时,MongoDB Server 会将 token 转换为对应的信息,并定位到 oplog 起点继续订阅操作。

struct ResumeTokenData {Timestamp clusterTime;int version = 0;size_t applyOpsIndex = 0;Value documentKey;boost::optional<UUID> uuid;
};

ResumeTokenData 结构里包含 version 信息,在 4.0.7 以前的版本,version 均为0; 4.0.7 引入了一种新的 resume token 格式,version 为 1; 另外在 3.6 版本里,Resume Token 的编码与 4.0 也有所不同;所以在版本升级后,有可能出现不同版本 token 无法识别的问题,所以尽量要让 MongoDB Server 所有组件(Replica Set 各个成员,ConfigServer、Mongos)都保持相同的内核版本。

updateLookup

Change Stream 支持针对 update 操作,获取当前的文档完整内容,而不是仅更新操作本身,比如

mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

上面的 update 操作,默认情况下,change stream 会收到  {_id: 101}, {$set: {age: 20}  的内容,而并不会包含这个文档其他未更新字段的信息;而加上 fullDocument: "updateLookup" 选项后,Change Stream 会根据文档 _id 去查找文档当前的内容并返回。

需要注意的是,updateLookup 选项只能保证最终一致性,比如针对上述文档,如果连续更新100次,update 的 change stream 并不会按顺序收到中间每一次的更新,因为每次都是去查找文档当前的内容,而当前的内容可能已经被后续的修改覆盖。

Sharded cluster

Change Stream 支持针对 sharded cluster 进行订阅,会保证全局有序的返回结果;为了达到全局有序这个目标,mongos 需要从每个 shard 都返回订阅结果按时间戳进行排序合并返回。

在极端情况下,如果某些 shard 写入量很少或者没有写入,change stream 的返回延时会受到影响,因为需要等到所有 shard 都返回订阅结果;默认情况下,mongod server 每10s会产生一条 Noop 的特殊oplog,这个机制会间接驱动 sharded cluster 在写入量不高的情况下也能持续运转下去。

由于需要全局排序,在 sharded cluster 写入量很高时,Change Stream 的性能很可能跟不上;如果对性能要求非常高,可以考虑关闭 Balancer,在每个 shard 上各自建立 Change Stream。


原文链接
本文为阿里云原创内容,未经允许不得转载。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue项目启动后Error: Cannot find module ‘xxx’的解决方法

文章目录1. 删除2. 安装依赖3. 启动项目解决方法 1. 删除 删除 node_modules 整个文件夹和 package-lock.json 文件(这个文件不一定有)&#xff0c;注意不是 package.json 2. 安装依赖 在项目下运行&#xff1a;npm install# or yarn3. 启动项目 npm start # or yarn star…

看到抖音上Python工程师晒得工资条,我沉默了......

我是个抖音中毒者闲来无事就喜欢刷抖音最近刷到了一个Python工程师的工资条然后我默默的打开看了然后就默默的关闭了如今Python技术由于大数据、人工智能的兴起Python也越来越火大家都纷纷学Python我不能跟你确保说学完Python你就能拿高工资但是你学完Python肯定有饭吃说不定还…

不用 H5,闲鱼 Flutter 如何玩转小游戏?

阿里妹导读&#xff1a;最近APP游戏化成为了一个新的风口&#xff0c;把在游戏中一些好玩的、能吸引用户的娱乐方式或场景应用在应用当中&#xff0c;以达到增加用户粘性&#xff0c;提升DAU的效果&#xff0c;成本较低。同时在一些需要对用户有引导性的场景&#xff0c;游戏化…

阿里巴巴向全社会开放黑科技:“泡在水里”的服务器

为了让数据中心更绿色&#xff0c;阿里工程曾将服务器“泡在水里”进行散热&#xff0c;节能超70%&#xff0c;今天这项黑科技的神秘面纱被揭开。 2020年1月6日&#xff0c;阿里巴巴宣布将“浸没式液冷数据中心技术规范”向全社会开放。这项规范旨在用一套标准流程为下一代绿色…

VS Code vue 模板

我们希望每次新建.vue文件后&#xff0c;VSCODE能够根据配置&#xff0c;自动生成我们想要的内容。 打开VSCODE编辑器&#xff0c;依次选择“文件 -> 首选项 -> 用户代码片段”&#xff0c;此时&#xff0c;会弹出一个搜索框&#xff0c;我们输入vue&#xff0c; 如下&am…

程序员必备基础:Git 命令全方位学习

来源 | 捡田螺的小男孩责编 | Carol封图 | CSDN 下载自视觉中国掌握Git命令是每位程序员必备的基础&#xff0c;之前一直是用smartGit工具&#xff0c;直到看到大佬们都是在用Git命令操作的&#xff0c;回想一下&#xff0c;发现有些Git命令我都忘记了&#xff0c;于是写了这篇…

eBay邓明:dubbo-go 中 metrics 的设计

最近因为要在 Apache/dubbo-go&#xff08;以下简称 dubbo-go &#xff09;里面实现类似的这个 metrics 功能&#xff0c;于是花了很多时间去了解现在 Dubbo 里面的 metrics 是怎么实现的。该部分&#xff0c;实际上是被放在一个独立的项目里面&#xff0c;即 metrics 。 总体…

稳定性专题 | Spring Boot 常见错误及解决方法

导读 『StabilityGuide』是阿里多位阿里技术工程师共同发起的稳定性领域的知识库开源项目&#xff0c;涵盖性能压测、故障演练、JVM、应用容器、服务框架、流量调度、监控、诊断等多个技术领域&#xff0c;以更结构化的方式来打造稳定性领域的知识库。 Spring Boot 作为 Java…

vue3 线上环境 ctx 无法识别

解决方案&#xff1a; Vue3获取当前组件实例的 getCurrentInstance 方法上 ctx 生产获取不到 上面的全局方法&#xff0c; getCurrentInstance代表上下文&#xff0c;即当前实例。ctx相当于Vue2的this, 但是需要特别注意的是ctx代替this只适用于开发阶段&#xff0c;如果将项目…

新职业风口已至!人社部宣布这10个职业缺口近千万!

负责阿里小蜜产品的陈海青是阿里最年轻的P9专家&#xff0c;才30岁出头&#xff0c;已经年入百万。小编的前领导&#xff0c;农村出身毫无背景&#xff0c;但是抓住风口卖掉了手里的比特币全款买了学区房&#xff0c;让他的孩子赢在起跑线上。面对这些踩在风口上的幸运儿&#…

一小时快速搭建基于阿里云容器服务-Kubernetes的Web应用

本文面向的读者 如果您是一个Kubernetes的初学者&#xff0c;本文可以帮助你快速在云上搭建一个可实际使用的集群环境&#xff0c;并发布自己的第一个应用。你无须提前准备任何的硬件资源或者下载任何的软件包。 如果您已经有一个自建的Kubernetes集群&#xff0c;想要尝试阿…

VS Code Git 日常操作

文章目录1. 初始化Git仓库2. 新建分支3. 提交4. 同步远程4.分支切换5. 合并分支1. 初始化Git仓库 使用**ctrl**召唤出命令窗口 # 初始化Git仓库 git init2. 新建分支 点击右下角的master&#xff0c;上方就会出现一个输入框&#xff0c;点击正在创建新分支&#xff0c;输入…

bootstrap table 列拖动变宽

需要导入 colResizable-1.6.min.js /**_ _____ _ _ _ | | __ \ (_) | | | | ___ ___ | | |__) |___ ___ _ ______ _| |__ | | ___ / __/ _ \| | _ // _ \/ __| |_ / _ | _ \| |/ _ \| (_| (_) | | | \ \ __/\__ \ |/…

阿里产品专家:高情商的技术人,如何做沟通?

不愿沟通是固执&#xff0c;不会沟通是傻瓜&#xff0c;不敢沟通是奴隶。 ——德拉蒙德 工作中&#xff0c;你是否经常看到别人在会上谈笑风生、纵横捭阖&#xff0c;但自己却唯唯诺诺&#xff0c;不敢表达观点&#xff1f;即便鼓起勇气发言却不被重视&#xff0c;经常被人打断…

“刚毕业1年,做Python能挣多少?”网友:吹的不多..

01现状揭秘&#xff1a;Python的火持续燃烧程序员&#xff1a;心态崩了&#xff01;2020年转眼已经大半&#xff0c;在近几个月的榜单中&#xff0c;Python已经连续走上卫冕的道路&#xff0c;并且与Java的差距拉得更远了一些。以往与Java常呈现你追我赶之势&#xff0c;而这一…

阿里云正式推出内容平台“云栖号”:全面助力企业和个人上云决策

1月7日&#xff0c;阿里云官网正式推出“云栖号”&#xff08;https://yqh.aliyun.com/ &#xff09;&#xff0c;旨在为大家提供第一手的上云资讯&#xff0c;云产品快速入门&#xff0c;来自不同行业精选的企业上云案例&#xff0c;基于众多成功案例萃取而成的最佳实践&#…

Shiro 实现免密登陆

需求&#xff1a;对接第三方登陆&#xff0c;实现绕过原有Shiro认证登陆。 文章目录一、实现思路1. 现状分析2. 用户来源3. 所属范围二、实现方案2.1. 自定义登录认证规则2.2. Shiro认证枚举2.3. 密码和非密码登录2.4. 规则配置2.5. 自定义Realm2.6. 案例使用一、实现思路 1. …

bootstrap table 搜索列formatter之后,单字节搜索异常

bootstrap table 搜索列formatter之后&#xff0c;单字节搜索异常 最近发现搜索这边出现这个问题&#xff0c;这样搜索没有效果 后面发现是因为搜索列formatter之后就会出现这个问题&#xff0c;那么我们就多生成一列不使用formatter并隐藏这列 <!DOCTYPE html> <h…

完了!TCP出了大事!

来源 | 编程技术宇宙责编 | 晋兆雨封图 | CSDN 下载自视觉中国不速之客夜黑风高&#xff0c;乌云蔽月。两位不速之客&#xff0c;身着黑衣&#xff0c;一高一矮&#xff0c;潜入Linux帝国。这一潜就是一个多月&#xff0c;直到他们收到了一条消息高个&#xff1a;“上峰终于给我…

基于Flutter+FaaS的业务框架思考与实践

闲鱼将使用Flutter和FaaS来建设未来的技术开发体系&#xff0c;这是一项长期的规划&#xff0c;新的技术在现在看来犹如雾里看花&#xff0c;需要我们不断的思考&#xff0c;探索&#xff0c;实践才能渐渐描绘出它的轮廓。本文对此提供一种思考角度&#xff0c;对未来基于FaaSF…