python操作mongodb底层封装并验证pymongo是否应该关闭连接

一、pymongo简介

github地址:https://github.com/mongodb/mongo-python-driver

mymongo安装命令:pip install pymongo==4.7.2

mymongo接口文档:PyMongo 4.7.2 Documentation

PyMongo发行版包含Python与MongoDB数据库交互的工具。bson包是用于Python的bson格式的实现。pymongo包是MongoDB的原生Python驱动程序。gridfs包是pymongo之上的一个gridfs实现。pymongo是python与mongodb交互的首选方式。

二、封装接口

import uuid
from datetime import datetime, timezone
from pydantic import BaseModel,ConfigDict
from pycommon.utils.mongo.mongo_collection_pool import mongo_collection_pool
from pymongo.cursor import Cursor
from pymongo.collection import Collectionclass BaseDbService:def __init__(self, mongo_settings: MongoSettings, encryptor_key: str) -> None:self.mongo_settings = mongo_settingsself.encryptor_key = encryptor_keydef create(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None) -> str:collection: Collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)if not get_attr_value(data, 'id'):set_attr_value(data, 'id', f"{uuid.uuid1()}")set_attr_value(data, 'created_by', user_id)set_attr_value(data, 'created_time', datetime.now(timezone.utc))request = data.model_dump(by_alias = True)id = collection.insert_one(request).inserted_idlogger.log_info(f'{type(data).__name__} id {id} is created')return iddef create_list(self, datas: list[T], user_id: str, tenant_id: str = None, collection_name: str = None) -> list[str]:collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)request_list = []for data in datas:if not get_attr_value(data, 'id'):set_attr_value(data, 'id', f"{uuid.uuid1()}")set_attr_value(data, 'created_by', user_id)set_attr_value(data, 'created_time', datetime.now(timezone.utc))request = data.model_dump(by_alias = True)request_list.append(request)ids = collection.insert_many(request_list).inserted_idslogger.log_info(f'{type(data).__name__} ids {ids} are created')return idsdef update(self, data: T, user_id: str, tenant_id: str = None, collection_name: str = None):if not get_attr_value(data, 'id'):raise ValueError(f'{type(data).__name__} id is null or empty when update, user_id={user_id}, tenant_id={tenant_id}, collection_name={collection_name}')set_attr_value(data, 'updated_by', user_id)set_attr_value(data, 'updated_time', datetime.now(timezone.utc))collection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)# response is dict typeoriginal_data = collection.find_one(data.id)if original_data:if "createdBy" in original_data:set_attr_value(data, "created_by", original_data["createdBy"])if "createdTime" in original_data:set_attr_value(data, "created_time", original_data["createdTime"])request = data.model_dump(by_alias = True)collection.find_one_and_replace({"_id": data.id}, request)logger.log_info(f'{type(data).__name__} id {id} is updated')else:logger.log_info(f'{type(data).__name__} id {id} does not exist when update')def get(self, id: str, responseType: TResponse, tenant_id: str = None, collection_name: str = None) -> TResponse:if not id:logger.log_info(f'{type(responseType).__name__} id is null or empty when get')return Nonecollection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)result = collection.find_one(id)return responseType.model_validate(result)def get_list(self, ids: list[str], entityType: T, tenant_id: str = None, collection_name: str = None) -> list[T]:if not ids:logger.log_info(f'{type(entityType).__name__} ids is null or empty when get_list')return Nonecollection = mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)response = list(collection.find({"_id": { '$in': ids }}))if not response:return []result = []for item in response:result_item = entityType.model_validate(item)result.append(result_item)return result

整体思路是创建BaseDbService实例时指定目标数据库,进行具体的CRUD操作时再指定表名以及查询结果要转换成的类型(pymongo查询出结果后默认是dict类型),这样底层的数据库服务就能操作任意库的所有表了,而且最终获取的数据是具体类型的对象。

实际上用户大致会按照下述方式使用BaseDbService:

if __name__ == '__main__':mongo_settings = MongoSettings(connection_string="mongodb://localhost:27017/", database_name="test-activity")db_service = BaseDbService(mongo_settings, "bWRHdzkzVDFJbWNB")collection_name = "pycommon"tenant_id = "test-tenant-id"result = db_service.get("74bae4fe-13ec-11ef-be60-7404f152b5a1", ActivityQueryItem, tenant_id, collection_name)

三、数据库连接池机制 

第二步中对数据库表操作之前都要先执行mongo_collection_pool.get_collection(self.mongo_settings, collection_name, tenant_id)来获取collection的连接,mongo_collection_pool是一个单例对象,内部有一个缓存机制来存储所有已经操作过的client、database和collection,连接从不关闭,这样可以保证连接不会重复的创建销毁从而提升性能。

看到这里有些细心的朋友可能会提问连接从不关闭,不会造成当前用户过多导致新用户无法连接的情况吗?本节先展示数据库连接池,大家的疑问咱们在下边有验证

