C#实现SqlServer数据库同步

实现效果:

设计思路:
1. 开启数据库及表的cdc,定时查询cdc表数据,封装sql语句(通过执行类型,主键;修改类型的cdc数据只取最后更新的记录),添加到离线数据表;
2. 线程定时查询离线数据表,更新远程库数据;
3. 远程库数据被更改又会产生cdc数据,对此数据进行拦截;

配置文件说明:

{
"AsyncInterval": 30000,
"Drivers": [
{
"RefreshTime": 5000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "dbsync2|student,table1,table2,table3", "dbsync3|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}

{
"AsyncInterval": 25000,
"Drivers": [
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*"],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
},
{
"RefreshTime": 10000,
"Enable": 1,
"SrcConnect": "Data Source=192.168.8.81;Initial Catalog=master;User ID=sa;Pwd=Nflg1234",
"SrcMap": [ "testsync1|*" ],
"SrcUpdateCDC": 1,
"DstConnect": [ "Data Source=192.168.8.77;Initial Catalog=master;User ID=sa;Pwd=Nflg1234" ]
}
]
}

1. 设置同步间隔时间
2. 根据不同的配置文件,加载不同的模式,多驱动(Drivers 1主1备-单向同步,1主1主-双向同步,2主1备-多库汇总),多机同步(DstConnect),多库同多表同步(SrcMap,dbsync2|*表示监听该数据库下的所有表),设置刷新时间(RefreshTime),是否启用(Enable),是否重置cdc数据(SrcUpdateCDC)

数据表说明:

async_data 离线数据表
id 主键自增 INTEGER
connect_str 连接字符串 NVARCHAR(255)
excute_sql 需要同步的sql语句 NVARCHAR(255)
cdc_time cdc时间 DATETIME
event_time event时间 DATETIME
db_name 数据库名 NVARCHAR(255)
table_name 表名 NVARCHAR(255)
table_pk 表主键 NVARCHAR(255)
excute_type 执行类型(I/U/D) NVARCHAR(255)

sqlserver cdc表(日志表)中如果一条id多次更新,取最新一条数据
sqlite asy_data表(离线数据表),入库时,查dbname + table + pk,无记录则添加,有记录比较cdc记录时间,如果时间更新则更新sql语句

特殊数据处理:
uniqueidentifier类型的数据转为NULL,数据中含有'的替换''

核心代码:

复制代码

using SqlServerAsync.Util.config;
using SqlServerAsync.Util.sqlite;
using SqlServerAsync.Util.sqlite.model;
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;namespace SqlServerAsync.Util
{public class SqlServerCDC{public void Listen(Driver driver){var update_cdc = driver.SrcUpdateCDC == 1 ? true : false;var enable = driver.Enable == 1 ? true : false;foreach (var map in driver.SrcMap){StartCDC(driver.SrcConnect, driver.DstConnect, driver.RefreshTime, enable, update_cdc, map);}}void StartCDC(string srcconnect, List<string> dstconnect, int refreshTime, bool enable, bool update_cdc, string map){try{var freeSql = new FreeSql.FreeSqlBuilder().UseConnectionString(FreeSql.DataType.SqlServer, srcconnect).UseNoneCommandParameter(true)// 不使用参数化.UseAutoSyncStructure(false)// 不同步表结构.Build();var arrayMap = map.Split('|');var db = arrayMap[0];var tbs = arrayMap[1];string[] arrayTB = null;var dstStr = string.Join("#", dstconnect);if (!enable){Program.AddLog($"禁用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");return;}string sql = string.Empty;Dictionary<string, Table> dicTable = new Dictionary<string, Table>();Program.AddLog($"启用监听,来源={srcconnect},目标数={dstconnect.Count},目标={dstStr},db={db},Tables={tbs}");if ("*" == tbs){// 查询db下所有表名sql = $"use {db};select TABLE_NAME from {db}.information_schema.tables where TABLE_SCHEMA='dbo' and TABLE_NAME not in('systranschemas','sysdiagrams')";DataTable dtAll = freeSql.Ado.ExecuteDataTable(sql); var rowCount = dtAll.Rows.Count; if (rowCount > 0) arrayTB = new string[rowCount]; for (int i = 0; i < rowCount; i++){arrayTB[i] = dtAll.Rows[i]["TABLE_NAME"].ToString();}}else{arrayTB = tbs.Split(',');}if (null == arrayTB || 0 == arrayTB.Length){Program.AddLog($"数据库{db},查无数据表 ×");return;}// 开启SQL Server数据库CDCsql = $"use {db};if exists(select 1 from {db}.sys.databases where name='{db}' and is_cdc_enabled=0)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_enable_db\n" +"end";freeSql.Ado.ExecuteNonQuery(sql);// 查询库cdc是否开启成功sql = $"use {db};select is_cdc_enabled from {db}.sys.databases where name='{db}'";DataTable dtCDC_DB = freeSql.Ado.ExecuteDataTable(sql);if (dtCDC_DB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_DB.Rows[0]["is_cdc_enabled"])){Program.AddLog($"数据库CDC开启失败({db}) ×");return;}Program.AddLog($"数据库CDC开启成功({db}) √");foreach (var table in arrayTB){if (string.IsNullOrEmpty(table)) continue;if (update_cdc){// 关闭单张表的CDC功能sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=1)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_disable_table @source_schema='dbo',@source_name='{table}',@capture_instance='dbo_{table}'" +"end";freeSql.Ado.ExecuteNonQuery(sql);}// 开启单张表的CDC功能sql = $"use {db};if exists(select 1 from {db}.sys.tables where name='{table}' AND is_tracked_by_cdc=0)\n" +"begin\n" +$"exec {db}.sys.sp_cdc_enable_table\n" +"@source_schema='dbo',\n" +$"@source_name='{table}',\n" +"@capture_instance=NULL,\n" +"@supports_net_changes=1,\n" +"@role_name=NULL\n" +"end";freeSql.Ado.ExecuteNonQuery(sql);// 查询表cdc是否开启成功sql = $"use {db};select is_tracked_by_cdc from {db}.sys.tables WHERE name='{table}'";DataTable dtCDC_TB = freeSql.Ado.ExecuteDataTable(sql);if (dtCDC_TB.Rows.Count <= 0 || !Convert.ToBoolean(dtCDC_TB.Rows[0]["is_tracked_by_cdc"])){Program.AddLog($"数据表CDC开启失败({table}) ×");continue;}Program.AddLog($"数据表CDC开启成功({table}) √");Table tb = new Table() { Name = table };// 获取字段名,是否主键,字段类型sql = $"use {db};SELECT distinct col.name AS 'Name', idx.is_primary_key as 'IsPK',TYPE_NAME(system_type_id) as 'Type'\n" +$"FROM sys.columns col\n" +$"LEFT JOIN sys.index_columns idxcol ON col.object_id=idxcol.object_id AND col.column_id=idxcol.column_id\n" +$"LEFT JOIN sys.indexes idx ON idxcol.object_id=idx.object_id AND idxcol.index_id=idx.index_id\n" +$"WHERE col.object_id=OBJECT_ID('{table}')";List<Field> lstField = freeSql.Ado.Query<Field>(sql);foreach (var field in lstField){var ispk = Convert.ToBoolean(field.IsPK);if (ispk){tb.LstPKField.Add(field);// 主键,用于更新删除}else{tb.LstDataField.Add(field);}}dicTable.Add(table, tb);}Program.AddLog($"监听成功,{db}");// 定时轮询ThreadPool.QueueUserWorkItem(delegate{Dictionary<string, string> dicTBUpdatePK = new Dictionary<string, string>();while (true){try{ foreach (var item in dicTable){dicTBUpdatePK.Clear();var table_name = item.Key;var tableEntity = item.Value;// cdc表查询//__$start_lsn :与相应更改的提交事务关联的日志序列号(LSN)//__$end_lsn : (在 SQL Server 2008中,此列始终为 NULL)//__$seqval :对事务内的行更改顺序//__$operation :源表DML操作var cdc_table_name = $"{db}.cdc.dbo_{table_name}_CT";sql = $"use {db};select sys.fn_cdc_map_lsn_to_time(__$start_lsn) as cdctime,* from {cdc_table_name}";// 查询cdc时间var dt = freeSql.Ado.ExecuteDataTable(sql);table_name = $"{db}.dbo." + table_name;for (int i = 0; i < dt.Rows.Count; i++){var row = dt.Rows[i];var lstPKField = tableEntity.LstPKField;var lstDataField = tableEntity.LstDataField;var cdctime = Convert.ToDateTime(row["cdctime"]);var operation = Convert.ToInt32(row["__$operation"]);var seqval = (byte[])(row["__$seqval"]);// __$start_lsn代表事件时间,并发时,会有相同的情况,改用__$seqvalvar str_seqval = BitConverter.ToString(seqval, 0).Replace("-", string.Empty);if (3 == operation){continue;}var sql_cdc_execute = string.Empty;string table_pk = string.Empty;foreach (var field1 in lstPKField){table_pk += field1.Name + "='" + row[field1.Name] + "' and ";}table_pk = table_pk.Substring(0, table_pk.Length - 5);string cdc_dic_pk = table_name + ";" + table_pk;// cdc表中过滤多条表中一条记录多次更新,取最新一条数据(查询过的数据利用字典存储)string str_seqval1 = string.Empty;if (4 == operation){if (dicTBUpdatePK.ContainsKey(cdc_dic_pk)){str_seqval1 = dicTBUpdatePK[cdc_dic_pk];}else{// 查询多次更新后的最新值sql = $"use {db};select top 1 __$seqval from {cdc_table_name} where {table_pk} and __$operation=4 order by __$seqval desc";var dtlsn = freeSql.Ado.ExecuteDataTable(sql);var seqval1 = (byte[])(dtlsn.Rows[0]["__$seqval"]);str_seqval1 = BitConverter.ToString(seqval1, 0).Replace("-", string.Empty);dicTBUpdatePK.Add(cdc_dic_pk, str_seqval1);}}// 删除cdc表数据sql = $"use {db};delete from {cdc_table_name} where __$seqval=CONVERT(BINARY(10), '{str_seqval}', 2)";freeSql.Ado.ExecuteNonQuery(sql);string excute_type = string.Empty;switch (operation){case 1:// 删除excute_type = BaseEnum.Delete;sql_cdc_execute = $"delete from {table_name} where {table_pk}";break;case 2:// 插入excute_type = BaseEnum.Insert;string insertField = string.Empty;string insertValue = string.Empty;foreach (var field1 in lstPKField){ insertField += field1.Name + ",";insertValue += HandleSpecialData(field1.Type, row[field1.Name]) + ",";}foreach (var field2 in lstDataField){insertField += field2.Name + ",";insertValue += HandleSpecialData(field2.Type, row[field2.Name]) + ",";} insertField = insertField.Substring(0, insertField.Length - 1);insertValue = insertValue.Substring(0, insertValue.Length - 1);sql_cdc_execute = $"insert into {table_name} ({insertField}) values({insertValue})";break;case 3:break;case 4:// 修改 if (str_seqval == str_seqval1)// 最新的数据{excute_type = BaseEnum.Update;string updateData = string.Empty; foreach (var field2 in lstDataField){updateData += field2.Name + "=" + HandleSpecialData(field2.Type, row[field2.Name]) + ",";}updateData = updateData.Substring(0, updateData.Length - 1);sql_cdc_execute = $"update {table_name} set {updateData} where {table_pk}";}break;}if (!string.IsNullOrEmpty(sql_cdc_execute)){foreach (var dst in dstconnect){bool add = true; string key1 = srcconnect + "_" + table_name + "_" + table_pk; // A同步B,B更新后,CDC日志返回A,这边做截取if (Program.DicExecuted.ContainsKey(key1)){add = false;string removedValue;Program.DicExecuted.TryRemove(key1, out removedValue);}else{// 修改以最后时间的数据为准var entity = SqliteHelper.GetUpdateAsyncData(db, table_name, table_pk);if (null == entity){var asyncdata = new AsyncData() { ConnectStr = dst, ExcuteSQL = sql_cdc_execute, CDCTime = cdctime, EventTime = DateTime.Now, DBName = db, TableName = table_name, TablePK = table_pk, ExcuteType = excute_type };SqliteHelper.InsertAsyncData(asyncdata);}else{// 比较时间if (DateTime.Compare(entity.CDCTime, cdctime) < 0){SqliteHelper.UpdateAsyncData(dst, sql_cdc_execute, entity.Id);}else{add = false;}}if (add){if (dst.Contains("192.168.8.81")){Console.WriteLine("111");}Program.AddLog($"添加,dst:{dst},sql:{sql_cdc_execute}");}}}} }}}catch (Exception ex){Program.AddLog($"Listen Error,ex:{ex.Message}");}Thread.Sleep(refreshTime);}});}catch (Exception ex){Program.AddLog($"[Error] 初始化CDC异常,errmsg:{ex.Message}");}}/// <summary>/// 特殊数据类型处理 1. uniqueidentifier为空时,设置为NULL;2. 单引号,转成双号/// </summary>/// <param name="val"></param>/// <returns></returns>public string HandleSpecialData(string type, object val){if (null == val) return string.Empty;string ret = val.ToString(); bool special = false;if ("uniqueidentifier" == type.ToLower())// 特殊数据类型处理{if (string.IsNullOrEmpty(ret)){special = true;ret = "NULL";}}if (!special){if (ret.Contains("'")){ret = ret.Replace("'", "''");// 把单引号转成双引号}ret = $"'{ret}'";}return ret;} }public class Table{public string Name { get; set; }public List<Field> LstPKField { get; set; } = new List<Field>();public List<Field> LstDataField { get; set; } = new List<Field>();}public class Field{public string Name { get; set; }public string IsPK { get; set; } public string Type { get; set; }// GUID,uniqueidentifier为空时,改为NULL}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/27950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有哪些常用的设计素材网站?

素材网站可以是设计师和创意人员的灵感来源。这些网站收集了各种类型的平面设计图片&#xff0c;包括标志、海报、网站设计、包装设计、插图等。在本文中&#xff0c;我将推荐15个平面设计图素材网站&#xff0c;以帮助您找到新的想法和灵感。 1.即时设计资源社区 即时设计资…

SpringBoot 热部署

文章目录 前言一、spring-boot-devtools添加热部署框架支持settings 开启项目自动编译开启运行中热部署使用Debug启动 二、IDEA 自带 HowSwap 功能设置 Spring Boot 启动类等待项目启动完成点击热加载按钮存在的问题 三、JRebel 插件【推荐】安装插件使用插件 前言 在日常开发…

分布式协议与算法——CAP理论、ACID理论、BASE理论

CAP理论 CAP理论&#xff0c;对分布式系统的特性做了高度抽象&#xff0c;比如抽象成了一致性、可用性和分区容错性&#xff0c;并对特性间的冲突&#xff08;也就是CAP不可能三角&#xff09;做了总结。 CAP三指标 CAP理论对分布式系统的特性做了高度抽象&#xff0c;形成了…

BL302嵌入式ARM控制器进行SQLite3数据库操作的实例演示

本文主要讲述了在钡铼技术BL302嵌入式arm控制器上运行 SQLite3 数据库的命令示例。SQLite3 是一个轻型的嵌入式数据库&#xff0c;不需要安装数据库服务器进程&#xff0c;占用资源低且处理速度快。 首先&#xff0c;需要将对应版本的 SQLite3 文件复制到设备的 /usr/ 目录下&…

c++开发模式,享元模式

享元模式&#xff0c;个人理解&#xff0c;就是应用共享技术来减少类的对象创建&#xff0c;节省计算机资源消耗&#xff0c;而且能够减少维护成本 #include <iostream> #include <string> #include <vector>using namespace std;class Flyweight { public:…

python之prettytable库的使用

文章目录 一 什么是prettytable二 prettytable的简单使用1. 添加表头2. 添加行3. 添加列4. 设置对齐方式4. 设置输出表格样式5. 自定义边框样式6. 其它功能 三 prettytable在实际中的使用 一 什么是prettytable prettytable是Python的一个第三方工具库&#xff0c;用于创建漂亮…

【微信小程序】申请蓝牙、位置和数据库等相关权限

在小程序的app.json文件中配置requiredPermissions字段&#xff0c;并在其中添加相应的权限。 以下是一个示例app.json文件的配置&#xff0c;包括了蓝牙、位置和数据库等权限的申请&#xff1a; {"pages": ["pages/index/index"],"requiredPermiss…

后端开发4.Elasticsearch的搭建

使用docker安装 安装elasticsearch 拉取镜像 docker pull elasticsearch:7.17.0容器间建立通信,创建 elastic的网关 docker network create elastic 创建es容器【自启动】【虚拟机处理器数量至少两个】 docker run --restart=always -p 9200:9200 -p 9300:9300 -e "…

CI/CD持续集成持续发布(jenkins)

1.背景 在实际开发中&#xff0c;我们经常要一边开发一边测试&#xff0c;当然这里说的测试并不是程序员对自己代码的单元测试&#xff0c;而是同组程序员将代码提交后&#xff0c;由测试人员测试&#xff1b; 或者前后端分离后&#xff0c;经常会修改接口&#xff0c;然后重新…

【LeetCode 热题 100】图论 专题(bfs,拓扑排序,Trie树 字典树)

from&#xff1a; https://leetcode.cn/studyplan/top-100-liked/ bfs 具有 边权为1 的最短路性质 拓扑排序&#xff0c;入度 Trie树&#xff0c; 高效存储 字符串【见鬼&#xff0c;不知道为什么写错&#xff0c;需要掌握熟练度】 文章目录 200. 岛屿数量【dfs / bfs】994. 腐…

Linux学习之awk

awk多数情况下作为sed的补充使用&#xff0c;awk会对sed处理过的内容进行格式的调整并输出。awk处理“比较规范”的文件&#xff0c;使用方法比较像脚本文件&#xff0c;sed把比较不规范的文件处理成比较规范的文件。 awk有三部分组成&#xff1a; 输入数据前例程 BEGIN{} 主输…

【LeetCode】287.寻找重复数

题目 给定一个包含 n 1 个整数的数组 nums &#xff0c;其数字都在 [1, n] 范围内&#xff08;包括 1 和 n&#xff09;&#xff0c;可知至少存在一个重复的整数。 假设 nums 只有 一个重复的整数 &#xff0c;返回 这个重复的数 。 你设计的解决方案必须 不修改 数组 nums…

Makefile

什么是 Makefile 一个工程中的源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c; Makefile文件定义了一系列的规则来指定哪些文件需要先编译&#xff0c;哪些文件需要后编 译&#xff0c;哪些文件需要重新编译&#xff0c;甚至于进行更复杂的功…

Android平台GB28181设备接入端如何实现多视频通道接入?

技术背景 我们在设计Android平台GB28181设备接入模块的时候&#xff0c;有这样的场景诉求&#xff0c;一个设备可能需要多个通道&#xff0c;常见的场景&#xff0c;比如车载终端&#xff0c;一台设备&#xff0c;可能需要接入多个摄像头&#xff0c;那么这台车载终端设备可以…

使用webpack插件webpack-dev-server 出现Cannot GET/的解决办法

问题描述 文档地址深入浅出webpack 使用 DevServer运行webpack&#xff0c;跑起来之后提示Cannot GET/&#xff1a; 解决方案&#xff1a; 查阅官方文档 根据目录结构修改对应的配置&#xff1a; 然后就可以成功访问&#xff1a;

【MongoDB】万字长文,命令与代码一一对应SpringBoot整合MongoDB之MongoTemplate

目录 一、导入依赖与配置信息 二、导入测试数据创建实体类 三、插入数据 1、Insert默认集合插入 2、Insert指定集合插入 3、Insert批量插入数据 4、save默认集合插入 5、save指定集合插入 6、insert与save的区别 四、修改数据 1、修改符合条件的第一条数据 2、全…

redis 原理 7:开源节流 —— 小对象压缩

Redis 是一个非常耗费内存的数据库&#xff0c;它所有的数据都放在内存里。如果我们不注意节约使用内存&#xff0c;Redis 就会因为我们的无节制使用出现内存不足而崩溃。Redis 作者为了优化数据结构的内存占用&#xff0c;也苦心孤诣增加了非常多的优化点&#xff0c;这些优化…

[PM]敏捷开发之Scrum总结

在项目管理中&#xff0c;不少企业和项目团队也发现传统的项目管理模式已不能很好地适应今天的项目环境的要求。因此&#xff0c;敏捷项目管理应运而生&#xff0c;本文将为大家介绍Scrum敏捷项目管理以及应用方法。 什么是Scrum敏捷项目管理 敏捷项目管理作为新兴的项目管理模…

Java后台生成ECharts图片,并以Base64字符串返回

前言 通过echarts的jar包&#xff0c;Java后台生成一张图片&#xff0c;并把图片插入到word中。关于word插图片的代码在下一章。 需要用到的工具PhantomJS,Echarts-convert.js,jquery.js,echarts.js。 1.PhantomJS 介绍 PhantomJS是一个不需要浏览器的富客户端。 官方介绍&…

[保研/考研机试] 猫狗收容所 C++实现

题目描述&#xff1a; 输入&#xff1a; 第一个是n&#xff0c;它代表操作序列的次数。接下来是n行&#xff0c;每行有两个值m和t&#xff0c;分别代表题目中操作的两个元素。 输出&#xff1a; 按顺序输出收养动物的序列&#xff0c;编号之间以空格间隔。 源代码&#xff…