不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)

场景

最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,并且远程的SQL server数据库表的数据会实时进行更新,并且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个时间内有18条数据需要首先进SQL server数据库,再更新到MySQL数据库中,这种场景如果每分钟都能将18条数据放入SQL server数据库的话就非常简单了,但是在15:08的时候,这18条数据可能只来11条,剩下的7条可能在15:09或后面的时间陆续过来。我开始的想法是通过最后更新的时间的时间戳来查询新来的数据然后更新到MySQL中,但是由于在最终的时间内还会来前面时间的数据,这样会导致前面时间的数据丢失,所以我想了另外一方法。

  1. 首先使用python写一个程序来同步SQL sever的历史数据到MySQL数据库中
  2. 在SQL server中创建一个中间表。
  3. 在SQL server中要传输的表中创建一个触发器,当这个表更新数据则触发将更新的数据放入到中间表中
  4. 在python脚本中写一个循环来定期检查中间表,我的SQL server表中由两个主键定义一条数据,所以中间表也是由两个字段定义一条数据,由于入库历史数据的数据量非常大,有几十万条,在这个入库历史数据的时间段内更新了很多条数据,所以可能中间表的数据与入库到MySQL中的字段有重复,所以我需要先验证中间表中的数据MySQL是否存在。
    1. 存在则删除中间表中这条数据
    2. 不存在则插入MySQL后删除这条数据
  5. 最后完成了入库程序,经过验证没有数据丢失

1.历史数据入库

历史数据入库我使用的python写的,首先定义两个数据库的信息

# 使用示例
sql_server_conn_params = {'driver': '{SQL Server}','server': 'ip','database': '数据库名','uid': 'jzyg','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '数据库名','charset': 'utf8mb4'
}

定义查询语句

querySolar = 'SELECT dtime,stationID,staionName,electric,tiltSolar,levelSolar,scatterSolar,directSolar,tiltSolar_day,levelSolar_day,scatterSolar_day,directSolar_day,sunShine_day FROM realData_Solar'

定义入库历史数据函数

    def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return  # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name =  row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list)  # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')

2.创建中间表和触发器

创建中间表

CREATE TABLE intermediateData_Wind AS SELECT * FROM realData_Wind WHERE 1=0;

创建触发器

CREATE TRIGGER CopyToIntermediateTable
ON realData_Wind
AFTER INSERT
AS
BEGIN-- 插入操作INSERT INTO intermediateData_Wind (dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min)SELECT dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10minFROM inserted;
END;

3.创建轮询中间表代码

def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")

4.总体代码

import threading
import time
import pyodbc
import pymysql
class DataTransfer:def __init__(self, sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData, interval=1):self.sql_server_conn_params = sql_server_conn_paramsself.mysql_conn_params = mysql_conn_paramsself.queryWind = queryWindself.queryIntermediateData = queryIntermediateDataself.interval = intervalself.wind_last_dtime = '1970-01-01 00:00:00'  # 初始时间def clear_mysql_tables(self):"""清空 MySQL 中的指定表格数据"""try:with pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as cursor:# 清空 wind_monitor 表cursor.execute("TRUNCATE TABLE wind_monitor")mysql_conn.commit()print("已清空 wind_monitor 表的数据。")except Exception as e:print(f"清空表格时发生错误: {e}")def transfer_data(self):self.transfer_wind_data()while True:try:self.transfer_insert_intermediateData_Wind()except Exception as e:print(f"发生错误: {e}")# 等待一定时间再次传输数据time.sleep(self.interval)def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return  # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name =  row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list)  # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')def start(self):# 在启动线程前先清空表格self.clear_mysql_tables()thread = threading.Thread(target=self.transfer_data)thread.start()sql_server_conn_params = {'driver': '{SQL Server}','server': '','database': '','uid': '','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '','charset': 'utf8mb4'
}
queryIntermediateData = "SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM intermediateData_Wind"
queryWind = 'SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM realData_Wind'
data_transfer = DataTransfer(sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData)
data_transfer.start()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/196551.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDK安装太麻烦?一篇文章搞定

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVMJava系统类库)和JAVA工具。 JDK包含的基本组件包括: javac – 编译器&#xf…

FCRP第二题

【题目要求】 数据库中有一张地区数据统计表,但是并不规则 ,记录类似于,225100:02:3:20160725是一串代码,以:分割,第1位为地区代码,第2位为分类代码,第3位为数量,第4位为…

X540t2关于手动安装intel驱动

首先去intel驱动官网下载,win10和win11驱动一样 https://www.intel.cn/content/www/cn/zh/download/18293/intel-network-adapter-driver-for-windows-10.html 然后下载下来解压 将Wired_driver_28.2_x64.exe修改成Wired_driver_28.2_x64.zip文件再解压 打开设备管…

【算法思考记录】力扣1094.拼车 C++【树状数组】

拼车问题(LeetCode 1094)的解析与C实现 Problem: 1094. 拼车 题目背景 在本题中,我们需要处理一个拼车的问题。假设一辆车有固定的座位容量,我们需要根据乘客的上车和下车地点,判断车辆是否能够在整个行程中满足不超过…

DPDK驱动加载

目录 整体加载思路 配置步骤 全局变量配置 调用流程编写 加载VFIO模块 删除VFIO模块 加载KNI模块 卸载KNI模块 创建大页内存 创建大页内存文件系统 删除遗留的大页内存 卸载大页文件系统 调用dpdk-devbind.py脚本来绑定PCI设备igb_uio驱动 加载igb_uio.ko驱动. 卸…

基于springboot + vue 学生网上请假系统

qq(2829419543)获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:springboot 前端:采用vue技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件&#xf…

