简单介绍pymysql的一些操作,增改删查
增
先建表,再写数据至表中
除查询操作外,增改删都需要commit操作,具体原理看ref.1
import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings("ignore")
建表
# Create the `user` table (auto-increment id, name, age).
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
create_sql = """
create table user(id int NOT NULL AUTO_INCREMENT,`name` varchar(50) NOT NULL,`age` int NOT NULL,PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""
try:
    cur.execute(create_sql)
    con.commit()  # writes (DDL/DML) require an explicit commit (see ref.1)
except pymysql.Error:  # narrow catch: a bare `except:` would also swallow KeyboardInterrupt
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
# Inspect the new table's schema (`desc user`) and show it as a DataFrame.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
desc user;
"""
try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # Pure reads need no commit.
except pymysql.Error:
    con.rollback()
finally:
    cur.close()  # bug fix: original had `cur.close` (attribute access, never called)
    con.close()
Field Type Null Key Default Extra
0 id int NO PRI None auto_increment
1 name varchar(50) NO None
2 age int NO None
插入数据
# Row-by-row insert: one execute() per row, a single commit at the end.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age)values('小明', 14)"
try:
    t1 = time.time()
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # commit once after the loop
    t2 = time.time()
    print(f"循环写入耗时:{t2 - t1}")  # ~40s measured below (stale "7s" comment removed)
except pymysql.Error:
    con.rollback()
finally:
    cur.close()  # bug fix: original had `cur.close` (attribute access, never called)
    con.close()
循环写入耗时:39.632535457611084
批量写入
# Batch insert via executemany: one parameterized statement, many rows.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
# Note the space before `values` — required for executemany rewriting (see ref.2).
sql = "insert into user(name, age) values(%s,%s)"
# One tuple (or list) per row, fields matching the insert columns.
citys = [('小明', 14) for i in range(row_nums)]
try:
    t1 = time.time()
    cur.executemany(sql, citys)  # batched execution
    con.commit()
    t2 = time.time()
    print(f"批量写入耗时:{t2 - t1}")  # ~5.7s measured below
except pymysql.Error:
    con.rollback()
finally:
    cur.close()  # bug fix: original had `cur.close` (attribute access, never called)
    con.close()
批量写入耗时:5.722973823547363
批量写入有明显的速度优势,注意"insert into user(name, age) values(%s,%s)",values前面有空格,具体原因看ref.2
pyspark批量写入
数据量巨大时,可以结合spark的foreachPartition算子,并行写入
import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def get_or_create_hudi(app_name):
    """Return a Hudi-enabled SparkSession (Kryo serializer, snappy codec, Hive support).

    @param app_name: Spark application name shown in the UI
    @return: the (possibly pre-existing) SparkSession
    """
    # NOTE(review): original set "spark.sql.hive.convertMetastoreParquet" twice;
    # the duplicate (a no-op) has been removed.
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    print("\n")
    print("\n")
    return spark
def insert2mysql_partrdd(part, db_param="", value_cols=None, batch=40000):
    """Insert one Spark partition into MySQL, committing every `batch` rows.

    @param part: iterable of Rows for this partition
    @param db_param: mysql配置信息 — currently unused; the connection below is hard-coded.
                     TODO(review): wire this in or drop it.
    @param value_cols: column names pulled from each row (defaults to ['name', 'age'])
    @param batch: rows per executemany/commit round trip
    @return: None
    """
    # Avoid the mutable-default-argument pitfall (original used value_cols=['name', 'age']).
    if value_cols is None:
        value_cols = ['name', 'age']
    con = pymysql.connect(host='localhost', port=3306, user='root',
                          password='12345', db='ai', charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    # Bug fix: original read `sql = sql = "..."` (redundant double assignment).
    sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # Per-row projection Row -> [values]; flagged by the author as a possible hot spot.
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()
            batch_list = []
            print(f"第{cnt - batch}-{cnt}行数据插入MySql!")
    # Flush the trailing partial batch, if any.
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()
        print(f"第{cnt - cnt % batch}-{cnt}行数据插入MySql!")
    if cnt > 0:
        print(f"数据抽样-key:{row}")
        print(f"cnt:{cnt}")
    else:
        print("该分区无数据")
    cur.close()
    con.close()
# Driver: build a 500k-row DataFrame and write it to MySQL from 10 partitions
# in parallel via foreachPartition.
# (Bug fix: original fused `row_nums = 500000` and the DataFrame line into one.)
row_nums = 500000
df = pd.DataFrame({"name": ['小明'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)  # 10 parallel writers
t1 = time.time()
spark_df.rdd.foreachPartition(functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark批写入耗时:{t2 - t1}")  # ~8s measured below
spark批写入耗时:8.034992456436157
- 速度上似乎没有更快
- 可能数据量再大些,会有效果
- 另,单机跑spark也可能有些影响
删
刚才搞了100w数据,删除些
# Delete all but the first 10 rows written above.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
delete from user where id>10
"""
try:
    cur.execute(sql)
    con.commit()  # deletes require a commit
except pymysql.Error:
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
# Verify the delete: count the remaining rows.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select count(*) as cnt from user
"""
try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # Pure reads need no commit.
except pymysql.Error:
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
cnt
0 10
还剩10条数据
查
结合pandas,把查询的数据转成df
# Query rows and load them straight into a pandas DataFrame.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # Pure reads need no commit.
except pymysql.Error:
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
id name age
0 1 小明 14
1 2 小明 14
2 3 小明 14
3 4 小明 14
4 5 小明 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14
改
# Update: rename the first five users.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
update user set name = '小红' where id<=5
"""
try:
    cur.execute(sql)
    con.commit()  # updates require a commit
except pymysql.Error:
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
# Verify the update: re-read the table into a DataFrame.
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Cursor that returns rows as dicts instead of the default tuples.
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # Pure reads need no commit.
except pymysql.Error:
    print("发生错误,回滚")
    con.rollback()
finally:
    cur.close()  # was missing in the original: cursor leaked
    con.close()
id name age
0 1 小红 14
1 2 小红 14
2 3 小红 14
3 4 小红 14
4 5 小红 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14
Ref
[1] https://www.runoob.com/python3/python3-mysql.html
[2] https://www.modb.pro/db/184700
2023-07-28 台风、大雨 于南京市江宁区