异步框架使用loguru和contextvars实现日志按Id输出

需求阐述

工作中使用fastStream模块消费kafka事件的时候,遇到以下问题。1. 由于main.py模块要用到许多其他模块 ,main模块,和其他模块都有日志输出。2. 要实现异步消费事件,但是又不希望日志打印的很乱。3. 想着在每次消费事件的时候,生成一个唯一的id,然后根据id生成一个log处理器。4. 这个独一无二的log处理器可以透传到其他模块。但是我又不想通过参数的方式传递,那样太麻烦了。5. 尝试借助上下文变量的方式。

询问deepseek

我的描述如下

我有一个需求帮我设计以日志框架 1.我使用fastStream模块,消费kafka事件。首先框架里至少有多个自定义模块,log.py, main.py, git.py, change.py等。 2. 这四个模块,分别对应日志处理,主进程,git命令,change信息的处理。每个模块都需要打印日志信息。 3. 我希望当每个事件进入到主进程后,能生成一id,这个id将生成一log处理器。这个log处理在主进程调度相应的git,change.py模块的时候,能通过全局的方式对应使用这个事件对应id生成的log对象。有肯能是需要使用到上下文的方式来传递,但我不希望每次调度一个git或者change的函数或者方法的时候都将这个log处理器作为参数。我希望在主进程设置,在子进程直接使用。 4. 这个框架能够适用于异步框架。5. log使用loguru模块,并且需要实现每个id的日志写到id.log文件中,并且20天自动清理一次。

# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制台输出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 确保异步安全)def add_event_logger(event_id: str):"""为事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})# git.py
from loguru import logger
from .log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作逻辑logger.bind(event_id=event_id).success("Git operation completed")# change.py
from loguru import logger
from .log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change处理逻辑logger.bind(event_id=event_id).warning("Found potential conflicts")# main.py
from faststream import Context
from loguru import logger
from .log import EventLoggerContext, configure_logger, LOG_DIR
from .git import git_operation
from .change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主处理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 启动FastStream消费者
from faststream.kafka import KafkaBrokerbroker = KafkaBroker("localhost:9092")
@broker.subscriber("events-topic")
async def on_event(event):await handle_event(event)

调试代码

  • 项目结构
    在这里插入图片描述

  • main.py中的代码

# main.py
import asynciofrom faststream import Context
from loguru import logger
from log import EventLoggerContext, configure_logger, LOG_DIR
from git import git_operation
from change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主处理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 启动FastStream消费者
# from faststream.kafka import KafkaBroker
#
# broker = KafkaBroker("localhost:9092")
#
#
# @broker.subscriber("events-topic")
# async def on_event(event):
#     await handle_event(event)async def stress_test():events = [{"repo_url": f"https://github.com/user/repo_{i}.git","changes": "#"*i} for i in range(100)]await asyncio.gather(*[handle_event(e) for e in events])# 测试运行(在入口文件添加)
if __name__ == "__main__":setup()asyncio.run(stress_test())
  • log.py中的代码
# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制台输出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 确保异步安全)def add_event_logger(event_id: str):"""为事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})
  • git.py中的代码
# git.py
from loguru import logger
from log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作逻辑logger.bind(event_id=event_id).success("Git operation completed")

chang.py中的代码

# change.py
from loguru import logger
from log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change处理逻辑logger.bind(event_id=event_id).warning("Found potential conflicts")

验证结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/75654.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HTTPS协议】

文章目录 一、HTTPS二、HTTPS协议五种加密方案1.只使用对称加密2.只使用非对称加密3.双方都使用非对称加密4.对称加密非对称加密中间人攻击理解数字签名CA机构和证书 5. 对称加密非对称加密证书认证中间人篡改证书&#xff1f;中间人调包整个证书&#xff1f; 常见问题总结 一、…

算法设计学习8

实验目的及要求&#xff1a; 通过深入学习树&#xff08;Tree&#xff09;和二叉树&#xff08;Binary Tree&#xff09;这两种重要的数据结构&#xff0c;掌握它们的基本概念、性质和操作&#xff0c;提高对树形结构的理解和应用能力。通过本实验&#xff0c;学生将深化对树和…

P17_ResNeXt-50

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、模型结构 ResNeXt-50由多个残差块&#xff08;Residual Block&#xff09;组成&#xff0c;每个残差块包含三个卷积层。以下是模型的主要结构&#xff1…

【YOLO系列(V5-V12)通用数据集-剪刀石头布手势检测数据集】

YOLO格式的剪刀石头布手势检测数据集&#xff0c;适用于YOLOv5-v11所有版本&#xff0c;可以用于本科毕设、发paper、做课设等等&#xff0c;有需要的在这里获取&#xff1a; 【YOLO系列&#xff08;V5-V12&#xff09;通用数据集-剪刀石头布手势检测数据集】 数据集专栏地址&a…

基于连接池与重试机制的高效TDengine写入方案

摘要 在时序数据库应用场景中,如何构建稳定高效的写入机制是核心挑战。本文基于提供的Python代码实现,解析一种结合连接池管理、智能重试策略和事务控制的TDengine写入方案,并分析其技术优势与优化方向。 一、代码 from dbutils.pooled_db import PooledDB import timede…

抖音热点视频识别与分片处理机制解析

