Python10天突击--Day 2: 实现观察者模式

以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:


1. 经典观察者模式实现

from abc import ABC, abstractmethod
from typing import List, Anyclass Observer(ABC):"""观察者抽象基类"""@abstractmethoddef update(self, subject: Any) -> None:passclass Subject:"""被观察对象基类"""def __init__(self):self._observers: List[Observer] = []def attach(self, observer: Observer) -> None:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""同步通知所有观察者"""for observer in self._observers:observer.update(self)# 使用示例
class TemperatureSensor(Subject):"""具体被观察者:温度传感器"""def __init__(self):super().__init__()self._temperature = 0.0@propertydef temperature(self) -> float:return self._temperature@temperature.setterdef temperature(self, value: float) -> None:self._temperature = valueself.notify()  # 温度变化时通知观察者class Display(Observer):"""具体观察者:显示屏"""def update(self, subject: TemperatureSensor) -> None:print(f"当前温度: {subject.temperature}°C")# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)sensor.temperature = 25.5  # 输出: 当前温度: 25.5°C

2. 线程安全增强版

import threading
from typing import List, Anyclass ThreadSafeSubject:"""线程安全的被观察对象"""def __init__(self):self._observers: List[Observer] = []self._lock = threading.RLock()def attach(self, observer: Observer) -> None:with self._lock:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:with self._lock:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""线程安全的通知"""with self._lock:observers = self._observers.copy()for observer in observers:observer.update(self)

3. 异步观察者模式

import asyncio
from abc import ABC, abstractmethod
from typing import List, Anyclass AsyncObserver(ABC):"""异步观察者接口"""@abstractmethodasync def update(self, subject: Any) -> None:passclass AsyncSubject:"""支持异步通知的被观察对象"""def __init__(self):self._observers: List[AsyncObserver] = []def attach(self, observer: AsyncObserver) -> None:if observer not in self._observers:self._observers.append(observer)async def notify(self) -> None:"""异步通知所有观察者"""await asyncio.gather(*[observer.update(self) for observer in self._observers])# 使用示例
class AsyncTemperatureSensor(AsyncSubject):def __init__(self):super().__init__()self._temp = 0.0async def set_temperature(self, value: float) -> None:self._temp = valueawait self.notify()class CloudLogger(AsyncObserver):async def update(self, subject: AsyncTemperatureSensor) -> None:print(f"云端记录温度: {subject._temp}°C")await asyncio.sleep(0.1)  # 模拟网络请求async def main():sensor = AsyncTemperatureSensor()sensor.attach(CloudLogger())await sensor.set_temperature(28.5)  # 输出: 云端记录温度: 28.5°Casyncio.run(main())

4. 事件总线实现(发布-订阅模式)

from typing import Dict, List, Callable, Any
import inspectclass EventBus:"""事件总线(高级观察者模式)"""_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)cls._instance._subscriptions: Dict[str, List[Callable]] = {}return cls._instancedef subscribe(self, event_type: str, callback: Callable) -> None:if not inspect.iscoroutinefunction(callback):callback = self._sync_to_async(callback)if event_type not in self._subscriptions:self._subscriptions[event_type] = []self._subscriptions[event_type].append(callback)async def publish(self, event_type: str, **data) -> None:if event_type in self._subscriptions:await asyncio.gather(*[callback(**data) for callback in self._subscriptions[event_type]])@staticmethoddef _sync_to_async(func: Callable) -> Callable:async def wrapper(*args, **kwargs):return func(*args, **kwargs)return wrapper# 使用示例
bus = EventBus()@bus.subscribe("temperature_change")
async def log_temp_change(value: float):print(f"温度变化记录: {value}°C")async def trigger_events():await bus.publish("temperature_change", value=30.0)asyncio.run(trigger_events())  # 输出: 温度变化记录: 30.0°C

5. 带过滤器的观察者模式

from typing import Callable, Anyclass FilteredObserver:"""带条件过滤的观察者"""def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):self.callback = callbackself.filter = filter_conditiondef update(self, subject: Any) -> None:if self.filter(subject):self.callback(subject)# 使用示例
sensor = TemperatureSensor()def alert(temp: float):print(f"警报!当前温度过高: {temp}°C")# 只接收温度>30的通知
high_temp_observer = FilteredObserver(callback=alert,filter_condition=lambda s: s.temperature > 30
)sensor.attach(high_temp_observer)
sensor.temperature = 25  # 无输出
sensor.temperature = 35  # 输出: 警报!当前温度过高: 35°C

