MongoDB之Change Stream实战

什么是 Chang Streams

Change Stream 指数据的变化事件流,MongoDB 从 3.6 版本开始提供订阅数据变更的功能。
Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:


Change Stream触发器
触发方式异步同步(事务保证)
触发位置应用回调事件数据库触发器
触发次数每个订阅事件的客户端1次(触发器)
故障恢复从上次断点重新触发事务回滚

Change Stream 的实现原理

Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除;
  • drop:集合被删除;
  • rename:集合被重命名;
  • dropDatabase:数据库被删除;
  • invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;

666.png
如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:

var cs = db.user.watch([{$match:{operationType:{$in:["insert","delete"]}}
}])

Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。
因此:

  • 未开启 majority readConcern 的集群无法使用 Change Stream;
  • 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕)。

MongoShell 测试
窗口 1:

db.user.watch([],{maxAwaitTimeMS:1000000}).pretty()

窗口 2:

db.user.insert({name:"xxxx"})

变更事件字段说明:

名称说明
_id变更事件的 Token 对象
operationType变更类型
fullDocument文档完整内容
ns监听的目标
ns.db变更的数据库
ns.coll变更的集合
to对于 rename 操作变更后的目标
ns.dbrename 操作后的数据库
documentKey变更文档的键值,含 _id 字段
updateDescription变更描述
updateDescription.updatedFields变更中更新的字段
updateDescription.removedFields变更中删除的字段
clusterTime对应oplog关联的时间戳
txnNumber事务编号,仅在多文档事务中出现,MongoDB4.0 版本支持
lsid事务关联的会话号,仅在多文档事务中出现,MongoDB4.0 版本支持

Change Stream 故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?
image.png
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>})

即可从上一条通知中断处继续获取后续的变更通知。

使用场景

  • 监控

用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。

  • 分析平台

例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如类似 Flink、Spark 等计算平台等等。

  • 数据同步

基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。

  • 消息推送

假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。

注意事项

  • Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
  • 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
  • 删除数据时通知的仅是删除数据的 _id。

Spring Boot 整合Chang Stream

(1)引入依赖

<!--spring data mongodb--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

(2)配置 yml

spring:data:mongodb:uri: mongodb://firechou:firechou@192.168.65.174:28017,192.168.65.174:28018,192.168.65.174:28019/test?authSource=admin&replicaSet=rs0

(3)配置 mongo 监听器的容器 MessageListenerContainer,spring 启动时会自动启动监听的任务用于接收 changestream

@Configuration
public class MongodbConfig {@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {Executor executor = Executors.newFixedThreadPool(5);MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documentMessageListener).collection("user") // 需要监听的集合名// 过滤需要监听的操作类型,可以根据需求指定过滤条件.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update","delete"))))// 不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(request, Document.class);return messageListenerContainer;}
}

(4)配置 mongo 监听器,用于接收数据库的变更信息

