pymongo功能整理与基础操作类

以下是 Python 与 PyMongo 的完整功能整理,涵盖基础操作、高级功能、性能优化及常见应用场景:


1. 安装与连接
(1) 安装 PyMongo

pip install pymongo

(2) 连接 MongoDB

from pymongo import MongoClient# 基础连接(默认本地,端口27017)
client = MongoClient('mongodb://localhost:27017/')# 带认证的连接
client = MongoClient('mongodb://username:password@host:27017/dbname?authSource=admin'
)# 连接副本集或分片集群
client = MongoClient('mongodb://node1:27017,node2:27017/?replicaSet=rs0')

2. 数据库与集合操作
(1) 选择数据库和集合

db = client['mydatabase']    # 选择数据库(惰性创建)
collection = db['users']     # 选择集合(惰性创建)

(2) 列出数据库和集合

# 列出所有数据库(需权限)
print(client.list_database_names())# 列出数据库中的集合
print(db.list_collection_names())

(3) 删除集合或数据库

db.drop_collection('users')  # 删除集合
client.drop_database('mydatabase')  # 删除数据库

3. 文档操作(CRUD)
(1) 插入文档

# 插入单条文档
user = {'name': 'Alice', 'age': 30, 'tags': ['python', 'dev']}
result = collection.insert_one(user)
print(result.inserted_id)  # 输出插入的 ObjectId# 批量插入
users = [{'name': 'Bob'}, {'name': 'Charlie'}]
result = collection.insert_many(users)
print(result.inserted_ids)  # 输出所有插入的 ObjectId 列表

(2) 查询文档

# 查询单条文档
user = collection.find_one({'name': 'Alice'})
print(user)  # 返回字典或 None# 查询多条文档(带条件、投影和排序)
cursor = collection.find({'age': {'$gt': 25}},           # 条件:年龄大于25{'_id': 0, 'name': 1},          # 投影:仅返回 name 字段
).sort('age', pymongo.ASCENDING)    # 按年龄升序排序for doc in cursor:print(doc)# 统计数量
count = collection.count_documents({'age': {'$gt': 25}})

(3) 更新文档

# 更新单条文档
result = collection.update_one({'name': 'Alice'},{'$set': {'age': 31}, '$addToSet': {'tags': 'database'}}  # 更新操作符
)
print(result.modified_count)  # 输出影响的文档数# 批量更新
result = collection.update_many({'age': {'$lt': 30}},{'$inc': {'age': 1}}  # 年龄加1
)

(4) 删除文档

# 删除单条文档
result = collection.delete_one({'name': 'Alice'})# 批量删除
result = collection.delete_many({'age': {'$gt': 40}})

4. 高级查询与聚合
(1) 查询操作符

# 比较:$gt, $gte, $lt, $lte, $ne
collection.find({'age': {'$gt': 20}})# 逻辑:$and, $or, $not
collection.find({'$or': [{'age': 30}, {'name': 'Bob'}]})# 数组:$in, $nin, $all, $elemMatch
collection.find({'tags': {'$in': ['python', 'java']}})

(2) 聚合管道

pipeline = [{'$match': {'age': {'$gt': 25}}},          # 筛选条件{'$group': {'_id': '$city', 'count': {'$sum': 1}}},  # 按城市分组计数{'$sort': {'count': -1}},                  # 按计数降序排序{'$limit': 5}                              # 取前5条
]result = collection.aggregate(pipeline)
for doc in result:print(doc)

(3) 索引管理

# 创建索引
collection.create_index([('name', pymongo.ASCENDING)], unique=True)# 查看索引
print(collection.index_information())# 删除索引
collection.drop_index('name_1')

5. 事务与原子性操作
(1) 多文档事务(MongoDB 4.0+)

with client.start_session() as session:session.start_transaction()try:collection.update_one({'name': 'Alice'},{'$inc': {'balance': -100}},session=session)collection.update_one({'name': 'Bob'},{'$inc': {'balance': 100}},session=session)session.commit_transaction()except Exception as e:session.abort_transaction()print("事务回滚:", e)

(2) 原子操作符

# 原子更新
collection.find_one_and_update({'name': 'Alice'},{'$inc': {'counter': 1}},return_document=pymongo.ReturnDocument.AFTER  # 返回更新后的文档
)

6. 性能优化与最佳实践
(1) 查询优化
• 使用投影减少返回字段:

collection.find({}, {'_id': 0, 'name': 1})

