github:https://github.com/PyMySQL/PyMySQL
Python3 MySQL 数据库连接 - PyMySQL 驱动:https://www.runoob.com/python3/python3-mysql.html
pymysql 是线程安全的( 搜索 thread,可以看到 thread_safe=1,同时函数 thread_safe() 返回 True ):https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py
Mysql 如果数据存在则更新,不存在则插入
:https://blog.csdn.net/zhou16333/article/details/95867827
1、PyMySQL 安装
在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。
PyMySQL 下载地址:https://github.com/PyMySQL/PyMySQL
安装 PyMySQL 的 Python 包:pip3 install PyMySQL
2、数据库连接
连接数据库前,请先确认以下事项:
- 已经创建了数据库 TESTDB.
- 在 TESTDB 数据库中您已经创建了表 EMPLOYEE
- EMPLOYEE 表字段为 FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME。
- 连接数据库 TESTDB 使用的用户名为 "testuser" ,密码为 "test123",你可以可以自己设定或者直接使用 root 用户名及其密码,Mysql 数据库用户授权请使用 Grant 命令。
- 已经安装了 Python MySQLdb 模块。
- 如果您对sql语句不熟悉,可以访问 SQL基础教程
示 例:
链接 Mysql 的 TESTDB 数据库:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()print ("Database version : %s " % data)# 关闭数据库连接
db.close()
3、使用
创建数据库表
如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")# 使用预处理语句创建表
sql = """CREATE TABLE EMPLOYEE (FIRST_NAME CHAR(20) NOT NULL,LAST_NAME CHAR(20),AGE INT, SEX CHAR(1),INCOME FLOAT )"""cursor.execute(sql)# 关闭数据库连接
db.close()
查询 数据
Python 查询 Mysql 使用 fetchone() 方法获取单条数据,使用 fetchall() 方法获取多条数据。
- fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
- fetchall(): 接收全部的返回结果行.
- rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。
查询 EMPLOYEE 表中 salary(工资)字段大于 1000 的所有数据:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \WHERE INCOME > %s" % (1000)
try:# 执行SQL语句cursor.execute(sql)# 获取所有记录列表results = cursor.fetchall()for row in results:fname = row[0]lname = row[1]age = row[2]sex = row[3]income = row[4]# 打印结果print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \(fname, lname, age, sex, income ))
except:print ("Error: unable to fetch data")# 关闭数据库连接
db.close()
示例:
import pymysqlclass DB():def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):# 建立连接 self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)# 创建游标,操作设置为字典类型 self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)def __enter__(self):# 返回游标 return self.curdef __exit__(self, exc_type, exc_val, exc_tb):# 提交数据库并执行 self.conn.commit()# 关闭游标 self.cur.close()# 关闭数据库连接 self.conn.close()if __name__ == '__main__':with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db:db.execute('select * from course')print(db)for i in db:print(i)
插入 数据
执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:# 执行sql语句cursor.execute(sql)# 提交到数据库执行db.commit()
except:# 如果发生错误则回滚db.rollback()# 关闭数据库连接
db.close()
以上例子也可以写成如下形式:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 插入语句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000)
try:# 执行sql语句cursor.execute(sql)# 执行sql语句db.commit()
except:# 发生错误时回滚db.rollback()# 关闭数据库连接
db.close()
以下代码使用变量向SQL语句中传递参数:
..................................
user_id = "test123"
password = "password"con.execute('insert into Login values( %s, %s)' % \(user_id, password))
..................................
单条插入数据:
#!/usr/bin/python3import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 插入语句 里面的数据类型要对应
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000)
try:# 执行sql语句cursor.execute(sql)# 执行sql语句db.commit()
except:# 发生错误时回滚db.rollback()# 关闭数据库连接
db.close()
批量插入数据:
注意:批量插入数据 与 单条插入数据 的区别:
- 批量插入:VALUES (%s, %s, %s, %s, %s,) 里面 不用引号
- 单条插入:VALUES ('%s', '%s', '%s', '%s', '%s') 里面 需要引号
#!/usr/bin/env python
# -*-encoding:utf-8-*-import pymysql# 打开数据库连接
db = pymysql.connect("localhost","root","123","testdb")# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()# SQL 插入语句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES (%s,%s,%s,%s,%s)"
# 区别与单条插入数据,VALUES ('%s', '%s', %s, '%s', %s) 里面不用引号val = (('li', 'si', 16, 'F', 1000),('Bruse', 'Jerry', 30, 'F', 3000),('Lee', 'Tomcat', 40, 'M', 4000),('zhang', 'san', 18, 'M', 1500))
try:# 执行sql语句cursor.executemany(sql,val)# 提交到数据库执行db.commit()
except:# 如果发生错误则回滚db.rollback()# 关闭数据库连接
db.close()
更新 数据
更新操作用于更新数据表的数据,以下实例将 TESTDB 表中 SEX 为 'M' 的 AGE 字段递增 1:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
try:# 执行SQL语句cursor.execute(sql)# 提交到数据库执行db.commit()
except:# 发生错误时回滚db.rollback()# 关闭数据库连接
db.close()
批量 更新
使用 pymysql 的 course.executemany(sql, update_list) 进行批量更新
- sql:更新一条的 sql 语句模板;
- update_list:一个列表套元组的结构;
示 例:
db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')name_list = ["re", "gh", "ds", "D"] # 存储name的值
age_list = ["10", "20", "30", "40"] # 存储age的值
id_list = ["1", "2", "3", "4"] # 存储id的值
val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))]
print(val_list)
# [['re', '10', '1'], ['gh', '20', '2'], ['ds', '30', '3'], ['D', '40', '4']]with db.cursor() as cursor:try:sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)"cursor.executemany(sql, val_list)db.commit()except:db.rollback()
db.close()
pymysql 批量 --- 增、删、改、查
注意:插入数字也是 %s
# coding=utf-8import time
import pymysql.cursorsconn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com',port=3306,user='dba',password='xxxxx',db='app',charset='utf8')
cursor= conn.cursor()
# conn.ping(reconnect=True)count= 0
posts=[]
for postin posts:try:sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s'ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))conn.commit()except Exception as e:print("batch Exception:", e)count+=1cursor.close()
conn.close()# 基本sql语句写法
# INSERT INTO star(name,gender) VALUES(“XX”, 20)
# SELECT * FROM app.user_post WHERE post_id LIKE '%xxxx%';
# UPDATE app.user_post SET post_id=replace(post_id,'\'','’);
# UPDATE app.user_post SET province = ‘xxx', city =‘xxx';
# DELETE FROM app.user_post where updated_at = '0000-00-00 00:00:00’;# 带参数构造语句的基本写法
# sql = 'select user_id, post_id from user_post where user_id="{user_id}" and post_id="{post_id}"'.format(user_id=user_id, post_id=post_id)
# sql = 'SELECT count(*) FROM user_like where like_post_id = "%s"' % ("xxx")
# sql = 'update star set gender="{gender}", height="{height}" where star_id="{star_id}"'.format(gender='M', height=180, star_id=123456789)
删除 数据
删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:
import pymysql# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法获取操作游标
cursor = db.cursor()# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:# 执行SQL语句cursor.execute(sql)# 提交修改db.commit()
except:# 发生错误时回滚db.rollback()# 关闭连接
db.close()
执行 事务
事务机制可以确保数据一致性。
对于支持事务的数据库, 在 Python 数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。commit()方法游标的所有更新操作,rollback()方法回滚当前游标的所有操作。每一个方法都开始了一个新的事务。
事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。
- 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
- 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
- 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
- 持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
示例:
# SQL删除记录语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:# 执行SQL语句cursor.execute(sql)# 向数据库提交db.commit()
except:# 发生错误时回滚db.rollback()
pymysqlpool
线程安全 pymysqlpool
# -*-coding: utf-8-*-
# Author : Christopher Lee
# License: Apache License
# File : test_example.py
# Date : 2017-06-18 01-23
# Version: 0.0.1
# Description: simple test.import logging
import string
import threadingimport pandas as pd
import randomfrom pymysqlpool import ConnectionPoolconfig = {'pool_name': 'test','host': 'localhost','port': 3306,'user': 'root','password': 'chris','database': 'test','pool_resize_boundary': 50,'enable_auto_resize': True,# 'max_pool_size': 10
}logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)def connection_pool():# Return a connection pool instancepool = ConnectionPool(**config)# pool.connect()return pooldef test_pool_cursor(cursor_obj=None):cursor_obj = cursor_obj or connection_pool().cursor()with cursor_obj as cursor:print('Truncate table user')cursor.execute('TRUNCATE user')print('Insert one record')result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))print(result, cursor.lastrowid)print('Insert multiple records')users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)print(result)print('View items in table user')cursor.execute('SELECT * FROM user')for user in cursor:print(user)print('Update the name of one user in the table')cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')print(cursor.fetchone())print('Delete the last record')cursor.execute('DELETE FROM user WHERE id = 16')def test_pool_connection():with connection_pool().connection(autocommit=True) as conn:test_pool_cursor(conn.cursor())def test_with_pandas():with connection_pool().connection() as conn:df = pd.read_sql('SELECT * FROM user', conn)print(df)def delete_users():with connection_pool().cursor() as cursor:cursor.execute('TRUNCATE user')def add_users(users, conn):def execute(c):c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def add_user(user, conn=None):def execute(c):c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def list_users():with connection_pool().cursor() as cursor:cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')print('...')for x in sorted(cursor, key=lambda d: d['id']):print(x)def random_user():name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()age = random.randint(10, 40)return name, agedef worker(id_, batch_size=1, explicit_conn=True):print('[{}] Worker started...'.format(id_))def do(conn=None):for _ in range(batch_size):add_user(random_user(), conn)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def bulk_worker(id_, batch_size=1, explicit_conn=True):print('[{}] Bulk worker started...'.format(id_))def do(conn=None):add_users([random_user() for _ in range(batch_size)], conn)time.sleep(3)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerfor i in range(batch_number):wk(i, batch_size, explicit_conn)list_users()def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerthreads = []for i in range(batch_number):t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))threads.append(t)t.start()[t.join() for t in threads]list_users()if __name__ == '__main__':import timestart = time.perf_counter()test_pool_cursor()test_pool_connection()test_with_pandas()test_with_multi_threads(20, 10, True, bulk_insert=True)test_with_single_thread(1, 10, True, bulk_insert=True)elapsed = time.perf_counter() - startprint('Elapsed time is: "{}"'.format(elapsed))