Python3操作redis百万级数据迁移,单机到单机,集群到集群

Python3操作redis大量数据迁移 脚本

  • 背景
  • 使用前
  • 使用注意事项
  • 脚本
  • 总结

背景

之前写过一个用python迁移redis的脚本,但是对于小批量的数据、单节点的,还可以用。对于大量数据,集群模式的就不太适合了。所以写了下面的脚本,而且做了一定的优化,使用的pipeline和多线程,保证了迁移数据的速度,本人测试,大概2分钟复制了110万键值对的数据,差不多是每秒一万键值对的复制速度。

使用前

注意:

  1. 用的是Python3环境,Python2的大概需要改一下print输出
  2. 安装相关的模块
pip install redis rediscluster
  1. 可以在windows、linux环境下使用,注意修改里面的一些设置

使用注意事项

下面是一些需要注意的:

  1. 下面的脚本,如果redis数据超过500万键值对,很可能会有瓶颈,因为是一次性取的redis的所有键组成列表,列表很大,大概率会有阻塞。这样就不建议这种迁移方式了
  2. 下面的脚本,是建立在新的redis实例是空数据,或者说没有与原redis实例键值重复的情况下,要不然会重写。
  3. 下面的脚本,是迁移了所有键值对,没有做一些键的匹配,不过改起来不复杂
  4. 下面的脚本,多线程和批量提交的参数,如有必要可以改一下,对比测试。包括一些redis实例的参数配置也一样,如有必要可以改。
  5. 下面的脚本,单机到集群等各种迁移模式,要根据实际情况进行修改。
  6. 这个脚本,没办法实现实时复制,如果原redis数据库一直有增量数据,而且比较大,最好用其他方式迁移。因为脚本读取的redis Key值列表,只是那一个时间的,新库老库数据会有几分钟的差异。如果是把redis当做持久化库而不是缓存库的情况,也不适合。

暂时就这些了。

脚本

