基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池 或者接收服务器返回的心跳数据;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。这里的关键点在于,有多个线程可以发送数据(thread_msg与run线程),但是只有一个线程可以接收数据(run函数),单一线程接收数据可以避免服务器发送的数据存在冲突(两个线程同时接收数据就会存在死锁)
在这里插入图片描述

#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#此时的数据为服务器返回的心跳数据continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
在这里插入图片描述
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Threaddef hex_to_bytes(hex_str):""":param hex_str: 16进制字符串:return: byte_data 字节流数据"""bytes_data = bytes()while hex_str :"""16进制字符串转换为字节流"""temp = hex_str[0:2]s = int(temp, 16)bytes_data += struct.pack('B', s)hex_str = hex_str[2:]return bytes_data# 读取Yaml文件方法
def read_yaml(yaml_path):with open(yaml_path, encoding="utf-8", mode="r") as f:result = yaml.load(stream=f,Loader=yaml.FullLoader)return result#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0:task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息#通信线程-用于接收服务器的指令def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#服务器返回的心跳包,不予处理continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)#检测socket连接是否断开def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息--子线程def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)#发送信息def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')if "__main__"==__name__:#进行定时通信测试config=read_yaml("config.yaml")socket_client=MySocket(config)socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

成功解决SyntaxError: unexpected character after line continuation character

成功解决SyntaxError: unexpected character after line continuation character &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程 &…

蓝桥杯Learning

Part 1 递归和递推 1. 简单斐波那契数列 n int(input())st [0]*(47) # 注意这个地方&#xff0c;需要将数组空间设置的大一些&#xff0c;否则会数组越界 st[1] 0 st[2] 1 # 这个方法相当于是递推&#xff0c;即先求解一个大问题的若干个小问题 def dfs(u):if u 1:print(…

Object中的hashCode()

让hashcode方法的返回值为地址 vm参数中输入-XX:UnlockExperimentalVMOptions -XX:hashCode4&#xff0c;如下图&#xff1a; 参考 搞懂JAVAObject中的hashCode()_java_脚本之家 JDK核心JAVA源码解析(9) - hashcode 方法 - 知乎

pop链构造 [NISACTF 2022]babyserialize