• 覆盖查询(Covered Query):确保查询字段和投影字段在索引中。

(2) 批量操作

# 批量插入(减少网络开销)
bulk_ops = [pymongo.InsertOne({'name': f'User_{i}'}) for i in range(1000)]
collection.bulk_write(bulk_ops)

(3) 连接池管理

client = MongoClient('mongodb://localhost:27017/',maxPoolSize=100,      # 最大连接数minPoolSize=10,       # 最小空闲连接socketTimeoutMS=3000  # 超时时间
)

7. 数据建模与模式设计
(1) 内嵌文档与引用
• 内嵌文档:适合频繁访问的子数据。

user = {'name': 'Alice','address': {'city': 'New York', 'zip': '10001'}  # 内嵌文档
}

• 引用关系:适合独立实体。

# 用户引用订单
order = {'user_id': user['_id'], 'product': 'Laptop'}

(2) 分片策略
• 选择分片键:高频查询字段(如 user_id)。

• 分片命令(需在 mongos 执行):

sh.shardCollection("mydb.orders", {"user_id": 1})

8. 安全与运维
(1) 认证与权限

# 创建用户
db.command('createUser', 'admin',pwd='secret',roles=[{'role': 'readWrite', 'db': 'mydb'}]
)

(2) 备份与恢复
mongodump 备份:

mongodump --uri="mongodb://user:pass@host:27017/mydb" --out=/backup

mongorestore 恢复:

mongorestore --uri="mongodb://host:27017" /backup/mydb

(3) 监控与日志
• 查看数据库状态:

server_status = db.command('serverStatus')
print(server_status['connections']['available'])

• 启用慢查询日志:

mongod --setParameter slowMS=100 --profileLevel 2

9. 常见应用场景
(1) 日志存储与分析

# 插入日志
log_entry = {'timestamp': datetime.now(),'level': 'INFO','message': 'User login success'
}
collection.insert_one(log_entry)# 分析错误日志数量
pipeline = [{'$match': {'level': 'ERROR'}},{'$group': {'_id': '$service', 'count': {'$sum': 1}}}
]

(2) 实时排行榜

# 更新分数
collection.update_one({'user_id': 1001},{'$inc': {'score': 10}},upsert=True
)# 获取前10名
top_players = collection.find().sort('score', -1).limit(10)

10. 扩展工具与库
(1) 使用 Motor 实现异步操作

from motor.motor_asyncio import AsyncIOMotorClientasync def query_data():client = AsyncIOMotorClient('mongodb://localhost:27017')collection = client.mydb.userscursor = collection.find({'age': {'$gt': 20}})async for doc in cursor:print(doc)

(2) 使用 MongoEngine(ORM)

from mongoengine import Document, StringField, IntFieldclass User(Document):name = StringField(required=True)age = IntField()# 查询数据
users = User.objects(age__gt=25)

总结

功能PyMongo 方法/操作典型场景
基础 CRUDinsert_one, find, update_many数据增删改查
聚合分析aggregate + 管道操作复杂统计、日志分析
事务管理start_session + 事务块转账、订单处理
性能优化索引、批量操作、连接池高并发读写、大数据处理
数据建模内嵌文档、引用关系、分片电商、社交网络、IoT 数据存储

通过合理使用 PyMongo,可以高效操作 MongoDB 应对多样化的数据存储需求,结合 Redis 实现缓存加速,构建高性能应用。

以下是一个基于 pymongo 封装的 MongoDB 基础操作类,支持连接管理、CRUD、索引操作、聚合查询、分页等常用功能:

