Mongodb 开启oplog,java监听oplog并写入关系型数据库

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:replSetName: localoplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{"ok" : 1,"operationTime" : Timestamp(1627503341, 1),"$clusterTime" : {"clusterTime" : Timestamp(1627503341, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}}
}

监听Oplog日志

pom

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.10</version><relativePath/></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.7</version></dependency><dependency><groupId>com.vividsolutions</groupId><artifactId>jts</artifactId><version>1.13</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-spatial</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-java8</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>com.bedatadriven</groupId><artifactId>jackson-datatype-jts</artifactId><version>2.3</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate MongoTemplate mongoTemplate;@Resourceprivate EntityManager entityManager;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");MongoCollection<Document> oplog = db.getCollection("oplog.rs");BsonTimestamp startTS = getStartTimestamp();BsonTimestamp endTS = getEndTimestamp();Bson filter = Filters.and(Filters.gt("ts", startTS));MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();while (true) {if (cursor.hasNext()) {Document doc = cursor.next();String operation = doc.getString("op");if (!"n".equals(operation)) {String namespace = doc.getString("ns");String[] nsParts = StringUtils.split(namespace, ".");String collectionName = nsParts[1];String databaseName = nsParts[0];Document object = (Document) doc.get("o");log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);if ("i".equals(operation)) {insert((Document) doc.get("o"), databaseName, collectionName);} else if ("u".equals(operation)) {update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);} else if ("d".equals(operation)) {delete((Document) doc.get("o"), databaseName, collectionName);}}}}}private BsonTimestamp getStartTimestamp() {long currentSeconds = System.currentTimeMillis() / 1000;return new BsonTimestamp((int) currentSeconds, 1);}private BsonTimestamp getEndTimestamp() {return new BsonTimestamp(0, 1);}private void insert(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void update(Document object, Document update, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);String updateJson = JSON.serialize(update);Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");query.setParameter("json", json);query.setParameter("updateJson", updateJson);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void delete(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/189196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解前端路由:构建现代 Web 应用的基石(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

多线程(初阶六:单例模式)

一、单例模式的简单介绍 二、饿汉模式 三、懒汉模式 四、饿汉模式和懒汉模式的线程安全问题分析 一、单例模式的简单介绍 单例模式是一种设计模式&#xff0c;其中设计模式是软性的规定&#xff0c;与它关联的框架是硬性的规定&#xff0c;这些都是大佬已经设计好了的&…

QT线程的使用 循环中程序的等待

QT线程的使用 循环中程序的等待 先看效果1 pro文件2 头文件3 源文件4 ui文件先看效果 1 pro文件 QT += concurrent2 头文件 #ifndef MAINWINDOW_H #define MAINWINDOW_H

简述MyBatis、MyBatis-Plus、以及MyBatis-Plus的简单运用

什么是MyBatis MyBatis是一个开源的Java持久层框架&#xff0c;用于简化与关系型数据库的交互。它通过将SQL语句与Java代码进行分离&#xff0c;提供了一种优雅的方式来处理数据库操作。 MyBatis的核心思想是将SQL语句与Java方法进行映射&#xff0c;使得开发人员可以通过配置…

集成开发环境PyCharm的使用【侯小啾python领航计划系列(三)】

集成开发环境 PyCharm 的使用【侯小啾python领航计划系列(三)】 大家好,我是博主侯小啾, 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ�…

【Vue3+Ts项目】硅谷甄选 — 搭建后台管理系统模板

一、 项目初始化 一个项目要有统一的规范&#xff0c;需要使用eslintstylelintprettier来对我们的代码质量做检测和修复&#xff0c;需要使用husky来做commit拦截&#xff0c;需要使用commitlint来统一提交规范&#xff08;即统一提交信息&#xff09;&#xff0c;需要使用pre…

Python | 轻量ORM框架Peewee的基础使用(增删改查、自动创建模型类、事务装饰器)

文章目录 01 简介02 安装03 自动创建模型类04 基础使用4.1 查询4.2 新增4.3 更新4.4 删除 05 事务 01 简介 在使用python开发的过程中&#xff0c;有时需要一些简单的数据库操作&#xff0c;而Peewee正是理想的选择&#xff0c;它是一个小巧而灵活的 Python ORM&#xff08;对…

hadoop-3.3.5安装过程

准备资源三台虚拟机&#xff1a; 1&#xff09;准备3台服务器&#xff08;关闭防火墙、静态IP、主机名称&#xff09; 2&#xff09;安装JDK 3&#xff09;配置环境变量 4&#xff09;安装Hadoop 5&#xff09;配置环境变量 安装虚拟机&#xff08;略&#xff09;--1台即…

如何删除mac苹果电脑上面的流氓软件?

在使用苹果电脑的过程中&#xff0c;有时候我们也会遇到一些不需要的软件。无论是因为不再需要&#xff0c;或者是为了释放磁盘空间&#xff0c;删除这些软件是很重要的。本文将为大家介绍怎样删除苹果电脑上的软件&#xff01; CleanMyMac X全新版下载如下: https://wm.make…

elementUI实现根据屏幕大小自适应换行,栅格化布局

需求&#xff1a; 默认一行展示4个卡片&#xff1b;当屏幕小于某个大小的时候&#xff0c;一行展示3个卡片&#xff1b;再小就展示2个&#xff1b;以此类推&#xff0c;最小就展示1个。 效果卡片样式如下图&#xff1a; 默认一行4个 屏幕缩小到某个阈值&#xff0c;一行展示…

Linux:vim的简单使用

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》《Linux》 文章目录 前言一、vim的基本概念二、vim的基本操作三、vim正常模式命令集四、vim底行模式命令集五、.xxx.swp的解决总结 前言 本文是对Linux中vim使用的总结 一、vim的基本概念 …

iOS Class Guard 成功了,但无法区分差异

iOS Class Guard 成功了&#xff0c;但无法区分差异 我正在开发一个静态库&#xff0c;并使用 Polidea 的 iOS Class Guard 来混淆我的静态库。我按照步骤在项目的根路径中下载 obfuscate_project&#xff0c;更改其中所需的名称&#xff0c;最后在终端中运行 bash obfuscate_p…

算法通关村第六关—二叉树的层次遍历经典问题(白银)

二叉树的层次遍历经典问题 一、层次遍历简介 广度优先遍历又称层次遍历&#xff0c;过程如下&#xff1a;  层次遍历就是从根节点开始&#xff0c;先访问根节点下面一层全部元素&#xff0c;再访问之后的层次&#xff0c;图里就是从左到右一层一层的去遍历二叉树&#xff0c…

学习笔记8——JUC入门基础知识

学习笔记系列开头惯例发布一些寻亲消息 链接&#xff1a;https://baobeihuijia.com/bbhj/contents/3/199561.html 进程和线程:进程是资源分配的最小单位&#xff0c;线程是CPU调度的最小单位 进程和线程的主要区别&#xff08;总结&#xff09;_进程和线程的区别-CSDN博客进程…

Flink的部署模式和运行模式

集群角色 Flink提交作业和执行任务&#xff0c;需要几个关键组件&#xff1a; 客户端&#xff1a;代码由客户端获取并作转换&#xff0c;之后提交给Jobmanager Jobmanager就是Flink集群的管事人&#xff0c;对作业进行中央调度管理&#xff1b;当从客户端获取到转换过的作业后…

NRF24L01 无线收发模块与 Arduino 的应用

NRF24L01 是一款常用的无线收发模块&#xff0c;与 Arduino 兼容性良好&#xff0c;可以用于实现无线通信和数据传输。本文将介绍如何将 NRF24L01 模块与 Arduino 配合使用&#xff0c;包括硬件的连接和配置&#xff0c;以及相应的代码示例。 一、引言 NRF24L01 是一款基于 2.…

CentOS或RHEL安装vscode

下载rpm安装包 网络下载或者下载到本地再上传到服务器&#xff0c;点击访问国内下载地址&#xff0c;不需要积分curl -fOL https://github.com/coder/code-server/releases/download/v4.19.1/code-server-4.19.1-amd64.rpm安装 rpm -i code-server-4.19.1-amd64.rpm关闭和禁用…

STM32F407-14.3.9-01输出比较模式

输出比较模式 此功能用于控制输出波形&#xff0c;或指示已经过某一时间段。 当捕获/比较寄存器与计数器之间相匹配时&#xff0c;输出比较功能&#xff1a; ● 将为相应的输出引脚分配一个可编程值&#xff0c;该值由输出比较模式&#xff08;TIMx_CCMRx 寄存器中的 OCxM⑦…

Python批量Git Pull,对文件夹批量进行Pull操作

效果展示 说明 本来是想写的完善一些&#xff0c;但由于是自用&#xff0c;所以写出来后发现已经解决了自己的问题&#xff0c;所有 2和3功能没有写。 执行的话&#xff0c;需要 cmd 之后 直接 Python BatchGitPull.py 运行下面代码即可。 里面同时涉及到其他Pyhon知识点(写给…

Ubuntu18.04 Udacity project_9_PID_control 如何运行

工程源码和仿真器下载&#xff1a; 源码 仿真器 --- Ubuntu就下载 term2_sim_linux.zip 这个压缩文件即可 紧接着给方框中的文件赋可执行权限 打开project_9_PID_control文件夹 执行如下脚本&#xff0c;安装必要的库&#xff0c;比如websocket&#xff08;程序生成的可执行…