Python异步Redis客户端与通用缓存装饰器

前言

这里我将通过 redis-py 简易封装一个异步的Redis客户端,然后主要讲解设计一个支持各种缓存代理(本地内存、Redis等)的缓存装饰器,用于在减少一些不必要的计算、存储层的查询、网络IO等。

具体代码都封装在 HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com) 中,以大家便捷使用。

异步redis客户端

首先安装 redis-py 库

pip install redis

Redis 之前是不支持异步的,后面为了统一异步redis操作与python常用的redis.py 的api接口一致,aioredis的作者已经将 aioredis 加入了redis中维护,安装的版本大于 4.2.0rc1 就行。

  • aioredis:https://github.com/aio-libs-abandoned/aioredis-py
  • redis:https://github.com/redis/redis-py

BaseRedisManager 封装

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { redis连接处理模块 }
# @Date: 2023/05/03 21:13
from datetime import timedelta
from typing import Optional, Unionfrom redis import Redis
from redis import asyncio as aioredisfrom py_tools import constants
from py_tools.decorators.cache import CacheMeta, cache_json, RedisCacheProxy, AsyncRedisCacheProxyclass BaseRedisManager:"""Redis客户端管理器"""client: Union[Redis, aioredis.Redis] = Nonecache_key_prefix = constants.CACHE_KEY_PREFIX@classmethoddef init_redis_client(cls,async_client: bool = False,host: str = "localhost",port: int = 6379,db: int = 0,password: Optional[str] = None,max_connections: Optional[int] = None,**kwargs):"""初始化 Redis 客户端。Args:async_client (bool): 是否使用异步客户端,默认为 False(同步客户端)host (str): Redis 服务器的主机名,默认为 'localhost'port (int): Redis 服务器的端口,默认为 6379db (int): 要连接的数据库编号,默认为 0password (Optional[str]): 密码可选max_connections (Optional[int]): 最大连接数。默认为 None(不限制连接数)**kwargs: 传递给 Redis 客户端的其他参数Returns:None"""if cls.client is None:redis_client_cls = Redisif async_client:redis_client_cls = aioredis.Rediscls.client = redis_client_cls(host=host, port=port, db=db, password=password, max_connections=max_connections, **kwargs)return cls.client@classmethoddef cache_json(cls,ttl: Union[int, timedelta] = 60,key_prefix: str = None,):"""缓存装饰器(仅支持缓存能够json序列化的数据)缓存函数整体结果Args:ttl: 过期时间 默认60skey_prefix: 默认的key前缀, 再未指定key时使用Returns:"""key_prefix = key_prefix or cls.cache_key_prefixif isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())cache_proxy = RedisCacheProxy(cls.client)if isinstance(cls.client, aioredis.Redis):cache_proxy = AsyncRedisCacheProxy(cls.client)return cache_json(cache_proxy=cache_proxy, key_prefix=key_prefix, ttl=ttl)

还是跟之前封装客户端一样的简易封装,由类属性 client 维护真正操作的redis的客户端,通过 init_redis_client 方法进行初始化。这样封装的目的就是在系统中只初始化一份 redis 客户端,操作时可以直接使用类方法。BaseRedisManager 只实现一些通用的 redis 操作(有待挖掘),具体还是需要业务Manager来继承封装业务中操作redis的方法。目前只实现了一个redis的缓存装饰器,其实内部就是组织参数设置redis代理,然后调用另外一个通用的缓存装饰器,这样使用的时候不需要制定缓存代理了。

缓存装饰器

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsMEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))def cache_json(cache_proxy: BaseCacheProxy = MEMORY_PROXY,key_prefix: str = constants.CACHE_KEY_PREFIX,ttl: Union[int, timedelta] = 60,
):"""缓存装饰器(仅支持缓存能够json序列化的数据)Args:cache_proxy: 缓存代理客户端, 默认系统内存ttl: 过期时间 默认60skey_prefix: 默认的key前缀Returns:"""key_prefix = f"{key_prefix}:cache_json"if isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())def _cache(func):def _gen_key(*args, **kwargs):"""生成缓存的key"""# 根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key@functools.wraps(func)def sync_wrapper(*args, **kwargs):"""同步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = cache_proxy.get(hash_key)if cache_data:# 有直接返回print(f"命中缓存: {hash_key}")return json.loads(cache_data)# 没有,执行函数获取结果ret = func(*args, **kwargs)# 缓存结果cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return ret@functools.wraps(func)async def async_wrapper(*args, **kwargs):"""异步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = await cache_proxy.get(hash_key)if cache_data:# 有直接返回return json.loads(cache_data)# 没有,执行函数获取结果ret = await func(*args, **kwargs)# 缓存结果await cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return retreturn async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapperreturn _cache

