【python爬虫】设计自己的爬虫 2. 数据保存封装 mongodb,mysql和elasticsearch

mongodb, mysql和elasticsearch 功能较相似,所以打算用一套接口将它们封装起来

基类StorageBase

如下:

class StorageBase:def __init__(self, host=None, port=None, database=None, table=None, location=None, account=None, password=None,url=None):self.host = hostself.port = portself.database = databaseself.table = tableself.location = locationself.account = accountself.password = passwordself.url = urldef build_connect(self):raise NotImplementedError# 增def add(self, table_collection_index, data, id=None):raise NotImplementedError# 删def delete(self, table_collection_index, condition=None, data=None, id=None):raise NotImplementedError# 改def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):raise NotImplementedError# 查def search(self, table_collection_index, condition=None):raise NotImplementedError# 没有就新增,有就更新def add_or_update(self, table_collection_index, data, condition=None):raise NotImplementedError

封装mongodb

class MongodbStore(StorageBase):def __init__(self, host=MONGODB_CONNECTION_HOST, port=MONGODB_CONNECTION_PORT, database=MONGODB_DATABASE,account=MONGODB_DATABASE_USER, password=MONGODB_DATABASE_PASS):self.client = Noneself.db = NoneStorageBase.__init__(self, host=host, port=port,database=database, account=account, password=password)def build_connect(self):try:connection = f"mongodb://{self.account}:{self.password}@{self.host}:{self.port}/{self.database}"self.client = pymongo.MongoClient(connection)self.db = self.client[self.database]except Exception as e:print("失败:{0}".format(e))self.db = Noneelse:print(f"连接成功")return self.db# 增# data要是列表形式,否则报错# 添加单个可以是 单个元素的数组def add(self, table_collection_index, data, id=None):try:result = self.db[table_collection_index].insert_many(data)print(f"新增成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return resultdef add_one(self, table_collection_index, data):try:result = self.db[table_collection_index].insert_one(data)except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return result# 没有就新增,有就更新def add_or_update(self, table_collection_index, data, condition=None):existing_doc = self.search_one(table_collection_index, conditions=None)if existing_doc:# 如果文档存在,则更新self.update_one(table_collection_index, condition, {"key": "set","value": data})print('数据已更新。')else:# 如果文档不存在,则插入新文档self.add_one(table_collection_index, data)print('新数据已插入。')# 删def delete(self, table_collection_index, condition=None, data=None, id=None):try:result = self.db[table_collection_index].delete_many(parse_mongodb_condition(data))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return resultdef delete_one(self, table_collection_index, data):try:result = self.db[table_collection_index].delete_one(parse_mongodb_condition(data))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return result# 改# search_condition 指定查询条件# update_condition 指定更新条件def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):try:result = self.db[table_collection_index].update_many(parse_mongodb_condition(condition),parse_mongodb_condition(update_condition))except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")return result# 改单个def update_one(self, table_collection_index=None, search_condition=None, update_condition=None):try:result = self.db[table_collection_index].update_one(parse_mongodb_condition(search_condition),parse_mongodb_condition(update_condition))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条")return result# 计数def count(self, table_collection_index, conditions):try:result = self.db[table_collection_index].count_documents(parse_mongodb_condition(conditions))except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"共有{result.matched_count if result else 0}条数据")return result# 查def search(self, table_collection_index, conditions=None):try:results = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {})except Exception as e:print("失败:{0}".format(e))results = Nonereturn resultsdef search_one(self, table_collection_index, conditions=None):try:result = self.db[table_collection_index].find_one(parse_mongodb_condition(conditions) if conditions else {})print(f"查询到的内容:{result}")except Exception as e:print("失败:{0}".format(e))result = Nonereturn result# 排序def sort(self, table_collection_index, sort_key, conditions=None, skip_num=None, limit_num=None,sort_type=pymongo.ASCENDING):try:query = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {}).sort(sort_key,sort_type)if skip_num and limit_num:results = query.skip(skip_num).limit(limit_num)elif skip_num:results = query.skip(skip_num)elif limit_num:results = query.limit(limit_num)else:results = queryexcept Exception as e:print("失败:{0}".format(e))result = Nonereturn results# 测试代码
if __name__ == "__main__":# 建立连接mongodb_connect = MongodbStore()mongodb_connect.build_connect()# student1 = {#     "id": 1,#     "name": "Jordan3",#     "age": 23,#     "gender": "male"# }## student2 = {#     "name": "Mike",#     "age": 21,#     "gender": "male"# }# mongodb_connect.add("students", [student1])# result = mongodb_connect.delete("students", {"name": "Mike"})# result = mongodb_connect.update("students", {"age": {"#gt": 23}}, {"$inc": {"age": 1}})# print(result.matched_count, result.modified_count)# 自定义分隔符号# condition = parse_mongodb_condition("age*>*20;name*regex*'^J.*'", ";", "*")condition_list = [{"key": "age","condition": ">","value": "20"},{"key": "name","condition": "regex","value": "'^J.*'"},]# 不设置condition的话表示等于# search_list = [#     {#         "key": "age",#         "value": "20"#     },# ]# 条件表达式解析mongodb_connect.search("students", condition_list)mongodb_connect.sort("students", 'age', condition_list, None, None, pymongo.DESCENDING)