@Component
public class DocumentMessageListener<S, T> implements MessageListener<S, T> {@Overridepublic void onMessage(Message<S, T> message) {System.out.println(String.format("Received Message in collection                                         %s.\n\trawsource: %s\n\tconverted: %s",message.getProperties().getCollectionName(), message.getRaw(), message.getBody()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux动态分配IP与正向解析DNS

目录 一、DHCP分配 1. 动态分配 1.1 服务端服务安装 1.2 修改服务端dhcp配置 1.3 修改客户端dhcp&#xff0c;重启查询网卡信息 2. 根据mac固定分配 2.1 修改服务器端dhcp服务配置 2.2 客户端自动获取&#xff0c;查看网卡信息 二、时间同步 1. 手动同步 2. 自动同…

java实现AES256对称加解密工具类

一、引入依赖包 引入相关依赖包 <dependency><groupId>org.bouncycastle</groupId><artifactId>bcprov-jdk15on</artifactId><version>1.70</version> </dependency> <!--lombok用于简化实体类开发--> <dependency&g…

Hive基础

hive的基础部分大致有四部分&#xff1a;Hive数据类型、Hive运算符、Hive数据存储、Hive表存储格式。这四部分是学习hive必须掌握的知识。 一、Hive数据类型 整体概述 1&#xff0c;hive的数据类型指的是表中列字段类型&#xff0c;类似于编程语言中对变量类型的定义如&#…

Beauty algorithm(七)瘦脸

瘦脸的实现采用局部平移法。 一、skills 前瞻 局部平移 二、目标区域定位 左脸: 关键点选择3、5点,基点30 rmax:计算两点5-3间的距离, |x-c|:图像任一点到固定基点c的距离 |m-c|:两固定点距离 右脸: 关键点选择

【MySQL】锁机制

文章目录 一、表级锁和行级锁二、排他锁和共享锁三、InnoDB行级锁行级锁间隙锁意向共享锁和意向排他锁 四、InnoDB表级锁五、死锁六、锁的优化建议 一、表级锁和行级锁 表级锁&#xff1a; 对整张表加锁。开销小&#xff0c;加锁快&#xff0c;不会出现死锁&#xff1b;锁粒度…

STL-list的使用简介

目录 ​编辑 一、list的底层实现是带头双向循环链表 二、list的使用 1、4种构造函数&#xff08;与vector类似&#xff09;​编辑 2、迭代器iterator 3、容量&#xff08;capicity&#xff09;操作 4、element access 元素获取 5、增删查改 list modifiers 6、list的迭…

Linxu每日智囊

每日分享三个Linux命令,悄悄培养读者的Linux技能。 欢迎关注公众号(NLP Research) apt 作用 包管理器 语法 apt [选项] 软件包 参数: -h:帮助-y:当安装过程提示选择全部为"yes"-q:不显示安装的过程案例 列出所有可更新的软件清单命令sudo apt update升级软…

ROS2——Parameters

节点可以使用参数来配置各项操作&#xff0c;这些参数可以说布尔值、整数、字符串等类型。节点在启动时会读取参数。我们将参数单独列出来&#xff0c;而不是写在源文件中&#xff0c;这样做可以方便我们调试&#xff0c;因为在不同的机器人、环境中&#xff0c;我们需要的参数…

【Unity】Joystick Pack摇杆插件实现锁四向操作

Joystick Pack ​ 简介&#xff1a;一款Unity摇杆插件&#xff0c;非常轻量化 ​ 摇杆移动类型&#xff1a;圆形、横向、竖向 ​ 摇杆类型&#xff1a; Joystick描述Fixed固定位置Floating浮动操纵杆从用户触碰的地方开始&#xff0c;一直固定到触碰被释放。Dynamic动态操纵…

网卡高级设置-提高网络环境

网卡高级设置&#xff0c;提高网络质量排除一些连接问题 一、有线网卡 1、关闭IPv6&#xff1b; 可以关闭协议版本6&#xff0c;因为它会引起一些网络连接问题&#xff0c;而且现在几乎用不到IP6。 2、关闭节约电源模式&#xff1b; 右击计算机->设备->设备管理器-&…

开源C语言库Melon:数据恢复算法

本文讲述开源C语言库Melon中的里德所罗门纠错码的使用。 关于 Melon 库&#xff0c;这是一个开源的 C 语言库&#xff0c;它具有&#xff1a;开箱即用、无第三方依赖、安装部署简单、中英文文档齐全等优势。 Github repo 简介 里德所罗门编码是一种纠错码技术&#xff0c;…

微信小程序的生命周期函数有哪些?

面试官&#xff1a;说说微信小程序的生命周期函数有哪些&#xff1f; 一、是什么 跟vue、react框架一样&#xff0c;微信小程序框架也存在生命周期&#xff0c;实质也是一堆会在特定时期执行的函数 小程序中&#xff0c;生命周期主要分成了三部分&#xff1a; 应用的生命周期…

MacOS安装Miniforge、Tensorflow、Jupyter Lab等(2024年最新)

大家好&#xff0c;我是邵奈一&#xff0c;一个不务正业的程序猿、正儿八经的斜杠青年。 1、世人称我为&#xff1a;被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员… 2、这几年&#xff0c;我整理了很多IT技术相关的教程给大家&#xff0…

MySQL数据库导入导出远程备份

一 navcat导入导出 导入 选择数据库 选择自己需要的的脚本进行导入 18万的sql脚本数据 导入时间33秒左右 导出 选择表右击----转存SQL文件---结构和数据 导出时间比较快 二 mysqldump 导入导出 先进入mysql的安装bin目录下&#xff0c;先将所要导入的脚本放入该bin目录下…

从优化设计到智能制造:生成式AI在可持续性3D打印中的潜力和应用

可持续性是现代工业中一个紧迫的问题&#xff0c;包括 3D 打印领域。为了满足环保制造实践日益增长的需求&#xff0c;3D 打印已成为一种有前景的解决方案。然而&#xff0c;要使 3D 打印更具可持续性&#xff0c;还存在一些需要解决的挑战。生成式人工智能作为一股强大的力量&…

Java虚拟机类加载机制探究:生命周期、初始化、使用与验证

一、java虚拟机与程序的生命周期 在如下几种情况之下&#xff0c;java虚拟机将结束生命周期&#xff1a; 执行了System.exit()方法程序正常执行结束程序在执行过程中遇到了异常或者错误而异常终止由于操作系统用出现错误而导致java虚拟机进程终止 二、类的加载&#xff0c;链…

基于ssm运动会管理系统的设计与实现 【附源码】

基于ssm运动会管理系统的设计与实现 【附源码】 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuil…

CSS 圆形分割按钮动画 带背景、图片

<template><view class="main"><view class="up"> <!-- 主要部分上 --><button class="card1"><image class="imgA" src="../../static/A.png"></image></button><butt…

NIO通信代码示例

NIO通信架构图 1.Client NioClient package nio;import constant.Constant;import java.io.IOException; import java.util.Scanner;public class NioClient {private static NioClientHandle nioClientHandle;public static void start() {nioClientHandle new NioClientHa…

3 快速前端开发

3 前端JavaScript 3 前端JavaScript1. JavaScript1.1 代码位置1.2 注释1.3 变量1.4 字符串类型案例&#xff1a;跑马灯 1.5 数组案例&#xff1a;动态数据 1.6 对象&#xff08;字典&#xff09;案例&#xff1a;动态表格 1.7 条件语句1.8 函数 2.DOM2.1 事件的绑定 3.知识点的…