aioredis 在高版本已经不支持了, 不要用
代码示例
- redis 连接池
- 异步操作redis以及接口
import asyncio
from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis
from redis.asyncio import ConnectionPool# 创建 Sanic 应用
app = Sanic("RedisSanicApp")# 初始化 Redis 连接池
pool = ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.StrictRedis(connection_pool=pool)@app.route("/set_redis/<key>/<value>")
async def set_redis(request, key, value):try:# 使用连接池异步设置键值对await redis_client.set(key, value)return json({"status": "OK", "message": f"Data saved: {key} = {value}"})except Exception as e:return json({"status": "error", "message": str(e)})@app.route("/get_redis/<key>")
async def get_redis(request, key):try:# 使用连接池异步获取数据value = await redis_client.get(key)if value:return json({"status": "OK", "key": key, "value": value.decode()})else:return json({"status": "not_found", "message": f"Key '{key}' not found"})except Exception as e:return json({"status": "error", "message": str(e)})if __name__ == "__main__":app.run(host="0.0.0.0", port=8000)
测试接口
存数据到 Redis:
curl http://localhost:8000/set_redis/my_key/my_value
这将会在 Redis 中存入键为 my_key,值为 my_value 的数据。
从 Redis 获取数据:
curl http://localhost:8000/get_redis/my_key
这将返回 Redis 中 my_key 对应的值,应该是 my_value。
异步redis pipeline
from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis# 创建 Sanic 应用实例
app = Sanic("async_redis_pipeline_example")# 创建 Redis 客户端
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)@app.route("/pipeline_set/<key>/<value>")
async def pipeline_set(request, key, value):# 使用 pipeline 批量执行多个 Redis 命令async with redis_client.pipeline() as pipe:# 将多个命令添加到 pipelineawait pipe.set(key, value)await pipe.get(key)# 执行所有命令result = await pipe.execute()# 获取执行结果set_result = result[0] # set 命令的返回结果get_result = result[1] # get 命令的返回结果return json({"set_result": set_result, "key": key, "value": get_result.decode('utf-8')})@app.route("/pipeline_incr/<key>")
async def pipeline_incr(request, key):# 使用 pipeline 执行多个增量操作async with redis_client.pipeline() as pipe:# 预先增加多个值await pipe.incr(key)await pipe.incr(key)await pipe.incr(key)# 执行所有命令result = await pipe.execute()# 返回最后结果return json({"key": key, "value": result[-1]})if __name__ == "__main__":app.run(host="0.0.0.0", port=8000)
注意
虽然可以手动管理 pipeline,
# 全局定义的 pipeline
pipeline = redis_client.pipeline()
资源管理问题:全局的 pipeline 是共享的,如果不同的请求同时使用它,可能会导致并发问题,因为 Pipeline 并不是线程安全的。
如果你在一个请求期间同时修改和执行管道的命令,可能会导致错误。
但最好使用
async with redis_client.pipeline() as pipe
来确保管道在每次请求处理后都能正确关闭,避免因手动管理造成的资源泄漏或并发问题。这样做更安全、简洁。