这里要注意的是考虑到mongodb搜索条件的语法有些复杂,因此对其进行了一些简化,简化的思路是把条件转化成简单的键值对对象,下面看代码

# 优化该方法的思路
# 1. 使用列表推导式替代了外部循环。
# 2. 使用 str.join() 方法替代了内部循环中的字符串连接操作,这样可以减少不必要的字符串创建和连接操作,提高代码效率。
# 3. 使用 dict.values() 方法直接获取字典中的所有值,避免了在内部循环中通过键获取值的操作。
# 将条件对象数组转化为条件字符串
def turn_list_to_condition_str(list, split_cond=',', split_key=':'):conditions_list = [split_key.join([str(value) for value in search.values()])for search in list]return split_cond.join(conditions_list)# 该方法将条件字符串转换为条件对象
def parse_mongodb_condition(conditions, split_cond=',', split_key=':'):# 传入字符串规则# 例子:"age:>:20,name:regex:'^J.*'"# ,分隔多个条件# age:>:20  age是字段 >是条件 20是值 中间用:分隔# 分隔符默认是,和: 也可以自定义# 分割字符串pairs = turn_list_to_condition_str(conditions).split(split_cond)# 构建字典condition_dict = {}# 避免了在每次循环中都多次调用 pair.split(split_key)for pair in pairs:split_pair = pair.split(split_key)key = replace_chars_by_dit(split_pair[0], MONGODB_CONDITION_DICTIONARY)if len(split_pair) == 3:cond = replace_chars_by_dit(split_pair[1], MONGODB_CONDITION_DICTIONARY)condition_dict[key] = {cond: eval(split_pair[2])}elif len(split_pair) == 2:condition_dict[key] = eval(split_pair[1])# print(f'解析后的条件表达式是{condition_dict}')return condition_dict# 测试代码
if __name__ == "__main__":# condition_str = "age:20,name:regex:'^J.*'"# print(parse_mongodb_condition(condition_str))search_list = [{"key": "age","condition": ">","value": "20"},{"key": "name","condition": "regex",# 注意字符串的值要帶单引号''"value": "'^J.*'"},]search_list = {"key": "set","value": "20"},print(parse_mongodb_condition(search_list))

封装 Elasticsearch