cache_json 是一个带单参数的缓存装饰器,可以指定一些缓存的代理、缓存key前缀、缓存ttl等。

内部实现了同步、异步函数的缓存处理,关键点其实就是如何构造唯一的缓存key,这里就是根据key前缀与函数的一些签名信息来构造的。

def _gen_key(*args, **kwargs):"""生成缓存的key"""# 没有传递key信息,根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key

函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash,在处理关键字参数的需要排个序,来保证相同的参数,顺序不同但缓存key一致。后面的逻辑就是常见的设置缓存操作。

image.png

缓存代理类

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsclass CacheMeta(BaseModel):"""缓存元信息"""key: str = Field(description="缓存的key")ttl: Union[int, timedelta] = Field(description="缓存有效期")cache_client: str = Field(description="缓存的客户端(Redis、Memcached等)")data_type: str = Field(description="缓存的数据类型(str、list、hash、set)")class BaseCacheProxy(object):"""缓存代理基类"""def __init__(self, cache_client):self.cache_client = cache_client  # 具体的缓存客户端,例如Redis、Memcached等def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_dataclass RedisCacheProxy(BaseCacheProxy):"""同步redis缓存代理"""def __init__(self, cache_client: Redis):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.setex(name=key, value=value, time=ttl)class AsyncRedisCacheProxy(BaseCacheProxy):"""异步Redis缓存代理"""def __init__(self, cache_client: aioredis.Redis):super().__init__(cache_client)async def set(self, key, value, ttl):await self.cache_client.setex(name=key, value=value, time=ttl)async def get(self, key):cache_data = await self.cache_client.get(key)return cache_dataclass MemoryCacheProxy(BaseCacheProxy):"""系统内存缓存代理"""def __init__(self, cache_client: cacheout.Cache):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key=key, value=value, ttl=ttl)MEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))

这里设置一个缓存代理抽象类是用于封装屏蔽不同缓存客户端的操作不一致性。统一成如下入口

def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_data

让具体的缓存客户端重写(实现)这两个方法,以达到缓存装饰器的通用性。目前只实现了同步、异步redis缓存代理以及通过 cacheout 库实现的本地内存缓存代理,后面接入其他的缓存代理(例如Memcached等)就不用动cache_json函数了,只要继承 BaseCacheProxy,实现具体的 set、get 操作即可。

pip install python-memcached
import memcacheclass MemcacheCacheProxy(BaseCacheProxy):def __init__(self, cache_client: memcache.Client):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key, value, time=ttl)

由于获取缓存的方法逻辑一致,故而直接复用就行。

测试Demo

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: cache.py
# @Desc: { cache demo 模块 }
# @Date: 2024/04/23 11:11
import asyncio
import time
from datetime import timedeltaimport cacheoutfrom py_tools.connections.db.redis_client import BaseRedisManager
from py_tools.decorators.cache import cache_json, MemoryCacheProxy, RedisCacheProxy, AsyncRedisCacheProxyclass RedisManager(BaseRedisManager):client = Noneclass AsyncRedisManager(BaseRedisManager):client = NoneRedisManager.init_redis_client(async_client=False)
AsyncRedisManager.init_redis_client(async_client=True)memory_proxy = MemoryCacheProxy(cache_client=cacheout.Cache())
redis_proxy = RedisCacheProxy(cache_client=RedisManager.client)
aredis_proxy = AsyncRedisCacheProxy(cache_client=AsyncRedisManager.client)@cache_json(key_prefix="demo", ttl=3)
def memory_cache_demo_func(name: str, age: int):return {"test_memory_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=redis_proxy, ttl=10)
def redis_cache_demo_func(name: str, age: int):return {"test_redis_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=aredis_proxy, ttl=timedelta(minutes=1))
async def aredis_cache_demo_func(name: str, age: int):return {"test_async_redis_cache": "hui-test", "name": name, "age": age}@AsyncRedisManager.cache_json(ttl=30)
async def aredis_manager_cache_demo_func(name: str, age: int):return {"test_async_redis_manager_cache": "hui-test", "name": name, "age": age}def memory_cache_demo():print("memory_cache_demo")ret1 = memory_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = memory_cache_demo_func(name="hui", age=18)print("ret2", ret2)print()time.sleep(3)ret3 = memory_cache_demo_func(age=18, name="hui")print("ret3", ret3)print()assert ret1 == ret2 == ret3def redis_cache_demo():print("redis_cache_demo")ret1 = redis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = redis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_cache_demo():print("aredis_cache_demo")ret1 = await aredis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_manager_cache_demo():print("aredis_manager_cache_demo")ret1 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def main():memory_cache_demo()redis_cache_demo()await aredis_cache_demo()await aredis_manager_cache_demo()if __name__ == '__main__':asyncio.run(main())

输出结果

memory_cache_demo
ret1 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: demo:cache_json:__main__:memory_cache_demo_func:46c6a618a88eb5067a00915c10c97c6c72d5073ecf9b04060433de75b2d21f51
ret2 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}ret3 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}redis_cache_demo
ret1 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: py-tools:cache_json:__main__:redis_cache_demo_func:a00b13aa2e1e56ad328d1956bc3c3fb8e89b7007453a780e866cc3ccafb51d73
ret2 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_cache_demo
ret1 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_manager_cache_demo
ret1 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}