from typing import Any, Dict, List, Optional, Union
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import PyMongoError
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from pymongo.collection import Collection
from bson import ObjectIdclass MongoDBClient:"""MongoDB 基础操作类"""def __init__(self, uri: str = "mongodb://localhost:27017/",db_name: str = "mydatabase", collection_name: str = "default_collection"):"""初始化 MongoDB 客户端:param uri: MongoDB 连接 URI:param db_name: 数据库名称:param collection_name: 集合名称"""self.client = MongoClient(uri)self.db = self.client[db_name]self.collection: Collection = self.db[collection_name]# ------------------------- 基础 CRUD 操作 -------------------------def insert_one(self, document: Dict) -> Optional[ObjectId]:"""插入单条文档"""try:result: InsertOneResult = self.collection.insert_one(document)return result.inserted_idexcept PyMongoError as e:print(f"Insert one error: {e}")return Nonedef insert_many(self, documents: List[Dict]) -> Optional[List[ObjectId]]:"""批量插入文档"""try:result: InsertManyResult = self.collection.insert_many(documents)return result.inserted_idsexcept PyMongoError as e:print(f"Insert many error: {e}")return Nonedef find_one(self, query: Dict, projection: Optional[Dict] = None) -> Optional[Dict]:"""查询单条文档"""try:return self.collection.find_one(query, projection)except PyMongoError as e:print(f"Find one error: {e}")return Nonedef find(self, query: Dict, projection: Optional[Dict] = None,sort: Optional[List[tuple]] = None,limit: int = 0) -> List[Dict]:"""查询多条文档"""try:cursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)if limit > 0:cursor = cursor.limit(limit)return list(cursor)except PyMongoError as e:print(f"Find error: {e}")return []def update_one(self, query: Dict, update_data: Dict, upsert: bool = False) -> Optional[UpdateResult]:"""更新单条文档"""try:result: UpdateResult = self.collection.update_one(query, {'$set': update_data}, upsert=upsert)return resultexcept PyMongoError as e:print(f"Update one error: {e}")return Nonedef delete_one(self, query: Dict) -> Optional[DeleteResult]:"""删除单条文档"""try:result: DeleteResult = self.collection.delete_one(query)return resultexcept PyMongoError as e:print(f"Delete one error: {e}")return None# ------------------------- 高级操作 -------------------------def create_index(self, keys: List[tuple], unique: bool = False, background: bool = True) -> Optional[str]:"""创建索引"""try:index_name = self.collection.create_index(keys, unique=unique, background=background)return index_nameexcept PyMongoError as e:print(f"Create index error: {e}")return Nonedef count_documents(self, query: Dict) -> int:"""统计文档数量"""try:return self.collection.count_documents(query)except PyMongoError as e:print(f"Count documents error: {e}")return 0def aggregate(self, pipeline: List[Dict]) -> List[Dict]:"""聚合查询"""try:return list(self.collection.aggregate(pipeline))except PyMongoError as e:print(f"Aggregate error: {e}")return []def paginate(self, query: Dict, page: int = 1, per_page: int = 10,sort: Optional[List[tuple]] = None,projection: Optional[Dict] = None) -> Dict:"""分页查询"""try:total = self.count_documents(query)skip = (page - 1) * per_pagecursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)documents = cursor.skip(skip).limit(per_page)return {"total": total,"page": page,"per_page": per_page,"data": list(documents)}except PyMongoError as e:print(f"Paginate error: {e}")return {"total": 0, "page": page, "per_page": per_page, "data": []}# ------------------------- 事务支持 -------------------------def execute_transaction(self, operations: callable) -> bool:"""执行事务操作"""session = self.client.start_session()try:with session.start_transaction():operations(self.collection, session)return Trueexcept PyMongoError as e:session.abort_transaction()print(f"Transaction aborted: {e}")return False# ------------------------- 工具方法 -------------------------@staticmethoddef to_objectid(_id: Union[str, ObjectId]) -> ObjectId:"""将字符串转换为 ObjectId"""return _id if isinstance(_id, ObjectId) else ObjectId(_id)def close(self):"""关闭连接"""self.client.close()# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":# 初始化客户端mongo_client = MongoDBClient(uri="mongodb://user:pass@localhost:27017/",db_name="test_db",collection_name="users")# 插入数据user_id = mongo_client.insert_one({"name": "Alice","age": 30,"email": "alice@example.com"})print(f"Inserted ID: {user_id}")# 查询数据user = mongo_client.find_one({"name": "Alice"})print(f"Found user: {user}")# 分页查询pagination = mongo_client.paginate(query={"age": {"$gt": 20}},page=1,per_page=10,sort=[("age", ASCENDING)])print(f"Page 1 data: {pagination['data']}")# 关闭连接mongo_client.close()

核心功能说明

功能方法说明
连接管理__init__, close支持自定义 URI、数据库和集合名称
CRUD 操作insert_one, find提供单条/批量操作,支持投影和排序
索引管理create_index可创建唯一索引、后台索引
聚合查询aggregate支持完整的聚合管道操作
分页查询paginate返回分页数据和总记录数
事务支持execute_transaction封装多文档事务操作
类型转换to_objectid字符串与 ObjectId 互转

