mongodb, mysql和elasticsearch 功能较相似,所以打算用一套接口将它们封装起来
基类StorageBase
如下:
class StorageBase:def __init__(self, host=None, port=None, database=None, table=None, location=None, account=None, password=None,url=None):self.host = hostself.port = portself.database = databaseself.table = tableself.location = locationself.account = accountself.password = passwordself.url = urldef build_connect(self):raise NotImplementedError# 增def add(self, table_collection_index, data, id=None):raise NotImplementedError# 删def delete(self, table_collection_index, condition=None, data=None, id=None):raise NotImplementedError# 改def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):raise NotImplementedError# 查def search(self, table_collection_index, condition=None):raise NotImplementedError# 没有就新增,有就更新def add_or_update(self, table_collection_index, data, condition=None):raise NotImplementedError
封装mongodb
class MongodbStore(StorageBase):def __init__(self, host=MONGODB_CONNECTION_HOST, port=MONGODB_CONNECTION_PORT, database=MONGODB_DATABASE,account=MONGODB_DATABASE_USER, password=MONGODB_DATABASE_PASS):self.client = Noneself.db = NoneStorageBase.__init__(self, host=host, port=port,database=database, account=account, password=password)def build_connect(self):try:connection = f"mongodb://{self.account}:{self.password}@{self.host}:{self.port}/{self.database}"self.client = pymongo.MongoClient(connection)self.db = self.client[self.database]except Exception as e:print("失败:{0}".format(e))self.db = Noneelse:print(f"连接成功")return self.db# 增# data要是列表形式,否则报错# 添加单个可以是 单个元素的数组def add(self, table_collection_index, data, id=None):try:result = self.db[table_collection_index].insert_many(data)print(f"新增成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return resultdef add_one(self, table_collection_index, data):try:result = self.db[table_collection_index].insert_one(data)except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return result# 没有就新增,有就更新def add_or_update(self, table_collection_index, data, condition=None):existing_doc = self.search_one(table_collection_index, conditions=None)if existing_doc:# 如果文档存在,则更新self.update_one(table_collection_index, condition, {"key": "set","value": data})print('数据已更新。')else:# 如果文档不存在,则插入新文档self.add_one(table_collection_index, data)print('新数据已插入。')# 删def delete(self, table_collection_index, condition=None, data=None, id=None):try:result = self.db[table_collection_index].delete_many(parse_mongodb_condition(data))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return resultdef delete_one(self, table_collection_index, data):try:result = self.db[table_collection_index].delete_one(parse_mongodb_condition(data))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return result# 改# search_condition 指定查询条件# update_condition 指定更新条件def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):try:result = self.db[table_collection_index].update_many(parse_mongodb_condition(condition),parse_mongodb_condition(update_condition))except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")return result# 改单个def update_one(self, table_collection_index=None, search_condition=None, update_condition=None):try:result = self.db[table_collection_index].update_one(parse_mongodb_condition(search_condition),parse_mongodb_condition(update_condition))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")return result# 计数def count(self, table_collection_index, conditions):try:result = self.db[table_collection_index].count_documents(parse_mongodb_condition(conditions))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"共有{result.matched_count if result else 0}条数据")return result# 查def search(self, table_collection_index, conditions=None):try:results = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {})except Exception as e:print("失败:{0}".format(e))results = Nonereturn resultsdef search_one(self, table_collection_index, conditions=None):try:result = self.db[table_collection_index].find_one(parse_mongodb_condition(conditions) if conditions else {})print(f"查询到的内容:{result}")except Exception as e:print("失败:{0}".format(e))result = Nonereturn result# 排序def sort(self, table_collection_index, sort_key, conditions=None, skip_num=None, limit_num=None,sort_type=pymongo.ASCENDING):try:query = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {}).sort(sort_key,sort_type)if skip_num and limit_num:results = query.skip(skip_num).limit(limit_num)elif skip_num:results = query.skip(skip_num)elif limit_num:results = query.limit(limit_num)else:results = queryexcept Exception as e:print("失败:{0}".format(e))result = Nonereturn results# 测试代码
if __name__ == "__main__":# 建立连接mongodb_connect = MongodbStore()mongodb_connect.build_connect()# student1 = {# "id": 1,# "name": "Jordan3",# "age": 23,# "gender": "male"# }## student2 = {# "name": "Mike",# "age": 21,# "gender": "male"# }# mongodb_connect.add("students", [student1])# result = mongodb_connect.delete("students", {"name": "Mike"})# result = mongodb_connect.update("students", {"age": {"#gt": 23}}, {"$inc": {"age": 1}})# print(result.matched_count, result.modified_count)# 自定义分隔符号# condition = parse_mongodb_condition("age*>*20;name*regex*'^J.*'", ";", "*")condition_list = [{"key": "age","condition": ">","value": "20"},{"key": "name","condition": "regex","value": "'^J.*'"},]# 不设置condition的话表示等于# search_list = [# {# "key": "age",# "value": "20"# },# ]# 条件表达式解析mongodb_connect.search("students", condition_list)mongodb_connect.sort("students", 'age', condition_list, None, None, pymongo.DESCENDING)
这里要注意的是考虑到mongodb搜索条件的语法有些复杂,因此对其进行了一些简化,简化的思路是把条件转化成简单的键值对对象,下面看代码
# 优化该方法的思路
# 1. 使用列表推导式替代了外部循环。
# 2. 使用 str.join() 方法替代了内部循环中的字符串连接操作,这样可以减少不必要的字符串创建和连接操作,提高代码效率。
# 3. 使用 dict.values() 方法直接获取字典中的所有值,避免了在内部循环中通过键获取值的操作。
# 将条件对象数组转化为条件字符串
def turn_list_to_condition_str(list, split_cond=',', split_key=':'):conditions_list = [split_key.join([str(value) for value in search.values()])for search in list]return split_cond.join(conditions_list)# 该方法将条件字符串转换为条件对象
def parse_mongodb_condition(conditions, split_cond=',', split_key=':'):# 传入字符串规则# 例子:"age:>:20,name:regex:'^J.*'"# ,分隔多个条件# age:>:20 age是字段 >是条件 20是值 中间用:分隔# 分隔符默认是,和: 也可以自定义# 分割字符串pairs = turn_list_to_condition_str(conditions).split(split_cond)# 构建字典condition_dict = {}# 避免了在每次循环中都多次调用 pair.split(split_key)for pair in pairs:split_pair = pair.split(split_key)key = replace_chars_by_dit(split_pair[0], MONGODB_CONDITION_DICTIONARY)if len(split_pair) == 3:cond = replace_chars_by_dit(split_pair[1], MONGODB_CONDITION_DICTIONARY)condition_dict[key] = {cond: eval(split_pair[2])}elif len(split_pair) == 2:condition_dict[key] = eval(split_pair[1])# print(f'解析后的条件表达式是{condition_dict}')return condition_dict# 测试代码
if __name__ == "__main__":# condition_str = "age:20,name:regex:'^J.*'"# print(parse_mongodb_condition(condition_str))search_list = [{"key": "age","condition": ">","value": "20"},{"key": "name","condition": "regex",# 注意字符串的值要帶单引号''"value": "'^J.*'"},]search_list = {"key": "set","value": "20"},print(parse_mongodb_condition(search_list))
封装 Elasticsearch
# 可以和关系数据库的概念对比理解
# Relational DB -> Databases -> Tables -> Rows -> Columns
# Elasticsearch -> Indices -> Types -> Documents -> Fields
class ElasticsearchStore(StorageBase):def __init__(self, host=ELASTICSEARCH_HOST, port=ELASTICSEARCH_PORT,account=ELASTICSEARCH_USERNAME, password=ELASTICSEARCH_PASSWORD,verify_certs=ELASTICSEARCH_VERIFY_CERTS):self.es = Noneself.verify_certs = verify_certsStorageBase.__init__(self, host=host, port=port, account=account, password=password)def build_connect(self, default=True):try:if default:self.es = Elasticsearch()else:url = f"https://[{self.account}:{self.password}@]{self.host}:{self.port}"self.es = Elasticsearch(url, verify_certs=self.verify_certs)except Exception as e:print("失败:{0}".format(e))self.es = Noneelse:print(f"连接成功")return self.es# 增def add(self, table_collection_index, data, id=None):try:result = self.es.index(index=table_collection_index, body=data, id=id)print(f"新增成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return result# 删def delete(self, table_collection_index, condition=None, data=None, id=None):try:result = self.es.delete(index=table_collection_index, id=id, ignore=[400, 404])print(f"删除成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return result# 改def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):try:result = self.es.update(index=table_collection_index, body=data, id=id, ignore=[400, 404])except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"更新成功")return result# 获取def get(self, table_collection_index, id):try:result = self.es.get(index=table_collection_index, id=id, ignore=[400, 404])except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"获取成功{result}")return result# 查# dsl = {# 'query': {# 'match': {# 'title': '高考 圆梦'# }# }# }def search(self, table_collection_index, condition=None):try:results = self.es.search(index=table_collection_index, body=condition)except Exception as e:logging.error(f"失败:{e}")results = Noneelse:print(f"搜索成功{results}")return results# 测试代码
if __name__ == "__main__":# 建立连接elasticsearch_store = ElasticsearchStore()elasticsearch_store.build_connect()# doc = {# 'author': 'kimchy',# 'text': 'Elasticsearch: cool. bonsai cool.',# 'timestamp': datetime.now(),# }# elasticsearch_connect.add('test-index', doc, 1)# elasticsearch_connect.get('test-index',1)doc1 = {'author': 'kimchy','text': 'test','timestamp': datetime.now(),}elasticsearch_store.update('test-index', doc1, 1)# elasticsearch_store.delete('test-index', 1)elasticsearch_store.search('test-index', {"query": {"match_all": {}}})
封装Mysql
class MysqlStore(StorageBase):def __init__(self, host=MYSQL_HOST, port=MYSQL_PORT,account=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE):self.db = Noneself.cursor = NoneStorageBase.__init__(self, host=host, port=port, account=account, password=password, database=database)def build_connect(self) -> pymysql.cursors.Cursor:"""建立数据库连接。Returns:pymysql.cursors.Cursor: 数据库游标对象,用于执行 SQL 语句。"""try:self.db = pymysql.connect(host=self.host, user=self.account, password=self.password, port=self.port,database=self.database)self.cursor = self.db.cursor()except Exception as e:print(f"连接失败:{e}")self.db = Noneself.cursor = Noneelse:print("连接成功")return self.cursor# 增# table 表# data 新增数据 键值对对象def add(self, table_collection_index, data, id=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要执行的sql语句sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table_collection_index, keys=keys, values=values)try:self.cursor.execute(sql, tuple(data.values()))self.db.commit()print(f"新增成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 没有新增,有就更新# table 表# data 新增数据 键值对对象def add_or_update(self, table_collection_index, data, condition=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要执行的sql语句# 如果主键已经存在,就执行更新操作sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE '.format(table=table_collection_index, keys=keys,values=values)update = ', '.join(["{key} = %s".format(key=key) for key in data])sql += updatetry:self.cursor.execute(sql, tuple(data.values()) * 2)self.db.commit()print(f"新增成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 删# table 表名# condition 删除条件def delete(self, table_collection_index, condition=None, data=None, id=None):sql = 'DELETE FROM {table} WHERE {condition}'.format(table=table, condition=condition)try:self.cursor.execute(sql)self.db.commit()print(f"删除成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 改# table 表名# data 数据# condition 条件def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要跟新的数据update = ','.join(["{key} = %s".format(key=key) for key in data])# 要执行的sql语句sql = 'UPDATE {table} SET {update} WHERE {condition}'.format(table=table_collection_index, update=update, condition=condition)try:self.cursor.execute(sql, tuple(data.values()))self.db.commit()print(f"更新成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 查def search(self, table_collection_index, condition=None):# 要执行的sql语句sql = 'SELECT * FROM {table} WHERE {condition}'.format(table=table_collection_index, condition=condition)try:self.cursor.execute(sql)# fetchall获取结果的所有数据results = self.cursor.fetchall()# fetchall得到的是二重元祖,其中每个元素都是一条记录# 要注意的是fetch的内部实现中有一个偏移指针,用来指向查询结果,偏移指针最开始指向第一条数据,取了一次数据后,指针偏移到下一条数据,# fetchone被调用后,结果的偏移指针会指向下一条数据,fetchall方法返回的是从偏移指针指向的数据一直到结束的所有数据,所有fetchall获取的数据会少一条for row in results:print(row)except Exception as e:print("失败:{0}".format(e))# 测试代码
if __name__ == "__main__":# 建立连接mysql_connect = MysqlStore()mysql_connect.build_connect()data = {"name": "Bob","age": 22}table = "students"mysql_connect.search(table, 'age>=20')
最后要注意的是
数据库连接的相关信息都放在统一的文件里并且设置为默认的值,也可以初始化的时候传入相应的值