1.redis服务器与python编程环境
#install server
sudo apt update
sudo apt install redis-server
#install python api
pip install redis --timeout 200 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
1.1 测试代码
# 创建Redis客户端实例
r = redis.Redis(host='localhost', port=6379, db=0)# 设置键值对
r.set('key', 'value')# 获取值
value = r.get('key')
print(value) # 输出: b'value',b表示bytes类型# 删除键
r.delete('key')
2.接口封装1:命令行sh脚本的模式
使用redis缺省配置,就可以达到异常关机后,队列数据自动复原
2.1 enqueue.sh
#!/bin/bash
# usage: enqueue key1 value1 5
# key1, push value1, the queue max length is 5.
# 陈旧数据会被抛弃# 设置 Redis 连接信息
REDIS_CLI="redis-cli" # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost" # Redis 服务器主机名或 IP
REDIS_PORT="6379" # Redis 服务器端口号
KEY=$1 # 要设置的 Redis 键名
VALUE=$2 # 要设置的 Redis 值
MAX_LENGTH=$3 # 最大长度# 检查列表当前长度
current_length=$(redis-cli -h $REDIS_HOST -p $REDIS_PORT LLEN $KEY)# 如果列表长度小于最大长度,则添加新元素
if [ $current_length -gt $MAX_LENGTH ]; then$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT RPOP $KEY
fi# 使用 redis-cli 执行 SET 命令
$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPUSH $KEY $VALUE
2.2 fetch_top.sh
#!/bin/bash
#usage: fetch_top key1
#得到某个队列的最新值# 设置 Redis 连接信息
REDIS_CLI="redis-cli" # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost" # Redis 服务器主机名或 IP
REDIS_PORT="6379" # Redis 服务器端口号
KEY="$1" # Redis 列表键名# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LINDEX $KEY 0)# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; thenecho "$FIRST_ELEMENT"
elseecho ""
fi
2.3 dequeue.sh
#!/bin/bash
#将当前队列最新的元素丢弃# 设置 Redis 连接信息
REDIS_CLI="redis-cli" # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost" # Redis 服务器主机名或 IP
REDIS_PORT="6379" # Redis 服务器端口号
KEY="$1" # Redis 列表键名# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPOP $KEY)# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; thenecho 0
elseecho 1
fi
3.接口封装2:python的格式,包含结构化对象的定义
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 获取当前脚本文件所在目录的父目录,并构建相对路径
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
project_path = os.path.join(current_dir, '..')
sys.path.append(project_path)
sys.path.append(current_dir)
import datetime
import paho.mqtt.client as mqtt
import cat4Config
import struct
import subprocess
import time
import json
import queue
import numpy as np
import threading
import subprocess
import pickle
import redisclass GpPublishMsg:def __init__(self, time, ar_class, pic, addr_web_api):self.time = timeself.ar_class = ar_class,self.pic = picself.addr_web_api = addr_web_apiclass GpPublicMsgQueue:def __init__(self):self.MAX_QUEUE_LEN = 3600self.errCnt = 0self.keyname = current_dir.replace('/', ':').replace('\\', ':').replace('.', ':')def enqueue(self, msg:GpPublishMsg):try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)size = r.llen(self.keyname)if(size>self.MAX_QUEUE_LEN):r.rpop(self.keyname)r.lpush(self.keyname, pickle.dumps(msg))except:self.errCnt = self.errCnt+1def dequeue(self) -> GpPublishMsg:try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)br = r.lpop(self.keyname)if(br is not None):return pickle.loads(br)else:return Noneexcept:self.errCnt = self.errCnt+1return Nonedef fetch_queue_top(self) -> GpPublishMsg:try:# 创建Redis客户端实例r = redis.Redis(host='localhost', port=6379, db=0)size = r.llen(self.keyname)if(size==0):return Nonebr = r.lindex(self.keyname, 0)if(br is not None):return pickle.loads(br)else:return Noneexcept:self.errCnt = self.errCnt+1return Nonedef test(self):item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")dumb = self.fetch_queue_top()print('before first enqueue, top() = ', dumb)self.enqueue(item1)print('enqueue item1, top() = ', self.fetch_queue_top().pic)self.enqueue(item2)print('enqueue item2, top() = ', self.fetch_queue_top().pic)self.dequeue()print('first dequeue, top() = ', self.fetch_queue_top().pic)self.enqueue(item3)print('enqueue(item3), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top())
3.1 python封装测试
'''
>>> import ext_pic_out_offline_queue as gpqueue
>>> q = gpqueue.GpPublicMsgQueue()
>>> q.test()
before first enqueue, top() = None
enqueue item1, top() = pic1
enqueue item2, top() = pic2
first dequeue, top() = pic1
enqueue(item3), top() = pic3
dequeue(), top() = pic1
dequeue(), top() = None
'''def test(self):item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")dumb = self.fetch_queue_top()print('before first enqueue, top() = ', dumb)self.enqueue(item1)print('enqueue item1, top() = ', self.fetch_queue_top().pic)self.enqueue(item2)print('enqueue item2, top() = ', self.fetch_queue_top().pic)self.dequeue()print('first dequeue, top() = ', self.fetch_queue_top().pic)self.enqueue(item3)print('enqueue(item3), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top().pic)self.dequeue()print('dequeue(), top() = ', self.fetch_queue_top())
4.为什么要用redis
在编程模式上,订阅模式在分布式环境的一个实现形式,就是mqtt。它可以非常方便地处理消息分发。甚至可以将这个应用添加进嵌入式系统——这个分布式协同机制的开销极小。但是mqtt不负责持久化。顶多,它会保留同一个topic下的最后一笔数据,它不进行数据保存,只是把信息分发给当前在线的用户。
如果你需要一个存储缓冲的机制,那么redis就是非常适合的选择。
你可以手工实现,但是它有代价——你无法方便地调试,然后你会遇到重复发明轮子过程中必然遭遇的各种技术细节。