数据同步大冒险:PostgreSQL到MySQL的奇妙之旅

引言:一场跨数据库的浪漫邂逅 💑

在数据的世界里,不同数据库系统就像是来自不同星球的恋人,它们各自拥有独特的魅力,但偶尔也会渴望一场跨越界限的亲密接触。今天,我们就来见证一场PostgreSQL与MySQL之间的浪漫邂逅——定时获取PostgreSQL中的数据,并将其温柔地同步至MySQL的怀抱中。这不仅是一场技术的挑战,更是一次数据流转的艺术展现!

场景设定:数据的星际旅行 🚀

想象一下,你是一位数据守护者,负责管理着两个星球的数据库:PostgreSQL的“科技星球”和MySQL的“人文星球”。每天,你都需要从“科技星球”收集最新的科研成果(数据),然后运送到“人文星球”上,让那里的居民也能享受到科技进步的果实。

思路分析:星际导航图 🗺️

要完成这场星际旅行,我们需要精心规划航线:

启航准备:确保两艘飞船(数据库连接)都已就绪,且飞船上的储物舱(数据表)结构相似,便于数据转移。
坐标定位:首先,从PostgreSQL中读取最新的数据ID,这是我们的“出发坐标”。然后,在MySQL中查询最新的数据ID,作为我们的“目标坐标”。
航线规划:通过比较两个坐标的差距,我们可以确定哪些数据是新增的,需要被“运送”到MySQL。这就像是在浩瀚的数据海洋中,绘制出一条最优的航线。
数据转移:根据规划好的航线,将选定的数据从PostgreSQL中提取出来,并安全地“降落”在MySQL的相应位置。
实战代码:编写星际导航程序 💻
虽然具体的代码实现需要你们自己来完成(因为你们才是这场冒险的主角!),但我可以给你们一个大致的框架,就像是一个星际导航程序的伪代码:

# 假设这是你的星际导航程序  def connect_to_postgresql():  # 连接PostgreSQL数据库,获取连接对象  # ...  return pg_connection  def connect_to_mysql():  # 连接MySQL数据库,获取连接对象  # ...  return mysql_connection  def fetch_latest_ids(connection, table_name):  # 从指定数据库中获取最新数据ID  # 使用SQL查询,如 SELECT MAX(id) FROM table_name  # ...  return latest_id  def sync_data(pg_connection, mysql_connection, source_table, target_table):  # 1. 获取两个数据库的最新ID  pg_latest_id = fetch_latest_ids(pg_connection, source_table)  mysql_latest_id = fetch_latest_ids(mysql_connection, target_table)  # 2. 确定需要同步的数据范围  if pg_latest_id > mysql_latest_id:  # 3. 编写SQL查询,选择ID在mysql_latest_id到pg_latest_id之间的数据  # 4. 执行查询,获取数据  # 5. 编写SQL语句,将获取的数据插入MySQL  # ...  # 定时执行数据同步任务  
# 可以使用APScheduler等库来实现定时任务

结尾:星际旅行的意义 🌟
通过这场PostgreSQL到MySQL的数据同步冒险,我们不仅实现了数据的跨库流动,更深刻体会到了数据在不同系统间共享的重要性。正如星际旅行不仅仅是为了探索未知,更是为了促进不同文明之间的交流与融合。希望这次经历能激发你们对数据世界更多奇妙的想象和探索!

实际源码

