[Python小工具]Python批量生成数据到MySQL
base.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
import random
from datetime import datetime, timedelta
from faker import Faker
import stringfake = Faker('zh_CN')class Base_conn:# 数据库连接配置初始化def __init__(self, DB_HOST, DB_USER, DB_PASSWORD, DB_NAME):self.DB_HOST = DB_HOSTself.DB_USER = DB_USERself.DB_PASSWORD = DB_PASSWORDself.DB_NAME = DB_NAMEclass TimeShift:def starttime(self):self.start_time = time.time()return self.start_timedef endtime(self):self.end_time = time.time()return self.end_timedef executiontime(self):self.execution_time = self.end_time - self.start_timereturn self.execution_timedef timeshift(self, execution_time):if execution_time < 60:return f"{int(execution_time)}秒"elif execution_time >= 60 or execution_time <= 3600:return f"{execution_time / 60:.1f}分钟"else:return f"{int(execution_time / 60 / 60):.1f}小时"class Table_Employee:# 生成身份证信息的模拟函数,可根据实际情况调整def generate_id_card(self):# region_code = "420100" # 地区编码region_code = "{:.6f}".format(random.random()).split('.')[1] # 地区编码birth_date = datetime.strptime("19900101", "%Y%m%d") + timedelta(days=random.randint(0, 365 * 50)) # 出生日期seq_code = str(random.randint(100, 999)) # 序列码gender_code = str(random.randint(0, 9)) # 性别码check_code = str(random.randint(0, 9)) # 校验码(这里仅用于示例)id_card = f"{region_code}{birth_date.strftime('%Y%m%d')}{seq_code}{gender_code}{check_code}"# 确保生成的身份证号码长度正好是 18 个字符return id_card[:18]def __init__(self, work_id=None, name=None, age=None, gender=None, id_card=None, entry_date=None, department=None):# self.work_id = work_id if work_id else fake.pystr(min_chars=3, max_chars=10)self.work_id = work_id if work_id else "{:.6f}".format(random.random()).split('.')[1]self.name = name if name else fake.name()self.age = age if age else random.randint(18, 65)self.gender = gender if gender else fake.random_element(elements=('男', '女'))self.id_card = id_card if id_card else self.generate_id_card()self.entry_date = entry_date if entry_date else (datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()self.department = department if department else random.randint(1, 7)class Table_Students:def Email_data(self):num_letters = random.randint(5, 10)random_letters = random.choices(string.ascii_letters, k=num_letters)Email = "www." + ''.join(random_letters) + '.com'return Emaildef Address_data(self):provinces = ['北京市', '天津市', '山西省']cities = {'北京市': ['东城区', '西城区', '朝阳区', '海淀区', '丰台区'],'天津市': ['和平区', '河东区', '河西区', '南开区', '红桥区'],'山西省': ['太原市', '大同市', '阳泉市', '长治市', '晋城市']}roads = ['长安街', '和平路', '人民大街', '建国路', '中山路', '解放街', '青年路', '光明街', '文化路', '新华街']province = random.choice(provinces)city = random.choice(cities[province])random_province = random.choice(provinces)random_road = random.choice(roads)random_address = random_province + random_roadrandom_init = str(random.randint(100, 999))address = province + city + random_address + random_init + '号'return addressdef __init__(self, SNo=None, Sname=None, Gender=None, Birthday=None, Mobile=None, Email=None, Address=None,Image='null'):# self.work_id = work_id if work_id else fake.pystr(min_chars=3, max_chars=10)self.SNo = SNo if SNo else str((datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()).split('-')[0] + str(random.randint(100,999))self.Sname = Sname if Sname else fake.name()self.Gender = Gender if Gender else fake.random_element(elements=('男', '女'))self.Birthday = Birthday if Birthday else (datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()self.Mobile = Mobile if Mobile else str(random.randint(130, 199)) + "{:.8f}".format(random.random()).split('.')[1]self.Email = Email if Email else self.Email_data()self.Address = Address if Address else self.Address_data()self.Image = Image
mysql_data.py
import re
import time
import pymysql
import threading
from faker import Faker
from pymysql.constants import CLIENT
from base import Base_conn, TimeShift, Table_Employee, Table_Studentsfake = Faker('zh_CN')
# 数据库连接配置
# connect = Base_conn('localhost', 'root', 'iotplatform', 'test')
connect = Base_conn('localhost', 'root', 'iotplatform', 'student_profile_db')def connect_db():try:connection = pymysql.connect(host=connect.DB_HOST,user=connect.DB_USER,password=connect.DB_PASSWORD,database=connect.DB_NAME,client_flag=CLIENT.MULTI_STATEMENTS,cursorclass=pymysql.cursors.DictCursor,charset='utf8mb4')return connectionexcept pymysql.MySQLError as e:print(f"Database connection failed: {e}")return None# 数据插入函数
def insert_data(start, end, thread_id, sql):# 创建数据库连接connection = connect_db()try:# 创建 cursor 用于执行 SQL 语句with connection.cursor() as cursor:sql_cmd = sqlfor i in range(start, end):# 记录最后一次用于插入的数据table_employee = Table_Employee()table_students = Table_Students()# last_values = (table_employee.work_id,# table_employee.name,# table_employee.age,# table_employee.gender,# table_employee.id_card,# table_employee.entry_date,# table_employee.department)last_values = (table_students.SNo,table_students.Sname,table_students.Gender,table_students.Birthday,table_students.Mobile,table_students.Email,table_students.Address,table_students.Image)# 执行 SQL 语句cursor.execute(sql_cmd, last_values)connection.commit()# 提交事务# connection.commit()# print(f"Thread {thread_id}: Inserted rows {start} to {end}")except Exception as e:print(f"Thread {thread_id}: Error occurred: {e}")finally:# if last_values:# print(f"Thread {thread_id}: Inserted rows {start} to {end}. Last row data: {last_values}")# 关闭数据库连接connection.close()def perform_sql_operation(sql, operation):conn = connect_db()num = 0try:with conn.cursor() as cursor:if operation in ('select', 'show'):cursor.execute(sql)for row in cursor.fetchall():print(row)elif operation in ('update', 'delete'):cursor.execute(sql)num += 1conn.commit()print('执行sql数:', num)except pymysql.MySQLError as e:print(f"Error: {e}")conn.rollback()finally:conn.close()def insert_sql(sql_command, total_records, num_threads):total_records = total_records # 总共需要插入的记录数num_threads = num_threads # 线程数量records_per_thread = total_records // num_threads # 每个线程需要插入的记录数sql_cmd = sql_commandthreads = []for i in range(num_threads):start_index = i * records_per_threadend_index = start_index + records_per_threadend_index = min(end_index, total_records)thread_id = i + 1# 创建线程thread = threading.Thread(target=insert_data, args=(start_index, end_index, thread_id, sql_cmd))threads.append(thread)thread.start()# 等待所有线程完成for thread in threads:thread.join()def up_del_sel_sql(sql_cmd):sql_command = sql_cmdperform_sql_operation(sql_command, command)# 主函数,负责创建线程并分配任务
def main(sql):if sql.lower() == 'insert':insert_sql(sql_cmd, total_records, num_threads)elif sql.lower() in ('update', 'delete', 'select', 'show'):up_del_sel_sql(sql_cmd)else:print('请传入执行的sql类型: [insert|update|delete|select]')if __name__ == '__main__':# 总共需要插入的记录数total_records = 1000000# 开启线程数量num_threads = 1# 执行sql# sql_cmd = """ insert into `employee` (`workid`, `name`, `age`, `gender`, `idcard`, `entrydate`, `department`)# values (%s, %s, %s, %s, %s, %s, %s) """sql_cmd = """ insert into `Student` (`SNo`, `Sname`, `Gender`, `Birthday`, `Mobile`, `Email`, `Address` , `Image`)values (%s, %s, %s, %s, %s, %s, %s,%s) """# sql_cmd = "delete from employee limit 100000"# sql_cmd = "select count(*) from employee"# sql_cmd = "show tables like '%employee%'"# sql_cmd = ""command = re.search(r'\binsert|update|delete|select|show\b', sql_cmd, re.IGNORECASE)command = command.group().lower() if command else Noneif command:time_shift = TimeShift()start_time = time_shift.starttime()main(command)end_time = time_shift.endtime()execution_time = time_shift.timeshift(time_shift.executiontime())print(f"执行时间: {execution_time}")else:print('未找到匹配的命令')