内容如下,根据实际情况进行调整

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024/4/24from datetime import datetime
import time
import threading
import redis
from rediscluster import RedisClusterdef split_list(big_list, num=1):"""原来是[1,2,3,4,5,6]的列表,拆分成[[1,2], [3,4], [5,6]]小列表,主要是为了多线程:param big_list: 大列表:param num: 拆分多少个列表,这个主要对应后面的线程数,或者说redis的连接数,不能设置的太大,否则会报错Too many connections:return: 新的列表"""list_len = len(big_list)new_list = []if list_len > num:if list_len % num == 0:small_list_len = list_len // numelse:small_list_len = list_len // num + 1start = 0for i in range(num):# print(i)new_list.append(big_list[start: start + small_list_len])start += small_list_lenelse:new_list.append(big_list)return new_listdef redis_get_set(redis_source, redis_target, redis_list,  batch_size=100):"""读取redis“键”列表,获取Key/Value值,写入到新的redis:param redis_source: 原redis实例:param redis_target: 新redis实例:param redis_list: 要迁移的redis Key值列表:param batch_size: 使用pipeline写入新的redis实例,提高写入效率:return:"""count = 0with redis_target.pipeline() as pipe:for k in redis_list:data_type = redis_source.type(k)# 判断key值数据类型,分别处理,没有stream数据类型的处理,后面有必要再添加if data_type == 'string':v = redis_source.get(k)redis_target.set(k, v)elif data_type == 'list':v = redis_source.lrange(k, 0, -1)redis_target.lpush(k, *v)elif data_type == 'set':v = redis_source.smembers(k)redis_target.sadd(k, *v)elif data_type == 'hash':fields = redis_source.hgetall(k)pipe.hset(k, mapping=fields)elif data_type == 'zset':v = redis_source.zrange(k, 0, -1, withscores=True)# 需要将元组数据转化为字典数据redis_target.zadd(k, dict(v))else:print('not known type')count += 1# 如果数据量较大,循环batch_size次数后提交一次if count % batch_size == 0:print(f'\n当前时间:{datetime.now()},进程:{threading.current_thread()},已完成{count}对读写操作')pipe.execute()pipe.reset()# 最后再提交一次pipelinepipe.execute()pipe.reset()print(f'\n当前时间:{datetime.now()},进程:{threading.current_thread()},已完成所有读写操作!')def redis_copy(redis_source, redis_target, thread_num=5, batch_size=100):"""将原始redis的Key值大列表进行拆分,然后拆分后的列表进行多线程处理:param redis_source: 原redis实例:param redis_target: 新redis实例:param thread_num: 线程数,将大列表拆分为几个小列表,这个数不要太大,一般10个就行,不然程序会报错:param batch_size::return:"""# 检查两个redis是否可用try:redis_source.ping()redis_target.ping()print("Redis节点可连接")except Exception as e:print(f"连接Redis失败: {e}")redis_target = None# 线程列表threads = []if redis_target:new_list = split_list(redis_source.keys('*'), thread_num)for data in new_list:t = threading.Thread(target=redis_get_set, args=(redis_source, redis_target, data, batch_size))threads.append(t)t.start()for t in threads:t.join()print("所有线程执行完毕")def single_to_single(thread_num, batch_size):"""单节点迁移到单节点"""# 原始redis,单节点source_pool = redis.ConnectionPool(host='192.168.10.1',port=6379,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_source = redis.Redis(connection_pool=source_pool)# 目标redis,单节点target_pool = redis.ConnectionPool(host='192.168.10.2',port=6369,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_target = redis.Redis(connection_pool=target_pool)redis_copy(redis_source, redis_target, thread_num=10, batch_size=10000)def single_to_cluster(thread_num, batch_size):"""单节点迁移到单节点"""# 原始redis,单节点source_pool = redis.ConnectionPool(host='192.168.10.1',port=6379,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_source = redis.Redis(connection_pool=source_pool)# 目标redis,集群target_node_list = [{"host": "192.168.11.1", "port": "6379"},{"host": "192.168.11.2", "port": "6379"},{"host": "192.168.11.3", "port": "6379"},{"host": "192.168.11.4", "port": "6379"},{"host": "192.168.11.5", "port": "6379"},{"host": "192.168.11.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_target = RedisCluster(startup_nodes=target_node_list,decode_responses=True,password='123456')redis_copy(redis_source, redis_cluster_target, thread_num=10, batch_size=10000)def cluster_to_single(thread_num, batch_size):"""集群迁移到集群"""# 原始redis,集群source_node_list = [{"host": "192.168.0.1", "port": "6379"},{"host": "192.168.0.2", "port": "6379"},{"host": "192.168.0.3", "port": "6379"},{"host": "192.168.0.4", "port": "6379"},{"host": "192.168.0.5", "port": "6379"},{"host": "192.168.0.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_source = RedisCluster(startup_nodes=source_node_list,decode_responses=True,password='123456')# 目标redis,单节点target_pool = redis.ConnectionPool(host='192.168.10.2',port=6369,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_target = redis.Redis(connection_pool=target_pool)redis_copy(redis_cluster_source, redis_target, thread_num=10, batch_size=10000)def cluster_to_cluster(thread_num, batch_size):"""集群迁移到集群"""# 原始redis,集群source_node_list = [{"host": "192.168.0.1", "port": "6379"},{"host": "192.168.0.2", "port": "6379"},{"host": "192.168.0.3", "port": "6379"},{"host": "192.168.0.4", "port": "6379"},{"host": "192.168.0.5", "port": "6379"},{"host": "192.168.0.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_source = RedisCluster(startup_nodes=source_node_list,decode_responses=True,password='123456')# 目标redis,集群target_node_list = [{"host": "192.168.11.1", "port": "6379"},{"host": "192.168.11.2", "port": "6379"},{"host": "192.168.11.3", "port": "6379"},{"host": "192.168.11.4", "port": "6379"},{"host": "192.168.11.5", "port": "6379"},{"host": "192.168.11.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_target = RedisCluster(startup_nodes=target_node_list,decode_responses=True,password='123456')redis_copy(redis_cluster_source, redis_cluster_target, thread_num=10, batch_size=10000)if __name__ == '__main__':# 性能与效率控制# 线程数thread_num = 10# 写入批量提交数batch_size = 10000start_time = time.perf_counter()# 单节点迁移到单节点single_to_single(thread_num, batch_size)# 单节点迁移到集群# single_to_cluster(thread_num, batch_size)# 集群迁移到单节点# cluster_to_single(thread_num, batch_size)# 集群迁移到集群# cluster_to_cluster(thread_num, batch_size)end_time = time.perf_counter()# 计算执行时间execution_time = end_time - start_timeprint(f"代码执行时间: {execution_time} 秒")

