在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。
1、socket通信客户端
这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池 或者接收服务器返回的心跳数据
;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包
)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。这里的关键点在于,有多个线程可以发送数据(thread_msg与run线程),但是只有一个线程可以接收数据(run函数),单一线程接收数据可以避免服务器发送的数据存在冲突(两个线程同时接收数据就会存在死锁)
#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"] #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#此时的数据为服务器返回的心跳数据continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={ "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={ "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')
2、任务池实现
任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。
#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)
3、具体任务线程
任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。
#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass
4、完整代码与使用效果
完整代码如下所示
from socket import *
import time,json
import yaml
import threading,struct
from threading import Threaddef hex_to_bytes(hex_str):""":param hex_str: 16进制字符串:return: byte_data 字节流数据"""bytes_data = bytes()while hex_str :"""16进制字符串转换为字节流"""temp = hex_str[0:2]s = int(temp, 16)bytes_data += struct.pack('B', s)hex_str = hex_str[2:]return bytes_data# 读取Yaml文件方法
def read_yaml(yaml_path):with open(yaml_path, encoding="utf-8", mode="r") as f:result = yaml.load(stream=f,Loader=yaml.FullLoader)return result#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0:task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"] #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息#通信线程-用于接收服务器的指令def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#服务器返回的心跳包,不予处理continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={ "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)#检测socket连接是否断开def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息--子线程def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={ "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)#发送信息def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port)) # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')if "__main__"==__name__:#进行定时通信测试config=read_yaml("config.yaml")socket_client=MySocket(config)socket_client.start()
使用效果如下所示,这里基于socket调试工具作为客户端