from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from pycommon.models.mongo.mongo_settings import MongoSettingsclass MongoCollectionPool:def __init__(self):self.mongo_client_cache = {}self.mongo_database_cache = {}self.mongo_collection_cache = {}def get_collection(self, mongo_settings: MongoSettings, collection_name: str, tenant_id: str) -> Collection:client_cache_key = mongo_settings.connection_stringdatabase_name = mongo_settings.database_namedatabase_cache_key = client_cache_key + "_" + mongo_settings.database_namecollection_name = collection_name + "_" + tenant_id if tenant_id else collection_namecollection_cache_key = client_cache_key + "_" + mongo_settings.database_name + "_" + collection_namemongo_client: MongoClient = Noneif client_cache_key in self.mongo_client_cache:mongo_client = self.mongo_client_cache[client_cache_key]else:mongo_client = MongoClient(mongo_settings.connection_string)self.mongo_client_cache[client_cache_key] = mongo_clientdatabase: Database = Noneif database_cache_key in self.mongo_database_cache:database = self.mongo_database_cache[database_cache_key]else:database = mongo_client[database_name]self.mongo_database_cache[database_cache_key] = databaseif collection_cache_key in self.mongo_collection_cache:return self.mongo_collection_cache[collection_cache_key]else:collection = database[collection_name]self.mongo_collection_cache[collection_cache_key] = collectionreturn collection# 单例对象
mongo_collection_pool = MongoCollectionPool()
from typing import Optional
from pycommon.models.base_core_model import BaseCoreModelclass MongoSettings(BaseCoreModel):connection_string: Optional[str] = Nonedatabase_name: Optional[str] = None

四、mongodb数据库连接是否需要手动关闭

在判断是否需要手动关闭连接时,先要了解连接是在什么时机开启的(这样我们就能知道做哪些操作会创建新的连接),以及对象释放时是否会自动关闭(这样我们就可以根据不同的项目类型来设计不同的架构)。

为了查看mongodb当前的数据库状态,我们需要用到mongosh工具(我这里用的是mongosh-2.2.6-win32-x64,大家需要的话我可以私发到个人邮箱)。

mongosh安装方式:解压后把bin目录下的两个文件拷贝到C:\Windows\System32。

查看mongodb数据库状态:打开cmd -> 输入mongosh -> 输入db.serverStatus().connections

connections中文说明:serverStatus - MongoDB 手册 v 6 。 0

connections英文说明:serverStatus - MongoDB Manual v7.0

测试项目情况:定时任务项目(执行完毕立即退出)

下面是我测试时了解到的情况(详情请看截图):

  • mongodb默认的可用线程高达100w
  • 创建MongoClient时,active+1、current+2、threaded+2、available-2
  • 创建DataBase时,exhaustHello+1
  • 创建Collection时,数据库状态无变化
  • 执行collectionA.find_one()方法时,current+1、threaded+1、available-1
  • 继续执行collectionA.find()时,数据库状态无变化
  • 继续执行collectionB.find()时,数据库状态无变化
  • 项目结束后数据库状态恢复原状

得出结论:

  • mongodb的可用性很强大,对于web服务类型这种常活程序来说只处理一个数据库且并发不高时,手动关闭更好不关也可以接受
  • 对于定时任务这种用完即销毁的程序来说不需要考虑关闭连接的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/845145.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】解决Python报错:AttributeError: ‘int‘ object has no attribute ‘xxx‘

🧑 博主简介:阿里巴巴嵌入式技术专家,深耕嵌入式人工智能领域,具备多年的嵌入式硬件产品研发管理经验。 📒 博客介绍:分享嵌入式开发领域的相关知识、经验、思考和感悟,欢迎关注。提供嵌入式方向…

RLC防孤岛保护装置如何工作的?

什么是RLC防孤岛保护装置? 孤岛保护装置是电力系统中一道强大的守护利器,它以敏锐的感知和迅速的反应,守护着电网的平稳运行。当电网遭遇故障或意外脱离主网时,孤岛保护装置如同一位机警的守门人,立刻做出决断&#xf…

Go微服务: 基于Docker搭建Kong网关环境

概述 在当今的微服务架构中,API网关扮演着至关重要的角色,它作为系统的统一入口负责处理所有内外部请求,实现路由转发、负载均衡、安全控制、限流熔断等多种功能Kong,作为一个开源、高性能、可扩展的API网关,凭借其强…

【机器学习】探索未来科技的前沿:人工智能、机器学习与大模型

文章目录 引言一、人工智能:从概念到现实1.1 人工智能的定义1.2 人工智能的发展历史1.3 人工智能的分类1.4 人工智能的应用 二、机器学习:人工智能的核心技术2.1 机器学习的定义2.2 机器学习的分类2.3 机器学习的实现原理2.4 机器学习的应用2.5 机器学习…

在PostGIS中检查孤线(Find isolated lines in PostGIS)

场景 在PostGIS中有一张线要素表,需要检查该表中的孤线,并且进行自动纠正的计算。 其中孤线定义为两端端点都不在任何其他线的顶点上。 本文介绍在PostGIS中的线要素点,通过函数计算指定线要素表中的孤线,并计算最接近的纠偏位置。 In PostGIS, there is a table of line …