使用场景

  1. 快速开发:直接继承或实例化类,无需重复编写 CRUD 代码。
  2. Web 后端:集成到 FastAPI/Django 服务中,处理用户数据。
  3. 数据分析:通过聚合方法实现复杂统计。
  4. 定时任务:封装数据清洗、日志处理等操作。

优化建议

  1. 连接池配置:在初始化时添加 maxPoolSizeminPoolSize 参数。
  2. 日志记录:将 print 替换为 logging 模块记录错误信息。
  3. 异步支持:使用 motor 库实现异步版本(适合 FastAPI 等异步框架)。
  4. 数据校验:集成 pydantic 对输入数据进行模式验证。
  5. 缓存集成:结合 Redis 实现高频查询缓存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/77338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Trae+DeepSeek学习Python开发MVC框架程序笔记(四):使用sqlite存储查询并验证用户名和密码

继续通过Trae向DeepSeek发问并修改程序,实现程序运行时生成数据库,用户在系统登录页面输入用户名和密码后,控制器通过模型查询用户数据库表来验证用户名和密码,验证通过后显示登录成功页面,验证失败则显示登录失败页面…

如何识别金融欺诈行为并进行分析预警

金融行业以其高效便捷的服务深刻改变了人们的生活方式。然而,伴随技术进步而来的,是金融欺诈行为的日益猖獗。从信用卡盗刷到复杂的庞氏骗局,再到网络钓鱼和洗钱活动,金融欺诈的形式层出不穷,其规模和影响也在不断扩大。根据全球反欺诈组织(ACFE)的最新报告,仅2022年,…

纷析云:开源财务管理软件的创新与价值

在企业数字化转型中,纷析云作为一款优秀的开源财务管理软件,正为企业财务管理带来新变革,以下是其核心要点。 一、产品概述与技术架构 纷析云采用微服务架构,功能组件高内聚低耦合,可灵活扩展和定制。前端基于现代框…

蛋白质大语言模型ESM介绍

ESM(Evolutionary Scale Modeling)是 Meta AI Research 团队开发的一系列用于蛋白质的预训练语言模型。这些模型在蛋白质结构预测、功能预测和蛋白质设计等领域展现出了强大的能力。以下是对 ESM 的详细介绍: 核心特点 大规模预训练:基于大规模蛋白质序列数据进行无监督学…

OpenCv高阶(七)——图像拼接