# 可以和关系数据库的概念对比理解
# Relational DB -> Databases -> Tables -> Rows -> Columns
# Elasticsearch -> Indices -> Types -> Documents -> Fields
class ElasticsearchStore(StorageBase):def __init__(self, host=ELASTICSEARCH_HOST, port=ELASTICSEARCH_PORT,account=ELASTICSEARCH_USERNAME, password=ELASTICSEARCH_PASSWORD,verify_certs=ELASTICSEARCH_VERIFY_CERTS):self.es = Noneself.verify_certs = verify_certsStorageBase.__init__(self, host=host, port=port, account=account, password=password)def build_connect(self, default=True):try:if default:self.es = Elasticsearch()else:url = f"https://[{self.account}:{self.password}@]{self.host}:{self.port}"self.es = Elasticsearch(url, verify_certs=self.verify_certs)except Exception as e:print("失败:{0}".format(e))self.es = Noneelse:print(f"连接成功")return self.es# 增def add(self, table_collection_index, data, id=None):try:result = self.es.index(index=table_collection_index, body=data, id=id)print(f"新增成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"新增成功")return result# 删def delete(self, table_collection_index, condition=None, data=None, id=None):try:result = self.es.delete(index=table_collection_index, id=id, ignore=[400, 404])print(f"删除成功")except Exception as e:print("失败:{0}".format(e))result = Noneelse:print(f"删除成功")return result# 改def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):try:result = self.es.update(index=table_collection_index, body=data, id=id, ignore=[400, 404])except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"更新成功")return result# 获取def get(self, table_collection_index, id):try:result = self.es.get(index=table_collection_index, id=id, ignore=[400, 404])except Exception as e:logging.error(f"失败:{e}")result = Noneelse:print(f"获取成功{result}")return result# 查# dsl = {#     'query': {#         'match': {#             'title': '高考 圆梦'#         }#     }# }def search(self, table_collection_index, condition=None):try:results = self.es.search(index=table_collection_index, body=condition)except Exception as e:logging.error(f"失败:{e}")results = Noneelse:print(f"搜索成功{results}")return results# 测试代码
if __name__ == "__main__":# 建立连接elasticsearch_store = ElasticsearchStore()elasticsearch_store.build_connect()# doc = {#     'author': 'kimchy',#     'text': 'Elasticsearch: cool. bonsai cool.',#     'timestamp': datetime.now(),# }# elasticsearch_connect.add('test-index', doc, 1)# elasticsearch_connect.get('test-index',1)doc1 = {'author': 'kimchy','text': 'test','timestamp': datetime.now(),}elasticsearch_store.update('test-index', doc1, 1)# elasticsearch_store.delete('test-index', 1)elasticsearch_store.search('test-index', {"query": {"match_all": {}}})

封装Mysql

class MysqlStore(StorageBase):def __init__(self, host=MYSQL_HOST, port=MYSQL_PORT,account=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE):self.db = Noneself.cursor = NoneStorageBase.__init__(self, host=host, port=port, account=account, password=password, database=database)def build_connect(self) -> pymysql.cursors.Cursor:"""建立数据库连接。Returns:pymysql.cursors.Cursor: 数据库游标对象,用于执行 SQL 语句。"""try:self.db = pymysql.connect(host=self.host, user=self.account, password=self.password, port=self.port,database=self.database)self.cursor = self.db.cursor()except Exception as e:print(f"连接失败:{e}")self.db = Noneself.cursor = Noneelse:print("连接成功")return self.cursor# 增# table 表# data 新增数据 键值对对象def add(self, table_collection_index, data, id=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要执行的sql语句sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table_collection_index, keys=keys, values=values)try:self.cursor.execute(sql, tuple(data.values()))self.db.commit()print(f"新增成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 没有新增,有就更新# table 表# data 新增数据 键值对对象def add_or_update(self, table_collection_index, data, condition=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要执行的sql语句# 如果主键已经存在,就执行更新操作sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE '.format(table=table_collection_index, keys=keys,values=values)update = ', '.join(["{key} = %s".format(key=key) for key in data])sql += updatetry:self.cursor.execute(sql, tuple(data.values()) * 2)self.db.commit()print(f"新增成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 删# table 表名# condition 删除条件def delete(self, table_collection_index, condition=None, data=None, id=None):sql = 'DELETE FROM  {table} WHERE {condition}'.format(table=table, condition=condition)try:self.cursor.execute(sql)self.db.commit()print(f"删除成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 改# table 表名# data 数据# condition 条件def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None):# data.keys()返回的是键的数组keys = ', '.join(data.keys())# 下面这段是构造多个%s最为占位符,有几个字段就构造几个values = ', '.join(['%s'] * len(data))# 要跟新的数据update = ','.join(["{key} = %s".format(key=key) for key in data])# 要执行的sql语句sql = 'UPDATE {table} SET {update} WHERE {condition}'.format(table=table_collection_index, update=update, condition=condition)try:self.cursor.execute(sql, tuple(data.values()))self.db.commit()print(f"更新成功")except Exception as e:print("失败:{0}".format(e))self.db.rollback()finally:print(f"{sql}")# 查def search(self, table_collection_index, condition=None):# 要执行的sql语句sql = 'SELECT * FROM {table} WHERE {condition}'.format(table=table_collection_index, condition=condition)try:self.cursor.execute(sql)# fetchall获取结果的所有数据results = self.cursor.fetchall()# fetchall得到的是二重元祖,其中每个元素都是一条记录# 要注意的是fetch的内部实现中有一个偏移指针,用来指向查询结果,偏移指针最开始指向第一条数据,取了一次数据后,指针偏移到下一条数据,# fetchone被调用后,结果的偏移指针会指向下一条数据,fetchall方法返回的是从偏移指针指向的数据一直到结束的所有数据,所有fetchall获取的数据会少一条for row in results:print(row)except Exception as e:print("失败:{0}".format(e))# 测试代码
if __name__ == "__main__":# 建立连接mysql_connect = MysqlStore()mysql_connect.build_connect()data = {"name": "Bob","age": 22}table = "students"mysql_connect.search(table, 'age>=20')

