经观察,推拉转任务在两三天时间内就失效了。
1 用脚本每天定时启动一次 wvp docker 容器, 并关闭所有推拉转任务,建议每天凌晨2点运行
import subprocess
import time
import socket
import requestsdef restart_container(container_name):subprocess.run(["docker", "restart", container_name], check=True)def wait_for_port(host, port, timeout=120):start_time = time.time()while time.time() - start_time < timeout:with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:result = sock.connect_ex((host, port))if result == 0:return Truetime.sleep(1)return Falsecontainer_name = "wvp-server"
port = 48080
host = "localhost" # 修改为服务所在的主机名或 IP 地址# 重启 Docker 容器
restart_container(container_name)# 等待服务端口启动
if wait_for_port(host, port):print(f"服务 {container_name} 在端口 {port} 上已启动成功")
else:print(f"服务 {container_name} 在端口 {port} 启动失败")time.sleep(60)# 目标 URL
url = 'http://127.0.0.1:48080/api/user/login?username=admin&password=21232f297a57a5a743894a0e4a801fc3' # 替换为实际的 URL# 发起 GET 请求
response = requests.get(url)headers = {}
# 检查响应状态码
if response.status_code == 200:# 成功处理响应数据data = response.json() # 如果响应内容是 JSON 格式headers['Access-Token'] = data['data']['accessToken']print("请求成功:", data)
else:print("请求失败,状态码:", response.status_code)# 发起 GET 请求,并添加请求头
response = requests.get('http://127.0.0.1:48080/api/proxy/list?page=1&count=100', headers=headers)task_list = []
# 检查响应状态码
if response.status_code == 200:# 成功处理响应数据data = response.json() # 如果响应内容是 JSON 格式task_list = data['data']['list']print("请求成功:", data)
else:print("请求失败,状态码:", response.status_code)for task in task_list:if task.get('enable', False): print(f"任务 {task['name']} 已启用") response = requests.get(f"http://127.0.0.1:48080/api/proxy/stop?app={task['app']}&stream={task['stream']}", headers=headers) data = response.json()print("请求关闭结果---> :", data)else: print(f"任务 {task['name']} 未启用")
2 用脚本定时启动未启用的推拉转任务(防止在启动容器时有的流没打开),建议五分钟执行一次
import subprocess
import time
import socket
import requests# 目标 URL
url = 'http://127.0.0.1:48080/api/user/login?username=admin&password=21232f297a57a5a743894a0e4a801fc3' # 替换为实际的 URL# 发起 GET 请求
response = requests.get(url)headers = {}
# 检查响应状态码
if response.status_code == 200:# 成功处理响应数据data = response.json() # 如果响应内容是 JSON 格式headers['Access-Token'] = data['data']['accessToken']print("请求成功:", data)
else:print("请求失败,状态码:", response.status_code)# 发起 GET 请求,并添加请求头
response = requests.get('http://127.0.0.1:48080/api/proxy/list?page=1&count=100', headers=headers)task_list = []
# 检查响应状态码
if response.status_code == 200:# 成功处理响应数据data = response.json() # 如果响应内容是 JSON 格式task_list = data['data']['list']print("请求成功:", data)
else:print("请求失败,状态码:", response.status_code)for task in task_list:if task.get('enable', False): print(f"任务 {task['name']} 已启用") else: print(f"任务 {task['name']} 未启用")response = requests.get(f"http://127.0.0.1:48080/api/proxy/start?app={task['app']}&stream={task['stream']}", headers=headers) data = response.json()print("请求打开结果---> :", data)