"""
功能:
监测数据表是否更新
连接postgresql
连接mysql
比较-同步
"""
import sys
import pymysql
import psycopg2
import pandas as pd
import numpy as npdbname_mysql = 'followup'
table_name_list_mysql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']dbname_postgresql = 'lcmbigdata'
table_name_list_postgresql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']col_address_lists = [['create_time', 'update_time'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date']]modelname_postgresql = 'digitalelf'# 数据库连接参数
conn_params_mysql = {"database": dbname_mysql,"user": "root","password": "root","host": "localhost","port": 3306  # 端口号应该是整数
}
conn_params_postgresql = {"database": dbname_postgresql,"user": "postgres",  # 数据库用户"password": "root",  # 数据库密码"host": "localhost",  # 数据库服务器地址"port": 6666  # 数据库端口
}issue_columns = ['id' ,'model_no' ,'part_no' ,'issue_type', 'issue_from', 'start_date','end_date' ,'status', 'priority', 'fab', 'issue_description', 'root_cause','customer' ,'customer_operation' ,'phase', 'analysis', 'solution', 'progress','lesson_learnt' ,'create_by' ,'create_time', 'update_by', 'update_time','sys_org_code' ,'site', 'issue_owner', 'is_nudd' ,'attachment' ,'issue_dept','issue_update' ,'func', 'material_structure']
kickoff_columns = ['id', 'week', 'jiazhi', 'jishu', 'modelno', 'pn', 'customer', 'technology', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'odm', 'others_feiyong', 'renli_target', 'fy_target', 'mpiowner', 'mpi_bumen', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
rfq_columns = ['id', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
s02_kickoff_columns = ['id', 'week', 'customer', 'technology', 'odm', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'renli_target', 'fy_target', 'jiazhi', 'jishu', 'pn', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'others_feiyong', 'mpiowner', 'mpi_bumen', 'modelno']
s02_rfq_columns = ['id', 'customer', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'evaluation_date', 'rfq_result', 'fail_cause']
s06_kickoff_columns = ['id', 'modelno', 'technology', 'cellsite', 'fcst', 'jingzhengduishou', 'odm', 'mpiowner', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'week', 'jiazhi', 'jishu', 'kickoff_gopremium', 'pn', 'customer', 'kickoffdate', 'mpdate', 'dvtdate', 'yingyong', 'kehu_shiyong_fangshi', 'others_feiyong', 'renli_target', 'fy_target', 'design_processremark', 'mpi_bumen']
s06_rfq_columns = ['id', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause']columns_names = [issue_columns,kickoff_columns,rfq_columns,s02_kickoff_columns,s02_rfq_columns,s06_kickoff_columns,s06_rfq_columns]sql_host = 'localhost'
port = 3306
user = 'root'
password = 'root'
sql_db_name = 'followup'
# sql_table_name = 'np_issue'def replace_nan_with_none(value):"""将 numpy.nan 替换为 None,其他值保持不变。"""return None if np.isnan(value) else value# #数据库SQL上传函数
def sql_connect():try:conn = pymysql.connect(host = sql_host,port = port,user = user,password = password,db = sql_db_name,charset='utf8')return connexcept Exception as e:logging.error('SQL CONNECT' + str(e))def sql_upload(raw_data, table_name):"""将DataFrame raw_data写入MySQL数据库指定的table_name表中。"""# 连接数据库conn = sql_connect()cursor = conn.cursor()# 确保表存在,如果不存在则创建表# 注意:这里简化了表的创建过程,实际应用中可能需要根据raw_data的列名和数据类型创建合适的表结构create_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (-- 假设所有列都是VARCHAR类型,实际情况应根据raw_data的列调整flag VARCHAR(255),period VARCHAR(255),dept VARCHAR(255),-- 其他列...bd VARCHAR(255));"""cursor.execute(create_table_sql)# 构建插入语句insert_sql = f"""INSERT INTO {table_name} ({', '.join(raw_data.columns)})VALUES ({', '.join(['%s'] * len(raw_data.columns))});"""# 执行插入语句for index, row in raw_data.iterrows():try:# 处理列表中的每个元素# print("index:\n",index)row_list = list(row)for i in range(len(row_list)):if row_list[i] is np.nan:row_list[i] = replace_nan_with_none(row_list[i])# if np.nan in row:#     cleaned_row = [replace_nan_with_none(x) for x in list(row)]cursor.execute(insert_sql, row_list)conn.commit()except Exception as e:print("上传失败:", e)print(insert_sql,list(row))conn.rollback()cursor.close()conn.close()print("数据成功上传至数据库")def data_update_PostgreSQL_MySQL(data_type_tips):for i in range(0,len(table_name_list_mysql)):# i=1print("🦁"*30)print(f"第{data_type_tips}次更新{table_name_list_mysql[i]}")print("🐎" * 30)table_name_mysql = table_name_list_mysql[i]table_name_postgresql = table_name_list_postgresql[i]col_address_list = col_address_lists[i]data_columns = columns_names[i]id_list_mysql = []id_list_postgresql = []id_difference = []try:# 使用连接参数建立PostgreSQL连接try:conn_postgresql = psycopg2.connect(**conn_params_postgresql)# print("成功连接到PostgreSQL数据库")# 创建一个cursor对象来执行SQL命令cur_postgresql = conn_postgresql.cursor()# 执行SQL查询(例如:选择所有记录)# cur_postgresql.execute(f"SELECT * FROM {dbname}.{your_table_name};")cur_postgresql.execute(f"SELECT id FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql};")# 获取所有查询结果rows = cur_postgresql.fetchall()for row in rows:id_list_postgresql.append(row[0])# print(row)except (Exception, psycopg2.DatabaseError) as error:print(f"PostgreSQL数据库错误:{error}")# 使用连接参数建立MySQL连接try:conn_mysql = pymysql.connect(**conn_params_mysql)# print("成功连接到MySQL数据库")# 创建一个cursor对象来执行SQL命令cur_mysql = conn_mysql .cursor()# 执行SQL查询(例如:选择所有记录)# 修复 SQL 查询字符串中的表名引用#cur.execute(f"SELECT * FROM {your_table_name};")cur_mysql.execute(f"SELECT id FROM {table_name_mysql};")# 获取所有查询结果rows = cur_mysql.fetchall()for row in rows:id_list_mysql.append(row[0])# print(row)for id in id_list_postgresql:if id not in id_list_mysql:# print(id)id_difference.append(id)print(len(id_difference))# print(id_difference)# sys.exit()print("🐒"*20)if len(id_difference):if len(id_difference) == 1:cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id = '{id_difference[0]}';")else:id_difference = tuple(id_difference)cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id in {id_difference};")"""新入新的数据库"""# 获取所有查询结果rows_new = cur_postgresql.fetchall()# print(len(rows_new),len(data_columns))print("🐅" * 20)rows_pd = pd.DataFrame(rows_new, columns=data_columns)print("🐍"*20)# print(len(rows_pd.columns),len(rows_new),len(data_columns))print("🐉" * 20)# 处理 DataFrame 中的日期时间列for col in data_columns:data_type = rows_pd[col].dtypeif 'datetime64[ns, UTC+08:00]' in str(data_type):# 移除时区信息rows_pd[col] = rows_pd[col].dt.tz_localize(None)# 将日期时间转换为字符串格式rows_pd[col] = rows_pd[col].dt.strftime('%Y-%m-%d %H:%M:%S')sql_upload(rows_pd, table_name_mysql)print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")# print('★' * 10, '\n', 'successs')else:print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")except (Exception, pymysql.DatabaseError) as error:print(f"MySQL数据库错误:{error}")finally:# 关闭cursor和连接if cur_postgresql:cur_postgresql.close()if conn_postgresql:conn_postgresql.close()# print("PostgreSQL数据库连接已关闭")# 关闭cursor和连接if cur_mysql:cur_mysql.close()if conn_mysql :conn_mysql .close()# print("MySQL数据库连接已关闭")except Exception as e:print(f"数据库同步ERROR {e}")def data_update():for i in range(1,3):# print(f"第{i}次更新数据库")data_update_PostgreSQL_MySQL(i)data_update()

运行结果

在这里插入图片描述

增加定时功能

import schedule
import timedef data_update():print("Updating data...")# 每15分钟执行一次
schedule.every(15).minutes.do(data_update)while True:schedule.run_pending()time.sleep(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于RK3588+MCU智能清洁车应用解决方案

智能清洁车应用解决方案 在智慧城市建设发展的过程中,智慧环卫是打造智慧城市的重要组成部分,智能清洁车作为实现环卫智能化、提升作业效率和服务质量的关键工具,发挥着不可或缺的作用。 智能清洁车集成了激光雷达、双目视觉、多重传感器以及…

无线通信频率分配

首先看看无线电信号的频谱如何划分: 一、5G NR 3GPP已指定5G NR 支持的频段列表,5G NR频谱范围可达100GHz,指定了两大频率范围: ① Frequency range 1 (FR1):就是我们通常讲的6GHz以下频段 频率…

uniapp uni-popup底部弹框留白 底部颜色修改 滚动穿刺

做底部弹框的时候,可能出现以下场景需要处理。 一、出现底部留白不是白色,需要修改颜色的时候: 1、如果弹框不需要圆角效果,则在uni-popup加上背景色就行,弹框是个直角样式: 2、如果需要圆角效果&#xff0…

CSS3页面布局-三栏-中栏流动布局

三栏-中栏流动布局 用负外边距实现 实现三栏布局且中栏内容区不固定的核心问题就是处理右栏的定位&#xff0c; 并在中栏内容区大小改变时控制右栏与布局的关系。 控制两个外包装容器的外边距&#xff0c;一个包围三栏&#xff0c;一个包围左栏和中栏。 <!DOCTYPE html&…

【vue、Electron】搭建一个Electron vue项目过程、将前端页面打包成exe 桌面应用

文章目录 前言使用 electron-vue 创建项目1. 安装 vue-cli&#xff08;如果未安装&#xff09;2. 使用 electron-vue 模板创建项目3. 安装和配置 electron-builder4. 运行Electron项目5. 打包应用 可能遇到的问题解决Electron vue首次启动巨慢无法加载执行npm run electron:bui…

grid布局实现移动端H5响应式排列正方形格子布局

grid布局实现移动端H5响应式排列正方形区域 grid布局&#xff1a;CSS Grid 网格布局教程在 CSS 中&#xff0c;padding-top 的百分比值是相对于元素自身的宽度&#xff0c;而不是高度。这是 CSS 规范中的一个特性&#xff0c;所有的 padding 和 margin 的百分比值都是相对于元…

客服系统简易版

整体架构解读 客服端和商城端都通过websocket连接到客服系统, 并定期维持心跳当客户接入客服系统时, 先根据策略选择在线客服, 然后再发送消息给客服 websocket实现 用netty实现websocket协议, 增加心跳处理的handler, 详见chat-server模块 客服路由规则 暂时仅支持轮询的…

上新!Matlab实现基于QRGRU-Attention分位数回归门控循环单元注意力机制的时间序列区间预测模型

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现基于QRGRU-Attention分位数回归门控循环单元注意力机制的时间序列区间预测模型&#xff1b; 2.多图输出、多指标输出(MAE、RMSE、MSE、R2)&#xff0c;多输入单输出&#xff0c;含不同置信区间图、概率…

出现Property ‘sqlSessionFactory‘ or ‘sqlSessionTemplate‘ are requiredProperty报错

目录&#xff1a; bug Property ‘sqlSessionFactory‘ or ‘sqlSessionTemplate‘ are requiredProperty报错解决方法 bug Property ‘sqlSessionFactory‘ or ‘sqlSessionTemplate‘ are requiredProperty 报错 在一个springboot demo启动的时候出现以下错误 &#xff0c;…

中国城市经济韧性数据集(2007-2022年)

数据来源&#xff1a;数据来自历年《中国城市统计NJ》、各省市《统计NJ》及《中国区域经济统计NJ》 时间范围&#xff1a;2007-2022年 数据范围&#xff1a;中国地级市样例数据&#xff1a; 包含内容&#xff1a; 全部内容下载链接&#xff08;原始数据计算代码最终数据&…

大数据基础:离线与实时数仓区别和建设思路

文章目录 离线与实时数仓区别和建设思路 一、离线数仓与实时数仓区别 ​​​​​​​二、实时数仓建设思路 离线与实时数仓区别和建设思路 ​​​​​​​一、离线数仓与实时数仓区别 离线数据与实时数仓区别如下&#xff1a; 对比方面 离线数仓 实时数仓 架构选择 传…

zdppy+vue3+onlyoffice文档管理系统实战 20240829上课笔记 Python验证码框架完成

遗留的问题 还没有测试校验的功能 测试校验验证码的功能 生成验证码 from .tobase64 import get_base64 from .validate import is_captchadef captcha(api, cache, num4, expire60):""":param cache: 缓存对象:param num: 验证码的个数:param expire: 验证…

C++系列-STL容器之vector

STL概念 vector基本概念vector与数组的区别vector容器的特点动态大小连续存储自动扩容尾部操作高效 vector动态扩展的含义vector常用的接口示意 vector的构造函数vector赋值操作重载赋值assign赋值 vector的容量和大小vector的插入和删除vector数据存取vector互换容器vector互换…

Apache RocketMQ 批处理模型演进之路

作者&#xff1a;谷乂 RocketMQ 的目标&#xff0c;是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求&#xff0c;而批量处理则是流计算领域对于极致吞吐量要求的经典解法&#xff0c;这当然也意味着 RocketMQ 也有一套属于自…

从开题到答辩:ChatGPT超全提示词分享!(上)【建议收藏】

在浩瀚的知识领域中&#xff0c;提问的艺术是探索真理的钥匙。在这个信息爆炸的时代&#xff0c;深入探索知识的海洋&#xff0c;不仅需要热情和毅力&#xff0c;更需要正确的方法和工具。学术研究是一个复杂而严谨的过程&#xff0c;涉及从选题、文献综述到研究设计、数据收集…

最新高仿拼夕夕源码/拼单系统源码/拼单商城/类目功能齐全

源码简介&#xff1a; 高仿拼夕夕源码&#xff0c;拼单商城系统源码、拼团商城源码&#xff0c;改的版本。拼夕夕拼团商城系统源码源码 多商户多区域拼团系统源码。 自己改的版本&#xff0c;类似于拼单的商城&#xff0c;功能齐全&#xff0c;看着还挺不错&#xff0c;绝对值…

macos 10.15 catalina xcode 下载和安装

在macos 10.15 catalina系统中, 由于系统已经不再支持,所以我们无法通过应用商店来安装xcode, 需要手动下载指定版本的 xcode 版本才能安装, catalina 支持的最新xcode版本为 Xcode v12.4 (12D4e) , 其他的新版本是无法安装在Catalina系统中的. Xcode_12.4.xip下载地址 注意,下…

RocketMQ第5集

一 RocketMQ的工作流程 1.1 生产环节producer Producer可以将消息写入到某Broker中的某Queue中&#xff1a;其中Producer发送消息之前&#xff0c;会先向NameServer发出获取消息Topic的路由信息的请求&#xff0c;NameServer返回该Topic的路由表及Broker列表。简单的说&…

火爆硅谷的“AI原生”代码编辑器Cursor使用体验

简单总结下&#xff1a; 如果现在你有需要改动到代码、有需要迅速了解一个开源项目的代码逻辑、或者对代码一直没有入门想要学习的话&#xff1a; 现在&#xff01;立刻&#xff01;马上&#xff01;下载体验使用cursor&#xff01;It‘s a life-saving decision&#xff01;…

排序算法见解(2)

1.快速排序 1.1基本思想&#xff1a; 快速排序是通过一趟排序将待排序的数据分割成独立的两部分&#xff0c;其中一部分的所有数据都比另一部分的所有数据都要小&#xff0c;然后再按此方法对这两部分数据分别进行快速排序&#xff0c;整个排序过程可以递归进行&#xff0c;以…