GeoServer本地部署与远程访问Web管理页面——“cpolar内网穿透”

文章目录 前言1.安装GeoServer2. windows 安装 cpolar3. 创建公网访问地址4. 公网访问Geo Servcer服务5. 固定公网HTTP地址 前言 GeoServer是OGC Web服务器规范的J2EE实现,利用GeoServer可以方便地发布地图数据,允许用户对要素数据进行更新、删除、插入…

(C语言)求出1,2,5三个数不同个数组合为100的组合个数

#include<stdio.h> int main() {int count;for(int i 0;i < 100;i )for(int j 0;j < 50;j )for(int k 0;k < 20;k ){if(i j*2 k*5 100){count;printf("100可以拆分为%d个1元&#xff0c;%d个2元&#xff0c;%d个5元\n",i,j,k);} }printf("…

正则表达式与SQL数据库教程

使用正则表达式通过用例查询 Postgres 数据库&#xff1a; 正则表达式&#xff08;又名 Regex&#xff09; 正则表达式是一个强大的工具&#xff0c;广泛用于模式匹配和文本操作。 几乎所有编程语言都支持它们&#xff0c;并且经常用于文本提取、搜索和匹配文本等用例。 正则…

【刷题】二分查找

二分查找 69. x 的平方根 给你一个非负整数 x &#xff0c;计算并返回 x 的 算术平方根 。 由于返回类型是整数&#xff0c;结果只保留 整数部分 &#xff0c;小数部分将被 舍去 。 注意&#xff1a;不允许使用任何内置指数函数和算符&#xff0c;例如 pow(x, 0.5) 或者 x **…

zipfile --- 使用ZIP存档

目录 ZipFile 对象 Path 对象 PyZipFile 对象 ZipInfo 对象 命令行接口 命令行选项 解压缩的障碍 由于文件本身 文件系统限制 资源限制 中断 提取的默认行为 源代码: Lib/zipfile/ ZIP 文件格式是一个常用的归档与压缩标准。 这个模块提供了创建、读取、写入、添加…

数据接口测试工具 Postman 介绍!

此文介绍好用的数据接口测试工具 Postman&#xff0c;能帮助您方便、快速、统一地管理项目中使用以及测试的数据接口。 1. Postman 简介 Postman 一款非常流行的 API 调试工具。其实&#xff0c;开发人员用的更多。因为测试人员做接口测试会有更多选择&#xff0c;例如 Jmeter…

探索人工智能领域——每日20个名词详解【day6】

目录 前言 正文 总结 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高兴与大家相识&#xff0c;希望我的博客能对你有所帮助。 &#x1f4a1;本文由Filotimo__✍️原创&#xff0c;首发于CSDN&#x1f4da;。 &#x1f4e3;如需转载&#xff0c;请事先与我联系以…

HCIP——交换综合实验

一、实验拓扑图 二、实验需求 1、PC1和PC3所在接口为access&#xff0c;属于vlan2&#xff1b;PC2/4/5/6处于同一网段&#xff0c;其中PC2可以访问PC4/5/6&#xff1b;但PC4可以访问PC5&#xff0c;不能访问PC6 2、PC5不能访问PC6 3、PC1/3与PC2/4/5/6/不在同一网段 4、所有PC通…

CleanMyMac X2024破解注册激活码

CleanMyMac X for Mac中文2024版只需两个简单步骤就可以把系统里那些乱七八糟的无用文件统统清理掉&#xff0c;节省宝贵的磁盘空间。 cleanmymac x个人认为X代表界面上的最大升级&#xff0c;功能方面有更多增加&#xff0c;与最新macOS系统更加兼容&#xff0c;流畅地与系统性…

Educational Codeforces Round 159 (Rated for Div. 2) 题解 A-C

目录 A - Binary ImbalanceB - Getting PointsC - Insert and Equalize A - Binary Imbalance 原题链接 题目描述 给你一个只包含字符 0或 1的字符串 s s s。在每次操作中&#xff0c;你可以从任意两个字符间插入一个字符&#xff1a;如果两个相邻的字符相等&#xff0c;那么…

选择排序、插入排序、希尔排序

1.选择排序 算法描述 将数组分为两个子集&#xff0c;排序的和未排序的&#xff0c;每一轮从未排序的子集中选出最小的元素&#xff0c;放入排序子集 重复以上步骤&#xff0c;直到整个数组有序 选择排序呢&#xff0c;就是首先在循环中&#xff0c;找到数组中最小的元素。在…

Docker 安装部署 Sentinel Dashboard

1、下载 jar 包 官方 jar 包下载地址&#xff1a;https://github.com/alibaba/Sentinel/releases 或者点击 链接 直接跳转到下载页 进入链接下载你需要的版本 下载完毕&#xff08;我这里统一放在一个sentinel目录内&#xff09; 2、编写 Dockerfile 文件&#xff08;这里我不…

详解—[C++数据结构]—红黑树

目录 一、红黑树的概念 ​编辑二、红黑树的性质 三、红黑树节点的定义 四、红黑树结构 五、红黑树的插入操作 5.1. 按照二叉搜索的树规则插入新节点 5.2、检测新节点插入后&#xff0c;红黑树的性质是否造到破坏 情况一: cur为红&#xff0c;p为红&#xff0c;g为黑&…

一键式紧急报警柱系统

随着科技的不断发展&#xff0c;一键式紧急报警柱在我们的生活和工作中扮演着越来越重要的角色。在这篇文章中&#xff0c;我们将一起探究与一键式紧急报警柱有关的知识。 一键式紧急报警柱是一种常见的安全防护设备&#xff0c;能够在紧急情况下快速发出警报&#xff0c;保护…