抖音作为日活数亿的短视频平台,其热点视频识别和分片处理机制是支撑高并发访问的核心技术。以下是抖音热点视频识别与分片的实现方案: 热点视频识别机制 1. 实时行为监控系统 用户行为聚合:监控点赞、评论、分享、完播率等指标的异常增长曲线内容特征分析:通过AI识别视频…

基于RDK X3的“校史通“机器人:SLAM导航+智能交互,让校史馆活起来!

视频标题&#xff1a; 【校史馆の新晋顶流】RDK X3机器人&#xff1a;导览员看了直呼内卷 视频文案&#xff1a; 跑得贼稳团队用RDK X3整了个大活——给校史馆造了个"社牛"机器人&#xff01; 基于RDK X3开发板实现智能导航与语音交互SLAM技术让机器人自主避障不…

Metal学习笔记十三:阴影

在本章中&#xff0c;您将了解阴影。阴影表示表面上没有光。当另一个表面或对象使对象与光线相遮挡时&#xff0c;您会看到对象上的阴影。在项目中添加阴影可使您的场景看起来更逼真&#xff0c;并提供深度感。 阴影贴图 阴影贴图是包含场景阴影信息的纹理。当光线照射到物体…

Matplotlib:数据可视化的艺术与科学

引言&#xff1a;让数据开口说话 在数据分析与机器学习领域&#xff0c;可视化是理解数据的重要桥梁。Matplotlib 作为 Python 最流行的绘图库&#xff0c;提供了从简单折线图到复杂 3D 图表的完整解决方案。本文将通过实际案例&#xff0c;带您从基础绘图到高级定制全面掌握 …

Python数据可视化-第4章-图表样式的美化

环境 开发工具 VSCode库的版本 numpy1.26.4 matplotlib3.10.1 ipympl0.9.7教材 本书为《Python数据可视化》一书的配套内容&#xff0c;本章为第4章 图表样式的美化 本章主要介绍了图表样式的美化&#xff0c;包括图表样式概述、使用颜色、选择线型、添加数据标记、设置字体…

嵌入式海思Hi3861连接华为物联网平台操作方法

1.1 实验目的 快速演示 1、认识轻量级HarmonyOS——LiteOS-M 2、初步掌握华为云物联网平台的使用 3、快速驱动海思Hi3861 WIFI芯片,连接互联网并登录物联网平台

如何在Redis容量限制下保持热点数据

如何在Redis容量限制下保持热点数据 当数据库有100万条数据但Redis只能保存10万条时,需要智能的策略来确保Redis中存储的都是最常访问的热点数据。以下是几种有效的解决方案: 一、内存淘汰策略 Redis提供了多种内存淘汰机制,当内存不足时会自动删除部分数据: 策略命令/配…

cv2.fillPoly()和cv2.polylines()

参数解释 cv2.fillPoly() 和 cv2.polylines() 都是 OpenCV 的函数。功能是绘制多边形&#xff0c;cv2.fillPoly()可绘制实心多边形&#xff0c; cv2.polylines() 可绘制空心多边形 cv2.fillPoly()用途&#xff1a;提取ROI 可在黑色图像上&#xff0c;填充白色&#xff0c;作为…

数据库--SQL

SQL&#xff1a;Structured Query Language&#xff0c;结构化查询语言 SQL是用于管理关系型数据库并对其中的数据进行一系列操作&#xff08;包括数据插入、查询、修改删除&#xff09;的一种语言 分类&#xff1a;数据定义语言DDL、数据操纵语言DML、数据控制语言DCL、事务处…

【python】速通笔记

Python学习路径 - 从零基础到入门 环境搭建 安装Python Windows: 从官网下载安装包 https://www.python.org/downloads/Mac/Linux: 通常已预装&#xff0c;可通过终端输入python3 --version检查 配置开发环境 推荐使用VS Code或PyCharm作为代码编辑器安装Python扩展插件创建第…

批量删除git本地分支和远程分支命令

1、按照关键词开头匹配删除远程分支 git branch -r | grep "origin/feature/develop-1"| sed s/origin\///g | xargs -n 1 git push origin --delete git branch -r 列出所有远端分支。 grep "origin/feature/develop-1" 模糊匹配分支名称包含"orig…

上市电子制造企业如何实现合规的质量文件管理?

浙江洁美电子科技股份有限公司成立于2001年&#xff0c;是一家专业为片式电子元器件(被动元件、分立器件、集成电路及LED)配套生产电子薄型载带、上下胶带、离型膜、流延膜等产品的国家高新技术企业&#xff0c;主要产品有分切纸带、打孔经带、压孔纸带、上下胶带、塑料载带及其…

leetcode数组-有序数组的平方

题目 题目链接&#xff1a;https://leetcode.cn/problems/squares-of-a-sorted-array/ 给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 输入&#xff1a;nums [-4,-1,0,3,10] 输出&#xff…

基于微信小程序的医院挂号预约系统设计与实现

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本微信小程序医院挂号预约系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大…

密码学基础——DES算法

前面的密码学基础——密码学文章中介绍了密码学相关的概念&#xff0c;其中简要地对称密码体制(也叫单钥密码体制、秘密密钥体制&#xff09;进行了解释&#xff0c;我们可以知道单钥体制的加密密钥和解密密钥相同&#xff0c;单钥密码分为流密码和分组密码。 流密码&#xff0…