Redis 缓存情况

缓存信息还是挺清晰的就是有点长。由于是从主入口调用的函数,所以 func.__module__ 是 __main__。

这缓存装饰器一般适用于一些参数相同, 结果经常不变的情况下,以及允许短时间内出现数据不一致的场景。如下是一些典型的应用场景

  1. API Token缓存:对于需要使用API Token进行身份验证的API请求,通常API Token具有一定的有效期。在这种情况下,你可以使用缓存装饰器来缓存API Token,以避免在每次请求时重新生成或从后端服务获取。这样可以降低对后端服务的负载,并提高系统的响应速度。
  2. OSS Sign URL缓存:当需要生成签名URL来访问对象存储服务(如AWS S3、阿里云OSS)中的资源时,通常需要对URL进行签名以确保安全性。在这种情况下,你可以使用缓存装饰器来缓存已签名的URL,在一定时间内重复使用相同的签名URL,而不必重新计算签名。这样可以降低对签名计算资源的消耗,并减少重复的签名请求。
  3. 频繁查询的数据缓存:对于一些数据不经常变化但是频繁被查询的情况,比如一些静态配置信息、全局参数等,可以使用缓存装饰器将查询结果缓存起来,减少数据库查询次数,提高系统的响应速度。
  4. 外部API响应结果缓存:当你调用外部API获取数据时,有时这些数据在一段时间内不会发生变化。在这种情况下,你可以使用缓存装饰器来缓存外部API的响应结果,以避免频繁地向外部API发出请求。这不仅可以提高系统的性能,还可以降低对外部服务的依赖性。

总的来说,缓存装饰器可以应用于许多场景,特别是在需要提高性能、减少资源消耗和避免重复请求数据的情况下。通过合理地设置缓存时间,可以权衡数据的新鲜度和系统性能,从而实现更好的用户体验。

源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/5858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PHP】安装指定版本Composer

1、下载指定版本composer.phar文件:https://github.com/composer/composer/releases 2、将下载的文件添加到全局路径: sudo mv composer.phar /usr/local/bin/composer 3、赋予权限: sudo chmod x /usr/local/bin/composer 4、查看compos…

Linux进程——进程的创建(fork的原理)

前言:在上一篇文章中,我们已经会使用getpid/getppid函数来查看pid和ppid,本篇文章会介绍第二种查看进程的方法,以及如何创建子进程! 本篇主要内容: 查看进程的第二种方法创建子进程系统调用函数fork 在开始前&#xff…

一文了解双向链表

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、链表分类二、双向链表是什么?三、功能函数实现1.申请一个节点2.初始化3.尾插4.头插5.尾删6.头删7.在指定位置后插入8.删除指定位置数据9.查找10…

带环链表问题

带环链表就是字面意思带环的链表,例如以下这三种情况 练习题 1.给定一个链表,判断链表中是否带环. - 力扣(LeetCode) 思路:快慢指针,慢指针走一步,快指针走两步,两个指针从链表的起…

nginx的前世今生(二)

书接上回: 上回书说到,nginx的前世今生,这回我们继续说 3.缓冲秘籍,洪流控水 Nginx的缓冲区是其处理数据传输和提高性能的关键设计之一,主要用于暂存和管理进出的数据流,以应对不同组件间速度不匹配的问题…

池化整合多元数据库,zData X 一体机助力证券公司IT基础架构革新

引言 近期,云和恩墨 zData X 多元数据库一体机(以下简称 zData X)在某证券公司的OA、短信和CRM业务系统中成功上线,标志着其IT基础架构完成从集中式存储向池化高性能分布式存储的转变。zData X 成功整合了该证券公司使用的达梦、O…

Windows php 安装 Memcached扩展、php缺失 Memcached扩展、Class ‘Memcached‘ not found

在Windows系统下如何安装 php Memcached 扩展 下载dll文件 pecl地址:https://pecl.php.net/package/memcached 根据版本进行选择 : 解压下载的文件后得到了这么样的文件结构: 配置 移动dll文件到相应文件位置 重点: libme…

FreeRTOS队列集(1-15)

