NTWORK 概述
基于 PC 企业微信的 api 接口,支持收发文本、群@、名片、图片、文件、视频、链接卡片等。
下载安装 ntwork
pip install ntwork
国内源安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple ntwork
企业微信版本下载
官方下载:https://dldir1.qq.com/wework/work_weixin/WeCom_4.0.8.6027.exe
简单基础案例
import sys
import ntworkwework = ntwork.WeWork()# 打开pc企业微信, smart: 是否管理已经登录的企业微信`在这里插入代码片`
wework.open(smart=True)# 等待登录
wework.wait_login()# 向文件助手发送一条消息
wework.send_text(conversation_id="FILEASSIST", content="hello, NtWork")try:while True:pass
except KeyboardInterrupt:ntwork.exit_()sys.exit()
获取联系人列表、群列表
# -*- coding: utf-8 -*-
import sys
import ntworkwework = ntwork.WeWork()# 打开pc企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)# 等待登录
wework.wait_login()# 获取内部(同事)列表并输出
contacts = wework.get_inner_contacts()
print("内部(同事)联系人列表: ")
print(contacts)# 获取外部(客户)列表并输出
contacts = wework.get_external_contacts()
print("外部(客户)联系人列表: ")
print(contacts)# 获取群列表并输出
rooms = wework.get_rooms()
print("群列表: ")
print(rooms)try:while True:pass
except KeyboardInterrupt:ntwork.exit_()sys.exit()
监听消息自动回复
import sys
import ntworkwework = ntwork.WeWork()# 打开pc企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):data = message["data"]sender_user_id = data["sender"]self_user_id = wework_instance.get_login_info()["user_id"]conversation_id: str = data["conversation_id"]# 判断消息不是自己发的并且不是群消息时,回复对方if sender_user_id != self_user_id and not conversation_id.startswith("R:"):wework_instance.send_text(conversation_id=conversation_id, content=f"你发送的消息是: {data['content']}")try:while True:pass
except KeyboardInterrupt:ntwork.exit_()sys.exit()
使用 fastapi 框架
通过 fastapi 的 swagger 在线文档可以很方便的管理 ntwork 接口:案例
常见问题解决方案
版本不匹配异常
如果出现ntwork.exception.WeWorkVersionNotMatchError异常
异常, 请确认是否安装github上指定的企业微信版本,如果确认已经安装,还是报错,可以在代码中添加以下代码,跳过微信版本检测
import ntwork
ntwork.set_wework_exe_path(wework_version='4.0.8.6027')
账号克隆登录
新建一些 ntwork.wework 实例,然后调用 open 方法。
import ntwork# 多开3个微信
for i in range(3):wework = ntwork.WeWork()wework.open(smart=False)
更完善的多实例管理查看 fastapi_example例子
如何监听输出所有的消息
# 注册监听所有消息回调
import ntwork@wework.msg_register(ntwork.MT_ALL)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):print("########################")print(message)
完全例子查看examples/msg_register_all.py
如何关闭 ntwork 日志
os.environ['NTWORK_LOG'] = "ERROR"
要在import ntwork
前执行
# -*- coding: utf-8 -*-
import os
import sys
import time
os.environ['NTWORK_LOG'] = "ERROR"import ntwork
如何关闭 cmd 窗口
先使用pip install pywin32
安装pywin32模块, 然后在代码中添加以下代码, 完整例子查看examples/cmd_close_event.py
import sys
import ntwork
import win32apidef on_exit(sig, func=None):ntwork.exit_()sys.exit()# 当关闭cmd窗口时
win32api.SetConsoleCtrlHandler(on_exit, True)
Pyinstaller 打包 exe
使用 pyinstaller 打包 ntwork 项目,需要添加 --collect-data=ntwork
选项
打包成单个 exe 程序
pyinstaller -F --collect-data=ntwork main.py
将所有的依赖文件打包到一个目录中
pyinstaller -y --collect-data=ntwork main.py
打包 fastapi_example 示例,需要添加 --paths=. --collect-data=ntchat
pyinstaller -F --paths=. --collect-data=ntchat main.py
登录日志预览
2024-04-12 00:45:21,949 - WeWorkManager - INFO - initialize wework, version: 4.0.8.6027
2024-04-12 00:45:21,949 - WeWorkManager - DEBUG - new wework instance
2024-04-12 00:45:21,982 - WeWorkInstance - INFO - open wework pid: 57024
2024-04-12 00:45:23,671 - WeWorkManager - DEBUG - accept client_id: 1
2024-04-12 00:45:26,225 - WeWorkInstance - DEBUG - on recv message: {'data': {'account': '', 'acctid': 'WangPaiGeGe', 'avatar': 'https://wework.qpic.cn/wwpic3az/457956_UJkq5TJkTfCnA5x_1712341231/0', 'corp_id': '1970325052533916', 'document_root': 'C:\\Users\\Administrator\\Documents\\WXWork\\1688856738534700', 'email': '', 'job_name': '', 'mobile': '19561983930', 'nickname': '', 'pid': 57024, 'position': '', 'sex': 1, 'user_id': '1688856738534700', 'username': '王牌哥哥'}, 'type': 11026}
2024-04-12 00:45:26,241 - WeWorkInstance - INFO - login success, name: 王牌哥哥
私聊自动回复案例
import sys
import ntworkwework = ntwork.WeWork()# 打开 pc 企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):data = message["data"]sender_user_id = data["sender"]self_user_id = wework_instance.get_login_info()["user_id"]conversation_id: str = data["conversation_id"]# 判断消息不是自己发的并且不是群消息时,回复对方if sender_user_id != self_user_id and not conversation_id.startswith("R:"):wework_instance.send_text(conversation_id=conversation_id, content=f"你发送的消息是: {data['content']}")try:while True:pass
except KeyboardInterrupt:ntwork.exit_()sys.exit()
消息属性
私聊消息属性说明
{'appinfo': 'CAQQiZrgsAYYpMv0mZmAgAMgyIKKUw==','at_list': [],'content': '我喜欢你','content_type': 0,'conversation_id': 'S:1688856625489316_1688856738534700','is_pc': 0,'local_id': '34','receiver': '1688856738534700','send_time': '1712852233','sender': '1688856625489316','sender_name': '唤醒手腕','server_id': '1000235'
}
群聊消息属性说明
{'appinfo': 'CAQQiKTgsAYYpMv0mZmAgAMg99ak/Ak=','at_list': [],'content': '我喜欢你','content_type': 0,'conversation_id': 'R:1970325052533916','is_pc': 0,'local_id': '41','receiver': '1688856738534700','send_time': '1712853512','sender': '1688856625489316','sender_name': '唤醒手腕','server_id': '1000258'
}
案例:故障离线表格统计
读取故障数据源码
import re# 假设这是你的消息体字符串
test_message = """
【故障位置】化州西收费站97车道
【故障情况】不能缴费
【填报人】唤醒手腕
"""def analysis_repair_data(message):# 定义正则表达式模式,用于匹配每个字段pattern = r"【故障位置】(.*)\n【故障情况】(.*)\n【填报人】(.*)"# 使用re.search来查找匹配项match = re.search(pattern, message, re.DOTALL)if match:fault_location = match.group(1).strip() # 提取故障位置fault_situation = match.group(2).strip() # 提取故障情况reporter = match.group(3).strip() # 提取填报人return {"fault_location": fault_location,"fault_situation": fault_situation,"reporter": reporter}else:return False
读取维修数据源码
import re# 假设这是你的消息体字符串
test_message = """
【故障编号】
【维修情况】已修复
【维修人】xxx
"""def analysis_solve_data(message):# 定义正则表达式模式,用于匹配每个字段pattern = r""" 【故障编号】(.*) 【维修情况】(.*) 【维修人】(.*) """pattern = re.compile(pattern, re.DOTALL | re.VERBOSE)# 使用re.search来查找匹配项match = pattern.search(message)if match:serial = match.group(1).strip() # 提取故障编号repair_situation = match.group(2).strip() # 提取维修情况repair_person = match.group(3).strip() # 提取维修人return {"serial": serial,"repair_situation": repair_situation,"repair_person": repair_person}else:return False
更新离线 Excel 表格源码
from datetime import datetimefrom openpyxl.reader.excel import load_workbook# 加载工作簿
wb = load_workbook(filename='故障维修登记表.xlsx')
# 选择活动工作表或者通过名字选择工作表
ws = wb.active # 使用活动工作表def insert_new_note(repair_data):# 获取行数row_count = ws.max_rownow = datetime.now()# 使用f-string格式化时间formatted_time = f"{now:%Y%m%d-%H%M%S}"data = {"index": row_count,"serial": formatted_time,"fault_location": repair_data['fault_location'],"fault_situation": repair_data['fault_situation'],"reporter": repair_data['reporter'],"repair_situation": "","repair_person": ""}row_num = row_count + 1ws.cell(row=row_num, column=1).value = data['index']ws.cell(row=row_num, column=2).value = data['serial']ws.cell(row=row_num, column=3).value = data['fault_location']ws.cell(row=row_num, column=4).value = data['fault_situation']ws.cell(row=row_num, column=5).value = data['reporter']ws.cell(row=row_num, column=6).value = data['repair_situation']ws.cell(row=row_num, column=7).value = data['repair_person']# 保存工作簿到文件wb.save("故障维修登记表.xlsx")return data['serial']def finish_old_note(solve_data):target_name = solve_data['serial']# 遍历所有行,查找目标值state = Falsefor index, row in enumerate(ws.iter_rows(values_only=True)):if row[1] == "编号":continueif row[1] == target_name: # 假设"Name"在第一列# 找到目标行后,修改该行的数据# 这里假设我们要修改的是第二列的数据,将其改为"New Value"ws.cell(row=(index + 1), column=6).value = solve_data['repair_situation']ws.cell(row=(index + 1), column=7).value = solve_data['repair_person']state = Truewb.save("故障维修登记表.xlsx")break # 找到后就退出循环if state:return solve_data['serial']else:return False
主程序 app.py 源码
# -*- coding: utf-8 -*-
import sys
from datetime import datetimeimport ntworkfrom read_repair_data import analysis_repair_data
from read_solve_data import analysis_solve_data
from update_excel import insert_new_note, finish_old_notewework = ntwork.WeWork()# 打开 pc 企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):data = message["data"]if data['content'][0:1] != "【":returnsender_user_id = data["sender"]self_user_id = wework_instance.get_login_info()["user_id"]conversation_id: str = data["conversation_id"]# 判断消息不是自己发的并且不是群消息时,回复对方if sender_user_id != self_user_id and conversation_id.startswith("R:"):insert_data = analysis_repair_data(data['content'])if insert_data:serial = insert_new_note(insert_data)now = datetime.now()# 使用f-string格式化时间formatted_time = f"{now:%Y-%m-%d %H:%M:%S}"print(f"【反馈结果】【{formatted_time}】请注意存在新的反馈:{serial}")wework_instance.send_text(conversation_id=conversation_id, content=f"【反馈结果】反馈成功\n【故障编号】{serial}")insert_data = analysis_solve_data(data['content'])if insert_data:serial = finish_old_note(insert_data)if serial:# 使用f-string格式化时间now = datetime.now()formatted_time = f"{now:%Y-%m-%d %H:%M:%S}"print(f"【维修结果】【{formatted_time}】请注意存在新的维修:{serial}")wework_instance.send_text(conversation_id=conversation_id, content=f"【维修结果】维修成功\n【故障编号】{str(serial)}")else:wework_instance.send_text(conversation_id=conversation_id, content=f"【操作警告】故障编号不存在!")try:while True:pass
except KeyboardInterrupt:ntwork.exit_()sys.exit()