最后要注意的是
数据库连接的相关信息都放在统一的文件里并且设置为默认的值,也可以初始化的时候传入相应的值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/195779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓1.0明显是基于linux内核开发的,安卓1.0是不是linux套壳?

安卓1.0明显是基于linux内核开发的,安卓1.0是不是linux套壳? 在开始前我有一些资料,是我根据自己从业十年经验,熬夜搞了几个通宵,精心整理了一份「安卓开发资料从专业入门到高级教程工具包」,点个关注&…

141. 环形链表

141. 环形链表 设置一个fast指针,一个slow指针,fast一次走两步,slow一次走一步。如果fast和slow相遇,则说明有环。反之没相遇则无环。 注意快慢指针的while循环条件是fast.next ! null && fast.next.next ! null /*** …

大数据集群增加数据盘,平衡数据盘HDFS Disk Balancer

大数据集群增加数据盘,平衡数据盘HDFS Disk Balancer 官网:https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/HDFSDiskbalancer.html hdfs diskbalancer -execute /system/diskbalancer/nodename.plan.jsonhdfs diskbalancer -q…

StringJoiner使用详解

关于StringJoiner 1.介绍2.源码2.1 属性2.2 方法 3 举例StringJoiner做法3.1 只含间隔符3.2 含间隔符和前后缀3.3 merge合并两个joiner3.4 stringJoiner.setEmptyValue("xxx");3.5 综合举例length()方法 1.介绍 在JDK1.8之后,提供了一个StringJoiner类用来…

简谈oracle数据库的归档模式

一、oracle数据库归档模式简介 Oracle数据库归档模式是一种数据备份和恢复策略,它允许数据库记录所有数据库的更改操作(包括已提交和未提交的事务)并将其存储在归档日志中。这些归档日志可以用于在数据库发生故障时进行恢复,并提供点时间恢复(PITR)的能力。 在Oracle数…

ruoyi-vue 整合netty实现TCP/IP协议数据接收

支持持续接收数、可发送数据、可多端口连接。 废话少说,直接上代码! 如果写的可以,记得点个赞~ import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; imp…

PHP 刷新缓存区的问题!

PHP流式输出,在Nginx下可以正常刷新缓存区 , 但是在Apache下会等待循环全部执行完,才会刷新!有怎么解决? header(X-Accel-Buffering: no); // Nginx情况下必须加这一行header(Content-type: text/event-stream);header…

【刷题】【力扣牛客】反转链表的五种方式——Java

文章目录 前言方法一:构造新链表(构造新节点)方法二:构造新链表(不构造新节点)方法三:递归方法四:双指针方法五:遍历总结 力扣题目链接:206. 反转链表 牛客题…

IDEA2023找不到 Allow parallel run

我的idea版本:2023.1.4 第一步:点击Edit Configrations 第二步:点击Modify options 第三步:勾选Allow multiple instances 最后点击Apply应用一下 ok,问题解决!

SSM项目实战-登录验证成功并路由到首页面,Vue3+Vite+Axios+Element-Plus技术

1、util/request.js import axios from "axios";let request axios.create({baseURL: "http://localhost:8080",timeout: 50000 });export default request 2、api/sysUser.js import request from "../util/request.js";export const login (…

Mysql日志

文章目录 1. 日志类型2. bin log2.1 写入机制2.2 binlog与redolog对比2.3 两阶段提交 3. 中继日志 1. 日志类型 这 6 类日志分别为: 慢查询日志: 记录所有执行时间超过long_query_time的所有查询,方便我们对查询进行优化。 通用查询日志&am…

在sCrypt网站上铭刻Ordinals

sCrypt发布了一个新的Ordinals铭刻工具,连接Panda Wallet后即可使用。你可以观看我们录制的视频教程,获得更多细节。 铭刻工具同时支持BSV主网(mainnet)和测试网(testnet),你可以在我们的官方网…

手写VUE后台管理系统8 - 配置404NotFound路由

设置404页面 配置路由404页面 配置路由 这里配置了两个路由,一个是主页,另外一个则匹配任意路由显示为404页面。因为只配置了两个路由,如果路径没有匹配到主页,则会被自动导向到404页面,这样就可以实现整站统一的404页…

「Linux」使用C语言制作简易Shell

💻文章目录 📄前言简易shell实现shell的概念系统环境变量shell的结构定义内建命令完整代码 📓总结 📄前言 对于很多学习后端的同学来讲,学习了C语言,发现除了能写出那个经典的“hello world”以外&#xff…

142873-41-4脂质过氧化抑制剂1-星戈瑞

142873-41-4脂质过氧化抑制剂1 英文名称:Lipid peroxidation inhibitor 1 中文名称:脂质过氧化抑制剂 化学名称:2,4,6,7-四甲基-2-[(4-苯基哌啶-1-基)甲基]-3H-1-苯并呋喃-5-胺 CAS:142873-41-4 外观:固体粉末 分…

D2822ML 用于便携式录音机和收音机作音频功率放大器。采用 DIP8 SOP8 封装形式

D2822ML 用于便携式录音机和收音机作音频功率放大器。采用 DIP8 SOP8 封装形式 特点: 电源电压降到 1.8V 时仍能正常工作交越失真小 静态电流小可作桥式或立体声式功放应用外围元件少通道分离度高 开机和关机无冲击噪声软限幅

RT-Thread 内存管理

在计算机系统中,通常存储空间可以分为两种:内部存储空间和外部存储空间。 内部存储空间通常访问速度比较快,能够按照变量地址随机访问,也就是我们通常所说的RAM(随机存储器),可以把它理解为电脑…

docker安装达梦数据库并挂在数据卷

离线包下载地址:请点击 1.在线下载 wget https://download.dameng.com/eco/dm8/dm8_20230808_rev197096_x86_rh6_64_single.tar2. 导入镜像 docker load -i dm8_20230808_rev197096_x86_rh6_64_single.tar3. 运行容器 docker run -d -p 5236:5236 --restartalwa…

微信公众号端在线客服系统源码 聊天记录云端实时保存 附带完整的搭建教程

随着社交媒体的普及,越来越多的用户通过微信公众号与企业进行沟通。因此,开发一款基于微信公众号的在线客服系统,可以帮助企业更好地服务用户,提高客户满意度。同时,为了解决聊天记录的存储和管理问题,我们…

【Python 训练营】N_17 冒泡排序

题目 列表L [3,2,5,6,1,3,8,1,9],冒泡排序实现从小到大排列。 分析 冒泡排序的基本思想是从序列的第一个元素开始,依次比较相邻的两个元素,如果它们的顺序错误就交换它们的位置,直到整个序列有序为止。具体步骤如下&#xff1…