遇到问题加QQ群聊 群主在线解答 点击加入群聊【星辰开发】
Redis
pip install redis
def get_conn():from redis import Redisfrom configparser import ConfigParserconfig = ConfigParser()config.read('Config.ini')Redis_Host = config['Redis']['Host']Redis_Port = config['Redis']['Port']Redis_pwd = config['Redis']['pwd']Redis_DB = config['Redis']['DB']Redis_Conn = Redis(host=Redis_Host, port=int(Redis_Port), password=Redis_pwd, db=int(Redis_DB))return Redis_Conndef get_key(key):from json import loadsRedis_Conn = get_conn()if key == "*":return Redis_Conn.keys()elif Redis_Conn.exists(key):data = loads(Redis_Conn.get(key))time = Redis_Conn.ttl(key)return data, timereturn False
def keys(key):Redis_Conn = get_conn()return Redis_Conn.keys(key)def Set(key, data, expire=None):from json import dumpsRedis_Conn = get_conn()Redis_Conn.set(key, dumps(data))if expire != None:Redis_Conn.expire(key, expire)def exists(key):key_exists = get_conn().exists(key)return key_existsdef delete(key):get_conn().delete(key)
OneBotAPI
# 拼接NTQQ的URL
def joint_URL(self_id):from configparser import ConfigParserconfig = ConfigParser()config.read('config.ini')QQNT_Host = config['QQNT']['Host']QQNT_Port = config['QQNT']['Port']# 返回 http://127.0.0.1:5001return f"http://{QQNT_Host}:{QQNT_Port}"# 获取消息内容
def get_message(self_id, message_id):import requestsdata = {"message_id": message_id}return requests.get(f"{joint_URL(self_id)}/get_msg", params=data).json()# 发送群消息
def send_group_msg(self_id, group, message, auto_escape=False):import requestsdata = {"group_id": group,"message": message,"auto_escape": auto_escape,"rate_limit_interval": 100000}requests.get(f"{joint_URL(self_id)}/send_group_msg", params=data)# 发送私聊
def send_private_msg(self_id, user_id, message, auto_escape=False):import requestsdata = {"group_id": user_id,"message": message,"auto_escape": auto_escape}requests.get(f"{joint_URL(self_id)}/send_private_msg", params=data)# 群聊禁言
def set_group_ban(self_id, group_id, user_id, duration=0):import requestsdata = {"group_id": group_id,"user_id": user_id,"duration": duration}requests.get(f"{joint_URL(self_id)}/set_group_ban", params=data)# 群聊踢人
def set_group_kick(self_id, group_id, user_id, reject_add_request=False):import requestsdata = {"group_id": group_id,"user_id": user_id,"reject_add_request": reject_add_request}requests.get(f"{joint_URL(self_id)}/set_group_kick", params=data)# 撤回信息
def delete_msg(self_id, message_id):import requestsdata = {"message_id": message_id}requests.get(f"{joint_URL(self_id)}/delete_msg", params=data)# 设置群组专属头衔
def set_group_special_title(self_id, group_id, user_id, special_title):import requestsdata = {"group_id": group_id,"user_id": user_id,"special_title": special_title,}requests.get(f"{joint_URL(self_id)}/set_group_special_title", params=data)# 获取 QQ 相关接口凭证
def get_credentials(self_id, domain):import requestsdata = {"domain": domain,}return requests.get(f"{joint_URL(self_id)}/get_credentials", params=data)# 获取 CSRF Token
def get_csrf_token(self_id):import requestsreturn requests.get(f"{joint_URL(self_id)}/get_csrf_token")
MySQL
def get_conn():from mysql import connectorfrom configparser import ConfigParserconfig = ConfigParser()config.read('config.ini')MySQL_Host = config['MySQL']['Host']MySQL_Port = config['MySQL']['Port']MySQL_User = config['MySQL']['User']MySQL_Password = config['MySQL']['Password']MySQL_Database = config['MySQL']['Database']return connector.connect(user=MySQL_User, password=MySQL_Password, host=MySQL_Host, database=MySQL_Database)# 数据库查询操作
def Select(sql):conn = get_conn()mycursor = conn.cursor()try:conn.ping(reconnect=True)mycursor.execute(sql) # Get the column namesmyresult = mycursor.fetchall()columns = [i[0] for i in mycursor.description]myresult = [dict(zip(columns, row)) for row in myresult]return myresultexcept Exception as e:print(e)return 0finally:mycursor.close()# 数据库更新操作
def Update(sql):from threading import Lockconn = get_conn()lock = Lock()lock.acquire() # 获取锁以确保线程安全try:mycursor = conn.cursor()conn.ping(reconnect=True)mycursor.execute(sql)conn.commit()except Exception as e:print(e)finally:mycursor.close() # 关闭游标conn.close() # 关闭数据库连接lock.release() # 释放锁