总结

上面的代码,为了优化性能,改了好几次。刚开始的时候,50万键值对数据(5个数据类型各10万左右),迁移复制大概需要300s-400s左右,平均每秒钟大约复制1300-1700的键值对,经过多次优化,平均每秒钟大约复制9000的键值对,提升了6-7倍左右。

优化思路:

  1. 使用pipeline,批量提交键值对到新的库
  2. 在数据量比较大的情况下,原redis的键整体取出后,是一个比较大的列表,先将这个大列表拆分为较为平均的几个小列表,然后使用多线程分别对小列表数据读写,可能大大提高读写效率。但是线程数也没必要设置太高,一个是redis连接数有限制,还有一个是我试了一下,比如从10提升到20,读写速度提升的不是太明显。

其它思考:
1 还能进行哪些优化呢?我看有些商业软件能做到每秒钟10万级别KV的复制,想不出来怎么做的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4153.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【前端开发基础知识快速入门】

前端开发基础知识&快速入门 一、VSCode 使用1.1 安装常用插件1.2 创建项目1.3 创建网页1.4 运行效果二、ES62.1 简介2.2 什么是 ECMAScript2.3 ES6 新特性2.3.1 let 声明变量2.3.2 const 声明常量(只读变量)2.3.3 解构表达式2.3.4 字符串扩展2.3.5 函数优化2.3.6 对象优化…

Ubuntu中apt更新时报错The certificate issuer is unknown的解决办法

Ubuntu 22.04更新apt出现The certificate issuer is unknown的解决办法 问题描述解决办法讨论 问题描述 使用docker安装Ubuntu22.04,官网给出的镜像只是一个裸系统,预装软件很少。换阿里源以后,apt update,出现如下报错&#xff…

【C语言】动态内存分配(一)

目录 1.为什么要有动态内存分配 2.malloc和free 2.1malloc 2.2free 1.为什么要有动态内存分配 我们已经掌握的内存开辟方式有: 但是上述的开辟空间的方式有两个特点: ⭐空间开辟大小是固定的。 ⭐数组在申明的时候,必须指定数组的长度,数组空间一旦…

GateWay具体的使用之全局token过滤器

1: 创建过滤器类 首先,你需要创建一个实现了GatewayFilter接口或者继承AbstractGatewayFilterFactory类的过滤器类。这里以实现GatewayFilter接口为例,创建一个全局token过滤器。 package com.by.filter;import cn.hutool.core.collection.CollUtil; imp…

SQL提升

1. SQL TOP 子句 TOP 子句用于规定要返回的记录的数目。 对于拥有数千条记录的大型表来说,TOP 子句是非常有用的。 **注释:**并非所有的数据库系统都支持 TOP 子句。 1.1 SQL TOP 语法 SQL Server 的语法: SELECT TOP number|percent c…

OpenCV C++实现区域面积筛选以及统计区域个数

目录 1、背景介绍 2、代码实现 2.1 获取原图 2.1.1 区域图像imread 2.1.2 具体实现 2.2 获取图像大小 2.3 阈值分割 2.3.1 阈值分割threshold 2.3.2 具体实现 2.4 区域面积筛选 2.4.1 获取轮廓findContours 2.4.2 获取轮廓面积contourArea 2.4.3 填充区域fil…

http请求与响应

目录 HTTP请求格式 HTTP响应格式 HTTP请求格式 请求行:请求数据第一行(请求方式,资源路径,协议) 请求头:第二行开始,格式key:value 请求体:POST请求,存放在请求参数 非restful方式&#xff1…