方案对比

实现方式特点适用场景
经典实现简单直接单线程简单场景
线程安全版避免竞态条件多线程环境
异步实现非阻塞通知I/O密集型应用
事件总线松耦合,支持多对多复杂事件系统
过滤观察者条件触发需要选择性通知的场景

最佳实践建议

  1. 生命周期管理

    # 使用上下文管理器自动取消注册
    class ObserverContext:def __init__(self, subject: Subject, observer: Observer):self.subject = subjectself.observer = observerdef __enter__(self):self.subject.attach(self.observer)return selfdef __exit__(self, *args):self.subject.detach(self.observer)with ObserverContext(sensor, display):sensor.temperature = 20
    
  2. 性能优化

    • 对于高频事件,考虑使用弱引用(weakref.WeakSet
    • 批量通知时使用@dataclass封装事件数据
  3. 异常处理

    def safe_notify(self):for observer in self._observers:try:observer.update(self)except Exception as e:print(f"Observer failed: {e}")
    
  4. 与Python生态集成

    • 使用PyPubSub等现成库
    • 结合asyncio.Queue实现生产者-消费者模式

根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/75706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CST1019.基于Spring Boot+Vue智能洗车管理系统

计算机/JAVA毕业设计 【CST1019.基于Spring BootVue智能洗车管理系统】 【项目介绍】 智能洗车管理系统,基于 Spring Boot Vue 实现,功能丰富、界面精美 【业务模块】 系统共有三类用户,分别是:管理员用户、普通用户、工人用户&…

Windows上使用Qt搭建ARM开发环境

在 Windows 上使用 Qt 和 g++-arm-linux-gnueabihf 进行 ARM Linux 交叉编译(例如针对树莓派或嵌入式设备),需要配置 交叉编译工具链 和 Qt for ARM Linux。以下是详细步骤: 1. 安装工具链 方法 1:使用 MSYS2(推荐) MSYS2 提供 mingw-w64 的 ARM Linux 交叉编译工具链…

Python爬虫教程011:scrapy爬取当当网数据开启多条管道下载及下载多页数据

文章目录 3.6.4 开启多条管道下载3.6.5 下载多页数据3.6.6 完整项目下载3.6.4 开启多条管道下载 在pipelines.py中新建管道类(用来下载图书封面图片): # 多条管道开启 # 要在settings.py中开启管道 class DangdangDownloadPipeline:def process_item(self, item, spider):…

Mysql -- 基础

SQL SQL通用语法: SQL分类: DDL: 数据库操作 查询: SHOW DATABASES; 创建: CREATE DATABASE[IF NOT EXISTS] 数据库名 [DEFAULT CHARSET字符集] [COLLATE 排序规则]; 删除: DROP DATABA…

实操(环境变量)Linux

环境变量概念 我们用语言写的文件编好后变成了程序,./ 运行的时候他就会变成一个进程被操作系统调度并运行,运行完毕进程相关资源被释放,因为它是一个bash的子进程,所以它退出之后进入僵尸状态,bash回收他的退出结果&…

torch.cat和torch.stack的区别

torch.cat 和 torch.stack 是 PyTorch 中用于组合张量的两个常用函数,它们的核心区别在于输入张量的维度和输出张量的维度变化。以下是详细对比: 1. torch.cat (Concatenate) 作用:沿现有维度拼接多个张量,不创建新维度 输入要求…

深入解析@Validated注解:Spring 验证机制的核心工具

一、注解出处与核心定位 1. 注解来源 • 所属框架:Validated 是 Spring Framework 提供的注解(org.springframework.validation.annotation 包下)。 • 核心定位: 作为 Spring 对 JSR-380(Bean Validation 2.0&#…

2025年认证杯数学建模竞赛A题完整分析论文(含模型、可运行代码)(共32页)

2025年认证杯数学建模竞赛A题完整分析论文 目录 摘要 一、问题分析 二、问题重述 三、模型假设 四、 模型建立与求解 4.1问题1 4.1.1问题1解析 4.1.2问题1模型建立 4.1.3问题1样例代码(仅供参考) 4.1.4问题1求解结果分析&#xff08…

Google A2A协议,是为了战略性占领标准?

一、导读 2025 年 4 月 9 日,Google 正式发布了 Agent2Agent(A2A)协议。 A2A 协议致力于打破智能体之间的隔阂,让它们能够跨越框架和供应商的限制,以一种标准化、开放的方式进行沟通与协作 截止到现在,代…

Ansible:roles角色

文章目录 Roles角色Ansible Roles目录编排Roles各目录作用创建 roleplaybook调用角色调用角色方法1:调用角色方法2:调用角色方法3: roles 中 tags 使用实战案例 Roles角色 角色是ansible自1.2版本引入的新特性,用于层次性、结构化…

MCU的USB接口作为 USB CDC串口输出

引用: https://microchip-mplab-harmony.github.io/usb_apps_device/apps/usb_uart_bridge_dual/readme.html STM32 USB使用记录:使用CDC类虚拟串口(VCP)进行通讯_stm32 usb使用记录:使用cdc类虚拟串口(vcp)进行通讯-CSDN博客 前…

深度解析强化学习:原理、算法与实战

深度解析强化学习:原理、算法与实战 0. 前言1. 强化学习基础1.1 基本概念1.2 马尔科夫决策过程1.3 目标函数1.4 智能体学习过程2. 计算状态值3. 计算状态-动作值4. Q 学习4.1 Q 值4.2 使用 Q 学习进行 frozen lake 游戏4.3. frozen lake 问题4.4 实现 Q 学习小结系列链接0. 前…

UE5蓝图之间的通信------接口

一、创建蓝图接口 二、双击创建的蓝图接口,添加函数,并重命名新函数。 三、在一个蓝图(如玩家角色蓝图)中实现接口,如下图: 步骤一:点击类设置 步骤二:在细节面板已经实现的接口中…

2025 年“认证杯”数学中国数学建模网络挑战赛 A题 小行星轨迹预测

近地小行星( Near Earth Asteroids, NEAs )是轨道相对接近地球的小行 星,它的正式定义为椭圆轨道的近日距不大于 1.3 天文单位( AU )的小行星。 其中轨道与地球轨道最近距离小于 0.05A 且直径大于 140 米的小行星被…

Axure中继器(Repeater): 列表多选和 列表查询

文章目录 引言I 列表多选添加选中交互事件添加未选中交互事件II 列表查询知识点操作说明引言 基于鼠标点击交互事件实现列表多选列表查询 I 列表多选 添加选中交互事件 给列标题第一列多选框元件命名为ckeck,并同时添加选中交互事件; 同步添加设置选择/选中动作,目标元件选…

windows11下pytorch(cpu)安装

先装anaconda 见最下方 Pytorch 官网:PyTorch 找到下图(不要求版本一样)(我的电脑是集显(有navdia的装gpu),装cpu) 查看已有环境列表 创建环境 conda create –n 虚拟环境名字(…

最新版IDEA超详细图文安装教程(适用Mac系统)附安装包及补丁2025最新教程

目录 前言 一、IDEA最新版下载 二、IDEA安装 三、IDEA补丁 前言 IDEA(IntelliJ IDEA)是专为Java语言设计的集成开发环境(IDE),由JetBrains公司开发,被公认为业界最优秀的Java开发工具之一。DEA全称Int…

react从零开始的基础课1

全文约5万字。 1.hello,.. // App.jsx import { useState } from react import reactLogo from ./assets/react.svg import viteLogo from /vite.svg import ./App.cssfunction App() {const [count, setCount] useState(0)return (<><Greeting name"world&qu…

【linux知识】web服务环境搭建(一):用户以及开发环境初始化

toc 创建用户组以及用户 以下是 创建用户组 wendao 和用户 wendao 并指定 GID、UID 及家目录 的完整操作指南&#xff1a; 一、创建用户组&#xff08;指定 GID&#xff09; sudo groupadd -g 1500 wendao # 创建组并指定 GID 为 1500• 注意&#xff1a;GID 需唯一&#…

音视频 五 看书的笔记 MediaCodec

MediaCodec 用于访问底层媒体编解码器框架&#xff0c;编解码组件。通常与MediaExtractor(解封装,例如Mp4文件分解成 video和audio)、MediaSync、MediaMuxer(封装 例如音视频合成Mp4文件)、MediaCrypto、Image(cameraX 回调的ImageReader对象可以获取到Image帧图像,可转换成YU…