打开题目 题目源代码如下 <?php include "waf.php"; class NISA{public $fun"show_me_flag";public $txw4ever;public function __wakeup(){if($this->fun"show_me_flag"){hint();}}function __call($from,$val){$this->fun$val[0];…

【小沐学QT】QT学习之Web控件的使用

文章目录 1、简介1.1 Qt简介1.2 Qt下载和安装1.3 Qt快捷键1.4 Qt帮助 2、QtWeb控件2.1 测试代码1&#xff08;QApplication&#xff09;2.2 测试代码2&#xff08;QApplicationQWidget&#xff09;2.3 测试代码3&#xff08;QApplicationQMainWindow&#xff09;2.4 测试代码4&…

记录一下 Unity团结引擎开发OpenHarmony Next 应用 环境搭建流程

原视频链接 记录环境搭建过程~&#xff0c;本文是图文版本 一、打开团结引擎官网下载对应的 团结引擎版本 官网地址&#xff1a;https://unity.cn/tuanjie/releases 根据各自的开发环境下载对应的软件版本&#xff0c;我是 windwos 环境&#xff0c;我就下载 windows 环境 …

微服务-实用篇

微服务-实用篇 一、微服务治理1.微服务远程调用2.Eureka注册中心Eureka的作用&#xff1a;搭建EurekaServer服务Client服务注册服务发现Ribbon负载均衡策略配置Ribbon配置饥饿加载 3.nacos注册中心使用nacos注册中心服务nacos区域负载均衡nacos环境隔离-namespaceNacos和Eureka…

深度学习神经网络实战:多层感知机,手写数字识别

目的 利用tensorflow.js训练模型&#xff0c;搭建神经网络模型&#xff0c;完成手写数字识别 设计 简单三层神经网络 输入层 28*28个神经原&#xff0c;代表每一张手写数字图片的灰度隐藏层 100个神经原输出层 -10个神经原&#xff0c;分别代表10个数字 代码 // 导入 Ten…

负载均衡.

简介: 将请求/数据【均匀】分摊到多个操作单元上执行&#xff0c;负载均衡的关键在于【均匀】。 负载均衡的分类: 网络通信分类 四层负载均衡:基于 IP 地址和端口进行请求的转发。七层负载均衡:根据访问用户的 HTTP 请求头、URL 信息将请求转发到特定的主机。 载体维度分类 硬…

前端开发_Vue入门

Vue概念 Vue 是一个用于构建用户界面的渐进式框架 构建用户界面&#xff1a;基于数据渲染出用户看到的页面渐进式&#xff1a;循序渐进框架&#xff1a;一套完整的项目解决方案 创建Vue实例 准备容器 引包&#xff08;开发版本/生产版本&#xff09; <script src"h…

消息中间件篇之Kafka-数据清理机制

一、Kafka文件存储机制 Kafka文件存储结构&#xff1a;一个Topic有多个分区。每一个分区都有多个段&#xff0c;每个段都有三个文件。 为什么要分段&#xff1f;1. 删除无用文件方便&#xff0c;提高磁盘利用率。 2. 查找数据便捷。 二、数据清理机制 1.日志的清理策略方案1 根…

[C++][linux]Linux上内存共享内存用法

一&#xff0c;什么是共享内存 共享内存&#xff08;Shared Memory&#xff09;&#xff0c;指两个或多个进程共享一个给定的存储区。进程可以将同一段共享内存连接到它们自己的地址空间中&#xff0c;所有进程都可以访问共享内存中的地址&#xff0c;就好像它们是由用C语言函…

GEE入门篇|遥感专业术语(实践操作4):光谱分辨率(Spectral Resolution)

目录 光谱分辨率&#xff08;Spectral Resolution&#xff09; 1.MODIS 2.EO-1 光谱分辨率&#xff08;Spectral Resolution&#xff09; 光谱分辨率是指传感器进行测量的光谱带的数量和宽度。 您可以将光谱带的宽度视为每个波段的波长间隔&#xff0c;在多个波段测量辐射亮…

RestTemplate启动问题解决

⭐ 作者简介&#xff1a;码上言 ⭐ 代表教程&#xff1a;Spring Boot vue-element 开发个人博客项目实战教程 ⭐专栏内容&#xff1a;个人博客系统 ⭐我的文档网站&#xff1a;http://xyhwh-nav.cn/ RestTemplate启动问题解决 问题&#xff1a;在SpringCloud架构项目中配…

Java SpringBoot 整合 MyBatis 小案例

Java SpringBoot 整合 MyBatis 小案例 基础配置&#xff08;注意版本号&#xff0c;容易报错&#xff09; pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http…

TikTok东南亚小店爆单思路,怎么玩?

东南亚地区的跨境电商市场已经成为全球范围内最具吸引力的市场之一&#xff0c;在各个跨境电商平台上&#xff0c;都是转化率最高的站点之一。TikTok作为电商黑马&#xff0c;吸引了一大波跨境电商玩家入驻&#xff0c;其中东南亚小店也成为热门的选择&#xff0c;那么东南亚小…

当Vue项目启动后,通过IP地址方式在相同网络段的其他电脑上无法访问前端页面?

当Vue项目启动后&#xff0c;通过IP地址方式在相同网络段的其他电脑上无法访问前端页面&#xff0c;可能是由以下几个原因造成的&#xff1a; 服务监听地址&#xff1a;默认情况下&#xff0c;许多开发服务器&#xff08;如Vue CLI的vue-cli-service serve&#xff09;只监听lo…

ky10-server docker 离线安装包、离线安装

离线安装脚本 # ---------------离线安装docker------------------- rpm -Uvh --force --nodeps *.rpm# 修改docker拉取源为国内 rm -rf /etc/docker mkdir -p /etc/docker touch /etc/docker/daemon.json cat >/etc/docker/daemon.json<<EOF{"registry-mirro…

kubectl 命令行管理K8S(上)

目录 陈述式资源管理方式 介绍 命令 项目的生命周期 创建 kubectl create命令 发布 kubectl expose命令 更新 kubectl set 回滚 kubectl rollout 删除 kubectl delete 应用发布策略 金丝雀发布 陈述式资源管理方式 介绍 1.kubernetes 集群管理集群资源…

深圳市萨科微半导体有限公司一直研究新材料新工艺

深圳市萨科微&#xff08;www.slkoric.com&#xff09;半导体有限公司一直研究新材料新工艺&#xff0c;不断推出新产品&#xff0c;驱动公司不断发展。最近萨科微slkor推出SL40T120FL系列IGBT单管&#xff0c;和CMOS运算放大器SLA333等产品&#xff0c;为新能源汽车、太阳能光…