基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池 或者接收服务器返回的心跳数据;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。这里的关键点在于,有多个线程可以发送数据(thread_msg与run线程),但是只有一个线程可以接收数据(run函数),单一线程接收数据可以避免服务器发送的数据存在冲突(两个线程同时接收数据就会存在死锁)
在这里插入图片描述

#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#此时的数据为服务器返回的心跳数据continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
在这里插入图片描述
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Threaddef hex_to_bytes(hex_str):""":param hex_str: 16进制字符串:return: byte_data 字节流数据"""bytes_data = bytes()while hex_str :"""16进制字符串转换为字节流"""temp = hex_str[0:2]s = int(temp, 16)bytes_data += struct.pack('B', s)hex_str = hex_str[2:]return bytes_data# 读取Yaml文件方法
def read_yaml(yaml_path):with open(yaml_path, encoding="utf-8", mode="r") as f:result = yaml.load(stream=f,Loader=yaml.FullLoader)return result#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0:task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息#通信线程-用于接收服务器的指令def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#服务器返回的心跳包,不予处理continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)#检测socket连接是否断开def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息--子线程def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)#发送信息def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')if "__main__"==__name__:#进行定时通信测试config=read_yaml("config.yaml")socket_client=MySocket(config)socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Visual Studio Code(VSCode)软件相关(安装、用法、工具等)

1. MacOS使用code .命令行快速打开VScode https://blog.csdn.net/weixin_45345234/article/details/135072918 2. vscode 提示编写代码导入 使用TAB键导入

linux系统中gitlab的备份与恢复和邮件配置

gitlab的备份恢复和邮箱配置 Gitlab备份与恢复数据备份数据恢复 邮箱配置 Gitlab备份与恢复 数据备份 vim /etc/gitlab/gitlab.rb #gitlab的配置文件gitlab_rails[backup_path] "/var/opt/gitlab/backups" #备份默认数据目录如果修改了配置文件 gitlab-ctl…

成功解决SyntaxError: unexpected character after line continuation character

成功解决SyntaxError: unexpected character after line continuation character &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程 &…

蓝桥杯Learning

Part 1 递归和递推 1. 简单斐波那契数列 n int(input())st [0]*(47) # 注意这个地方&#xff0c;需要将数组空间设置的大一些&#xff0c;否则会数组越界 st[1] 0 st[2] 1 # 这个方法相当于是递推&#xff0c;即先求解一个大问题的若干个小问题 def dfs(u):if u 1:print(…

Object中的hashCode()

让hashcode方法的返回值为地址 vm参数中输入-XX:UnlockExperimentalVMOptions -XX:hashCode4&#xff0c;如下图&#xff1a; 参考 搞懂JAVAObject中的hashCode()_java_脚本之家 JDK核心JAVA源码解析(9) - hashcode 方法 - 知乎

pop链构造 [NISACTF 2022]babyserialize