目录 一、图像拼接的原理过程 1. 特征检测与描述(Feature Detection & Description) 2. 特征匹配(Feature Matching) 3. 图像配准(Image Registration) 4. 图像变换与投影(Warping&…

Native层Trace监控性能

一、基础实现方法 1.1 头文件引用 #include <utils/Trace.h> // 基础版本 #include <cutils/trace.h> // 兼容旧版本1.2 核心宏定义 // 区间追踪&#xff08;推荐&#xff09; ATRACE_BEGIN("TraceTag"); ...被监控代码... ATRACE_END();// 函数级自…

金融行业微服务架构设计与挑战 - Java架构师面试实战

金融行业微服务架构设计与挑战 - Java架构师面试实战 本文通过模拟一位拥有十年Java研发经验的资深架构师马架构与面试官之间的对话&#xff0c;深入探讨了金融行业项目在微服务架构下的技术挑战与解决方案。 第一轮提问 面试官&#xff1a; 马架构&#xff0c;请介绍一下您…

服务器虚拟化:技术解析与实践指南

在信息技术飞速发展的今天,企业对服务器资源的需求日益增长,传统物理服务器存在资源利用率低、部署周期长、管理成本高等问题。服务器虚拟化技术应运而生,它通过将物理服务器的计算、存储、网络等资源进行抽象和整合,划分成多个相互隔离的虚拟服务器,从而提高资源利用率、…

OpenCV 图形API(54)颜色空间转换-----将图像从 RGB 色彩空间转换到 HSV色彩空间RGB2HSV()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从 RGB 色彩空间转换为 HSV。该函数将输入图像从 RGB 色彩空间转换到 HSV。R、G 和 B 通道值的常规范围是 0 到 255。 输出图像必须是 8 位…

Spring Boot的优点:赋能现代Java开发的利器

Spring Boot 是基于 Spring 框架的快速开发框架&#xff0c;自 2014 年发布以来&#xff0c;凭借其简洁性、灵活性和强大的生态系统&#xff0c;成为 Java 后端开发的首选工具。尤其在 2025 年&#xff0c;随着微服务、云原生和 DevOps 的普及&#xff0c;Spring Boot 的优势更…

基于强化学习的智能交通控制系统设计

标题:基于强化学习的智能交通控制系统设计 内容:1.摘要 随着城市交通流量的不断增长&#xff0c;传统交通控制方法在应对复杂多变的交通状况时逐渐显现出局限性。本文旨在设计一种基于强化学习的智能交通控制系统&#xff0c;以提高交通运行效率、减少拥堵。通过构建强化学习模…

数据挖掘技术与应用课程论文——数据挖掘中的聚类分析方法及其应用研究

数据挖掘中的聚类分析方法及其应用研究 摘要 聚类分析是数据挖掘技术中的一个重要组成部分,它通过将数据集中的对象划分为多个组或簇,使得同一簇内的对象具有较高的相似性,而不同簇之间的对象具有较低的相似性。 本文系统地研究了数据挖掘中的多种聚类分析方法及其应用。首先…

Java基础语法10分钟速成

Java基础语法10分钟速成&#xff0c;记笔记版 JDKhello world变量字符串 类&#xff0c;继承&#xff0c;多态&#xff0c;重载 JDK JDK即Java development key&#xff0c;Java环境依赖包 在jdk中 编译器javac将代码的Java源文件编译为字节码文件&#xff08;.class&#xff…

在WSL2+Ubuntu22.04中通过conda pack导出一个conda环境包,然后尝试导入该环境包

如何导出一个离线conda环境&#xff1f;有两种方式&#xff0c;一种是导出env.yml即环境配置&#xff0c;一种是通过conda pack导出为一个环境包&#xff0c;前者只是导出配置&#xff08;包括包名、版本等&#xff09;&#xff0c;而后者是直接将环境中所有的内容打包&#xf…

盈达科技:登顶GEO优化全球制高点,以AICC定义AI时代内容智能优化新标杆

一、技术制高点——全球独创AICC系统架构&#xff0c;构建AI内容优化新范式 作为全球首个实现AI内容全链路优化的技术供应商&#xff0c;盈达科技凭借AICC智能协同中心&#xff08;自适应内容改造、智能数据投喂、认知权重博弈、风险动态响应四大引擎&#xff09;&#…

设计看似完美却测不过? Intra-Pair Skew 是「讯号完整性(Signal Integrity)」里最隐形的杀手

各位不知道有没有遇过&#xff0c;一对很长的差分走线&#xff0c;看起来很正常&#xff0c;但是测试结果偶尔会fail偶尔会pass&#xff0c;不像是软件问题&#xff0c;也不像是制程问题。 看了一下Layout&#xff0c;发现阻抗匹配控制的非常好&#xff0c;TDR测试也显示阻抗好…

介绍常用的退烧与消炎药

每年春夏交替之季&#xff0c;是感冒发烧、咳嗽、咽喉肿痛、支气管炎、扁桃体炎的高发期。在家里或公司&#xff0c;常备几种预防感冒发烧、咳嗽、流鼻涕、咽喉发炎的药品&#xff0c;是非常必要的。下面介绍几款效果非常明显的中成药、西药&#xff0c;具体如下。 1 莲芝消炎…

Redis为什么不直接使用C语言中的字符串?

因为C语言字符串存在问题&#xff1a; 获取字符串长度需要进行运算(获取字符串长度需要遍历整个字符串&#xff0c;直到遇到终止符 \0&#xff0c;时间复杂度为 O(n))非二进制安全&#xff08;结束标识符\0可能在一些二进制格式的数据处理时字符串时产生错误&#xff09;不可修…

直线模组精度测试的标准是什么?

直线模组的精度测试是确保其性能和稳定性的重要环节。那么&#xff0c;大家知道直线模组精度测试的标准是什么吗&#xff1f; 1、定位精度&#xff1a;以最大行程为基准长度&#xff0c;用从基准位置开始实际移动的距离与指令值之间的最大误差的绝对值来表示。一般来说&#xf…

开源AI视频FramePack发布:6GB显卡本地运行

您现在可以在自己的笔记本电脑上免费生成完整的离线AI视频。 只有GPU和纯粹的创造力。 这到底是什么? 一个名为FramePack的新型离线AI视频生成器几天前在GitHub上发布 — 几乎没人在谈论它。这很奇怪,因为这个工具真的很厉害。 它允许您从静态图像和提示词在自己的机器上…