GPT-4o(OpenAI最新推出的大模型)

简介:最近,GPT-4o横空出世。对GPT-4o这一人工智能技术进行评价,包括版本间的对比分析、GPT-4o的技术能力以及个人感受等。 方向一:对比分析 GPT-4o(OpenAI最新推出的大模型)与GPT-4之间的主要区别体现在响应…

268 基于matlab的模拟双滑块连杆机构运动

基于matlab的模拟双滑块连杆机构运动,并绘制运动动画,连杆轨迹可视化输出,并输出杆件质心轨迹、角速度、速度变化曲线。可定义杆长、滑块速度,滑块初始位置等参数。程序已调通,可直接运行。 268 双滑块连杆机构运动 连…

Github单个文件或者单个文件夹下载插件

有时候我们在github上备份了一些资料,比如pdf,ppt,md之类的,需要用到的时候只要某个文件即可,又不要把整个仓库的zip包下载下来,毕竟有时文件太多,下载慢,我们也不需要所有资料,那么就可以使用到…

i-am-a-bot:一款基于多个大语言模型的验证码系统安全评估工具

关于i-am-a-bot i-am-a-bot是一款基于多个大语言模型的验证码安全评估工具,该工具提供了一个使用了多模态大语言模型(LLM)的自动化解决方案,可以帮助广大研究人员测试各种类型验证码机制的安全性。 从底层上看,i-am-a…

renren-fast-vue启动报错

问题描述 拉取人人开源vue项目启动失败 报错信息 版本信息 序号名称版本号1node14.21.3 启动方案 1.拉取项目 git clone https://gitee.com/renrenio/renren-fast-vue.git 2.执行安装依赖命令 npm install 3.此时报错 chromedriver2.27.2 install: node install.js 4.手动…

安装与使用ChatTTS文本转语音模型

非常自然的文本转语音(Text To Speech)TTS,支持中英文混读,还可以穿插笑声,听起来很真实自然。 1、有哪些优点 对话式 TTS: ChatTTS针对对话式任务进行了优化,实现了自然流畅的语音合成,同时支持多说话人。细粒度控制…

老师如何使用小程序发成绩?

作为一名老师,你有没有想过,发成绩其实可以不用那么麻烦?以前,我们可能得一张张地写成绩,或者一封封邮件地发,有时候还得担心信息泄露。但现在,有了小程序,一切都变得简单多了。 老师…

连锁超市能源能效管理方案

1.概述 连锁超市的能源消耗是其运营成本的重要组成部分。有效的能源能效管理不仅可以降低运营成本,也有助于环保,实现可持续发展。本文将探讨几种关键的能源能效管理策略。 2.智能照明系统 采用智能照明系统是提高能效的关键一步。这些系统可以根据店…

【linux】自定义快捷命令/脚本

linux自定义快捷命令 场景自定义命令自定义脚本 场景 深度学习经常要切换到自己环境,conda activate mmagic,但是又不想每次重复打这么多字,想使用快捷命令直接切换。 自定义命令 使用别名(alias)或自定义脚本来创建…

先进的无人机GPS/GNSS模块解决方案

由于多星座支持和增强的信号接收能力,先进的GNSS模块提供了更高的精度和可靠性。集成传感器融合补偿信号中断,实现无缝导航。内置实时运动学(RTK)支持提供厘米级的定位精度。这些模块还优先考虑低功耗和紧凑的尺寸,确保更长的飞行时间和对无人…

【SpringMVC】_SpringMVC实现用户登录

目录 1、需求分析 2、接口定义 2.1 校验接口 请求参数 响应数据 2.2 查询登录用户接口 请求参数 响应数据 4、服务器代码 5、前端代码 5.1 登录页面login.html 5.2 首页页面index.html 6、运行测试 1、需求分析 用户输入账号与密码,后端校验密码是否正确&a…

【论文速读】Self-Rag框架,《Self-Rag: Self-reflective Retrieval augmented Generation》

关于前面的文章阅读《When to Retrieve: Teaching LLMs to Utilize Information Retrieval Effectively》,有网友问与Self-Rag有什么区别。 所以,大概看了一下Self-Rag这篇论文。 两篇文章的方法确实非常像,Self-Rag相对更加复杂一些。 When …

Mac逆向Electron应用

工具库 解压asar文件 第一步 找到应用文件夹位置 打开活动监视器: 搜索相关应用 用命令行打开刚才复制的路径即可 open Applications/XXX.app/Contents/Resources/app第二步 解压打包文件 解压asar文件

C++ 多重继承的内存布局和指针偏移

在 C 程序里,在有多重继承的类里面。指向派生类对象的基类指针,其实是指向了派生类对象里面,该基类对象的起始位置,该位置相对于派生类对象可能有偏移。偏移的大小,等于派生类的继承顺序表里面,排在该类前面…

(自适应手机端)响应式服装服饰外贸企业网站模板

(自适应手机端)响应式服装服饰外贸企业网站模板PbootCMS内核开发的网站模板,该模板适用于服装服饰网站、外贸网站等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可;自适应手机端,同一个后台&#xff0…