打开题目 题目源代码如下 <?php include "waf.php"; class NISA{public $fun"show_me_flag";public $txw4ever;public function __wakeup(){if($this->fun"show_me_flag"){hint();}}function __call($from,$val){$this->fun$val[0];…

【小沐学QT】QT学习之Web控件的使用

文章目录 1、简介1.1 Qt简介1.2 Qt下载和安装1.3 Qt快捷键1.4 Qt帮助 2、QtWeb控件2.1 测试代码1&#xff08;QApplication&#xff09;2.2 测试代码2&#xff08;QApplicationQWidget&#xff09;2.3 测试代码3&#xff08;QApplicationQMainWindow&#xff09;2.4 测试代码4&…

记录一下 Unity团结引擎开发OpenHarmony Next 应用 环境搭建流程

原视频链接 记录环境搭建过程~&#xff0c;本文是图文版本 一、打开团结引擎官网下载对应的 团结引擎版本 官网地址&#xff1a;https://unity.cn/tuanjie/releases 根据各自的开发环境下载对应的软件版本&#xff0c;我是 windwos 环境&#xff0c;我就下载 windows 环境 …

微服务-实用篇

微服务-实用篇 一、微服务治理1.微服务远程调用2.Eureka注册中心Eureka的作用&#xff1a;搭建EurekaServer服务Client服务注册服务发现Ribbon负载均衡策略配置Ribbon配置饥饿加载 3.nacos注册中心使用nacos注册中心服务nacos区域负载均衡nacos环境隔离-namespaceNacos和Eureka…

深度学习神经网络实战:多层感知机,手写数字识别

目的 利用tensorflow.js训练模型&#xff0c;搭建神经网络模型&#xff0c;完成手写数字识别 设计 简单三层神经网络 输入层 28*28个神经原&#xff0c;代表每一张手写数字图片的灰度隐藏层 100个神经原输出层 -10个神经原&#xff0c;分别代表10个数字 代码 // 导入 Ten…

负载均衡.

简介: 将请求/数据【均匀】分摊到多个操作单元上执行&#xff0c;负载均衡的关键在于【均匀】。 负载均衡的分类: 网络通信分类 四层负载均衡:基于 IP 地址和端口进行请求的转发。七层负载均衡:根据访问用户的 HTTP 请求头、URL 信息将请求转发到特定的主机。 载体维度分类 硬…

前端开发_Vue入门

Vue概念 Vue 是一个用于构建用户界面的渐进式框架 构建用户界面&#xff1a;基于数据渲染出用户看到的页面渐进式&#xff1a;循序渐进框架&#xff1a;一套完整的项目解决方案 创建Vue实例 准备容器 引包&#xff08;开发版本/生产版本&#xff09; <script src"h…

消息中间件篇之Kafka-数据清理机制

一、Kafka文件存储机制 Kafka文件存储结构&#xff1a;一个Topic有多个分区。每一个分区都有多个段&#xff0c;每个段都有三个文件。 为什么要分段&#xff1f;1. 删除无用文件方便&#xff0c;提高磁盘利用率。 2. 查找数据便捷。 二、数据清理机制 1.日志的清理策略方案1 根…

学习面向对象

面向对象 概念 现实生活&#xff1a; 类&#xff1a;抽象的概念&#xff0c;把具有相同特征和操作的事物归为一类 先有实体&#xff0c;再有类的概念 代码世界&#xff1a; 类&#xff1a;抽象的概念&#xff0c;把具有相同属性和方法的对象归为一类 编写顺序&#xff1a;先有…

神经网络系列---池化

文章目录 池化最大池化平均池化 池化 最大池化 最大池化&#xff08;Max Pooling&#xff09;是卷积神经网络中常用的一种池化技术。其操作是&#xff1a;在输入特征图的一个局部窗口内选取最大的值作为该窗口的输出。 数学表达式如下&#xff1a; 考虑一个输入特征图 A A…

[C++][linux]Linux上内存共享内存用法

一&#xff0c;什么是共享内存 共享内存&#xff08;Shared Memory&#xff09;&#xff0c;指两个或多个进程共享一个给定的存储区。进程可以将同一段共享内存连接到它们自己的地址空间中&#xff0c;所有进程都可以访问共享内存中的地址&#xff0c;就好像它们是由用C语言函…

【ELK05】es的java-api操作-Java High Level REST Client常用功能

1.客户端概括 1.1支持多种客户端 ES支持多种语言客户都安,包括ruby js python java go .net等,其中java目前最新版本的客户都安支持2种方式。一种是旧版已经过时的transport client ,一种是java high level rest client,前者是通过tcp协议链接访问es,后者就是java代码实…

系统学习Python——装饰器:类装饰器-[装饰器与管理器函数]

分类目录&#xff1a;《系统学习Python》总目录 抛开这些细节微妙性&#xff0c;Tracer类装饰器示例最终仍然是依赖于__getattr__来拦截对包装的和内嵌实例对象的获取。正如我们在前面见到的&#xff0c;我们真正需要完成的只是把实例创建调用移入一个类的内部&#xff0c;而不…

GEE入门篇|遥感专业术语(实践操作4):光谱分辨率(Spectral Resolution)

目录 光谱分辨率&#xff08;Spectral Resolution&#xff09; 1.MODIS 2.EO-1 光谱分辨率&#xff08;Spectral Resolution&#xff09; 光谱分辨率是指传感器进行测量的光谱带的数量和宽度。 您可以将光谱带的宽度视为每个波段的波长间隔&#xff0c;在多个波段测量辐射亮…

RestTemplate启动问题解决

⭐ 作者简介&#xff1a;码上言 ⭐ 代表教程&#xff1a;Spring Boot vue-element 开发个人博客项目实战教程 ⭐专栏内容&#xff1a;个人博客系统 ⭐我的文档网站&#xff1a;http://xyhwh-nav.cn/ RestTemplate启动问题解决 问题&#xff1a;在SpringCloud架构项目中配…