浅谈大数据时代下的电商风控||电商数据API接口

抢抢抢!最后1天,双十一直播活动来啦!抢直播专属优惠…… 视频号 随着大数据时代的兴起,互联网电商风控已经从无风控、人工抽取规则为主的简易规则模型发展到当前基于大数据的风控。与金融风控不同,互联网电商风控呈现出…

详解MyBatis配置文件开发与注解式开发

首先呢,五一快来啦!提前祝各位宝子们五一玩得开心,然后讲解一下MyBatis框架呀!!! 一.框架介绍 MyBatis 的主要特点 二.MyBatis工作流程 1. 初始化和配置 2. 创建 SqlSessionFactory 3. 获取 SqlSession 4. 映射器绑定 5. 执行操作 6.…

力扣1518. 换水问题

题目链接 力扣1518. 换水问题 简单方法(模拟) 思路 对换水进行模拟,每次喝完 n u m E x c h a n g e numExchange numExchange 瓶水后就去换一瓶水,直到不能再兑换为止,也就是剩余水的数量小于 n u m E x c h a n g e numExchange numE…

vscode中jsconfig.json文件首行提示错误

在使用react框架开发前端时,文件jsconfig.json首行提示错误,打开设置,勾选如下图这项

jupyter lab 如何安装和启动

Jupyter Lab 是一种基于 web 的交互式开发环境,用于 Jupyter 笔记本。与传统的 Jupyter 笔记本相比,它提供了更友好、更可扩展的界面,具有代码单元格、markdown 单元格、小部件和文件浏览器等功能。 1.安装 Jupyter Lab: 打开终端或命令提示…

【Go】通道作为函数参数

目录 一、Pings Pongs例子程序 二、使用通道进行任务分配和结果收集 三、使用通道进行错误处理 四、使用通道实现速率限制 五、使用通道进行数据同步 总结 共性 解决的问题类型 实际应用示例 Go 语言中,通道(channel)是实现协程&…

Linux 第十一章

🐶博主主页:ᰔᩚ. 一怀明月ꦿ ❤️‍🔥专栏系列:线性代数,C初学者入门训练,题解C,C的使用文章,「初学」C,linux 🔥座右铭:“不要等到什么都没有了…

Java 图形化框架 - AWT、Swing

文章目录 关于 AWTAWT继承体系 关于 Swing关于 JFC 和 Swing Swing 组件组件层次Swing组件和AWT组件的对应关系:Swing组件按照功能来分类: Java使用AWT和Swing相关的类可以完成图形化界面编程,其中AWT的全称是抽象窗口工具集(Abstract Window…

北大发现了一种特殊类型的注意力头!

检索头的发现或许将有力地帮助大模型领域在提高长上下文推理能力、减少幻觉和压缩KV缓存方面的研究。 从 Claude100K 到 Gemini10M,我们正处于长上下文语言模型的时代。如何在长上下文中利用任何输入位置的信息?北大联合另外四所高校发现了一种特殊类型…

(四)后台-对文章的增删改查操作

(四)后台-对文章的增删改查操作 文章目录 (四)后台-对文章的增删改查操作一、RESTFUL API二、路由处理1、/home 后台首页页面2、/upload 文件上传功能 三、文章模块1、/list 获取文章列表 GetList2、/ 新增|编辑文章 SaveOrUpdate…

AI 提示词生成器

https://prompt-generator.ilovecoke.cc/参考 AI 提示词生成器

docker部署前端项目(三)简易迅速版本

前两个docker 部署都出现了 意外,通过dockerfile 文件操作的时候, 不是 npm 无法下载,就是 npm build 无法打包 总是困难重重,原因甚多,不是网络导致,就是版本不对, 原因可能是 node 版本和 npm…

运维 kubernetes(k8s)基础学习

一、容器相关 1、发展历程:主机–虚拟机–容器 主机类似别墅的概念,一个地基上盖的房子只属于一个人家,很多房子会空出来,资源比较空闲浪费。 虚拟机类似楼房,一个地基上盖的楼房住着很多人家,相对主机模式…