以下是使用 Python 操作 Redis 的完整整理,涵盖基础操作、高级功能及最佳实践:
1. 安装与连接
(1) 安装库
pip install redis
(2) 基础连接
import redis# 创建连接池(推荐复用连接)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, password='your_password', decode_responses=True # 自动解码为字符串
)
client = redis.Redis(connection_pool=pool)# 测试连接
print(client.ping()) # 输出 True 表示成功
(3) SSL 连接
client = redis.Redis(host='your-host',port=6380,ssl=True,ssl_certfile='/path/to/cert.crt',ssl_keyfile='/path/to/key.key',ssl_ca_certs='/path/to/ca.crt'
)
2. 基本数据类型操作
(1) 字符串(String)
# 设置值(带过期时间)
client.set('user:1001', 'Alice', ex=3600) # ex:秒级过期,px:毫秒级# 获取值
name = client.get('user:1001') # 返回字符串(若 decode_responses=True)# 自增/自减
client.incr('counter') # +1
client.incrby('counter', 5) # +5
client.decr('counter') # -1
(2) 哈希(Hash)
# 设置字段
client.hset('user:profile:1001', 'name', 'Alice')
client.hmset('user:profile:1001', {'age': 30, 'email': 'alice@example.com'})# 获取字段
name = client.hget('user:profile:1001', 'name') # 'Alice'
all_data = client.hgetall('user:profile:1001') # {'name': 'Alice', 'age': '30'}# 删除字段
client.hdel('user:profile:1001', 'email')
(3) 列表(List)
# 插入元素
client.lpush('tasks', 'task1') # 左侧插入
client.rpush('tasks', 'task2') # 右侧插入# 获取元素
task = client.rpop('tasks') # 右侧弹出 'task2'
all_tasks = client.lrange('tasks', 0, -1) # 获取全部元素# 截断列表
client.ltrim('tasks', 0, 2) # 保留前3个元素
(4) 集合(Set)
# 添加元素
client.sadd('tags', 'python', 'redis')# 集合运算
common_tags = client.sinter('tags1', 'tags2') # 交集
all_tags = client.sunion('tags1', 'tags2') # 并集# 随机弹出元素
random_tag = client.spop('tags')
(5) 有序集合(ZSet)
# 添加元素(带分数)
client.zadd('rank', {'Alice': 90, 'Bob': 85})# 获取排名
rank = client.zrevrank('rank', 'Alice') # 降序排名(0为最高)# 范围查询
top3 = client.zrevrange('rank', 0, 2, withscores=True) # 前3名及分数
3. 高级功能
(1) 管道(Pipeline)
# 批量操作(减少网络往返)
pipe = client.pipeline()
pipe.set('key1', 'value1')
pipe.incr('counter')
pipe.execute() # 返回 [True, 6]
(2) 事务(Transaction)
try:pipe = client.pipeline(transaction=True)pipe.multi() # 开启事务pipe.set('balance:1001', 100)pipe.decrby('balance:1001', 50)pipe.incrby('balance:1002', 50)pipe.execute() # 原子性提交
except redis.exceptions.WatchError:print("事务冲突,需重试")
(3) 分布式锁
def acquire_lock(lock_name, timeout=10):identifier = str(uuid.uuid4())end = time.time() + timeoutwhile time.time() < end:if client.set(lock_name, identifier, nx=True, ex=timeout):return identifiertime.sleep(0.001)return Falsedef release_lock(lock_name, identifier):script = """if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1])elsereturn 0end"""return client.eval(script, 1, lock_name, identifier) == 1
(4) 发布订阅
# 发布消息
client.publish('news', 'Breaking News!')# 订阅频道
pubsub = client.pubsub()
pubsub.subscribe('news')for message in pubsub.listen():if message['type'] == 'message':print(f"收到消息: {message['data']}")
4. 数据序列化与复杂对象存储
(1) JSON 序列化
import jsonuser = {'id': 1001, 'name': 'Alice'}
client.set('user:1001', json.dumps(user))# 反序列化
data = client.get('user:1001')
if data:user_data = json.loads(data)
(2) 使用 MessagePack(更高效)
pip install msgpack
import msgpackuser_bytes = msgpack.packb(user)
client.set('user:1001', user_bytes)# 反序列化
data = client.get('user:1001')
if data:user_data = msgpack.unpackb(data)
5. 键管理与维护
(1) 扫描键(避免阻塞)
# 查找所有以 'cache:' 开头的键
for key in client.scan_iter(match='cache:*', count=1000):print(key)
(2) 设置过期时间
client.expire('user:1001', 3600) # 设置过期时间(秒)
client.pexpire('user:1001', 3600000) # 毫秒级过期
(3) 批量删除键
# 删除所有以 'temp:' 开头的键
keys = list(client.scan_iter(match='temp:*'))
if keys:client.delete(*keys)
6. 最佳实践
(1) 连接池管理
• 使用 ConnectionPool
复用连接,避免频繁创建/销毁。
• 配置最大连接数:
pool = redis.ConnectionPool(max_connections=10)
(2) 性能优化
• 使用 pipeline
批量操作减少网络延迟。
• 避免大 Key(如超过 1MB 的 String 或 Hash),拆分存储。
(3) 安全建议
• 禁用高危命令(在 Redis 配置中):
rename-command FLUSHDB ""
rename-command KEYS ""
• 使用低权限账号运行 Redis。
(4) 监控与调试
• 查看慢查询:
slow_logs = client.slowlog_get()
• 监控内存使用:
info = client.info('memory')
print(info['used_memory_human'])
7. 常用场景代码示例
(1) 缓存穿透解决方案
def get_user(user_id):key = f'user:{user_id}'data = client.get(key)if data is None:# 查询数据库user = db.query_user(user_id)if user:client.set(key, json.dumps(user), ex=300)else:# 缓存空值防止穿透client.set(key, 'NULL', ex=60)elif data == 'NULL':return Noneelse:return json.loads(data)
(2) 排行榜实现
# 更新分数
client.zadd('leaderboard', {'Alice': 100, 'Bob': 90})# 获取前10名
top10 = client.zrevrange('leaderboard', 0, 9, withscores=True)
总结
操作类型 | 核心方法 | 适用场景 |
---|---|---|
基础数据操作 | set /hset /lpush /sadd /zadd | 缓存、队列、标签、排行榜 |
事务与管道 | pipeline + multi | 批量操作、原子性执行 |
分布式锁 | SET + NX + Lua 脚本 | 资源竞争控制 |
发布订阅 | publish + subscribe | 实时消息通知 |
性能优化 | 连接池、Pipeline、异步删除 | 高并发、大数据量场景 |
通过合理选择数据结构和操作方式,Redis 能高效支撑缓存、队列、计数器、实时分析等多样化需求。建议结合业务场景灵活应用,并通过监控工具持续优化性能。