队列集定义:def 队列集只允许任务间传递消息为同一种数据类型,如果需要在任务间传递不同数据类型的消息时,就可以使用队列集。 用于对多个信号量进行监听,其中不管哪一个消息到来,都可以让任务退出阻塞状态 假设&am…

如何利用MCU自动测量单元提高大坝安全监测效率

大坝作为重要的水利基础设施,其安全性直接关系到人民群众的生命财产安全和社会的稳定发展。因此,对大坝进行实时、准确的安全监测至关重要。近年来,随着微控制器单元(MCU)技术的不断发展,其在大坝安全监测领域的应用也越来越广泛。…

【PCL】教程 supervoxel_clustering执行超体聚类并可视化点云数据及其聚类结果

[done, 417.125 ms : 307200 points] Available dimensions: x y z rgba 源点云milk_cartoon_all_small_clorox.pcd > Loading point cloud... > Extracting supervoxels! Found 423 supervoxels > Getting supervoxel adjacency 这段代码主要是使用PCL(Po…

【Linux】创建/扩容swap交换空间swap优化

一、当前交换空间大小 目前交换空间大小为2G 二、创建swap交换空间 #创建大小为2G的交换空间 [roothadoop01 data1]# dd if/dev/zero of/data1/swapfile bs1M count2048 #将文件设置为交换空间 [roothadoop01 data1]# mkswap /data1/swapfile #启用交换空间 [roothadoop01 da…

Java Web 开发 - 掌握拦截器和监听器

目录 深入了解Java Web的拦截器和监听器 拦截器(Interceptor) 拦截器的使用场景 拦截器实例 思维导图 ​编辑 监听器(Listener) 监听器的使用场景 监听器类型 监听器实例 思维导图​编辑 总结 深入了解Java Web的拦截器…

在UI界面中播放视频_unity基础开发教程

在UI界面中播放视频_unity基础开发教程 前言操作步骤结语 前言 之前我写过一篇在场景中播放视频的文章,但是在开发中有时候也会在UI的界面中播放视频,这期我们做一下在UI的界面中播放视频。 操作步骤 首先在场景中创建一个Raw Image,UI->…

0418EmpTomCat项目 初次使用ajax实现局部动态离职

0418EmpTomCat项目包-CSDN博客 数据库字段: 员工部门表 分页查询; 多条件查询; 添加新员工; ajax点击离职操作效果:

【CTF Web】BUUCTF BUU CODE REVIEW 1 Writeup(代码审计+PHP弱类型漏洞+MD5的0e绕过+反序列化)

BUU CODE REVIEW 1 1 https://github.com/glzjin/buusec_2019_code_review_1 解法 <?php /*** Created by PhpStorm.* User: jinzhao* Date: 2019/10/6* Time: 8:04 PM*/highlight_file(__FILE__);class BUU {public $correct "";public $input ""…

动态数据结构中的表扩张性:摊还分析、伪代码与C语言实现

动态数据结构中的表扩张性&#xff1a;摊还分析、伪代码与C语言实现 引言表扩张性的概念摊还分析在表扩张性中的应用伪代码示例&#xff1a;TABLE-INSERT操作C语言实现结论 引言 在处理数据结构时&#xff0c;尤其是表&#xff08;或数组&#xff09;&#xff0c;我们经常面临…

Idea报错:无法访问org.springframework.boot.SpringApplication

在开发项目时&#xff0c;常常会遇到这种问题&#xff0c;报错信息如下图所示 版本号与jdk版本号存在对应关系&#xff0c;61.0对应jdk17&#xff0c;52.0对应jdk8 所以是某个依赖的版本太高&#xff0c;降低该依赖的版本即可 具体步骤&#xff1a; ①修改pom.xml中spring b…

【linuxC语言】exec函数族

文章目录 前言一、exec函数族二、示例代码2.1 代码12.2 代码22.3 代码3 总结 前言 在Linux环境下&#xff0c;C语言提供了一组强大的函数族&#xff0c;即exec函数族&#xff0c;用于执行其他程序。这些函数允许程序在运行时加载并执行不同的程序&#xff0c;从而实现了程序之…

使用docker部署nacos2.2.3单节点

docker部署nacos2.2.3 首先nacos要配合mysql进行初始化数据&#xff0c;部署一个mysql5.7版本的。 systemctl stop firewalld && setenforce 0 关闭防火墙和selinuxdocker pull mysql:5.7 && docker pull nacos/nacos-server:v2.2.3 拉取镜像docker …

redis中的集群模式

主从复制、主从同步(解决高并发读的问题) 主从同步原理&#xff1a; 1.全量同步 slave&#xff08;从节点&#xff09;每次请求数据同步会带两个参数&#xff1a;replid和offset。 replid&#xff1a;第一次请求同步时&#xff0c;replid和master的replid不一样&#xff0c;这…